camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [2/5] camel git commit: CAMEL-10208: Make FluentProducerTemplate similar to ProducerTemplate to get from CamelContext and use
Date Mon, 01 Aug 2016 11:51:40 GMT
CAMEL-10208: Make FluentProducerTemplate similar to ProducerTemplate to get from CamelContext and use


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/687adda1
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/687adda1
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/687adda1

Branch: refs/heads/master
Commit: 687adda1501edadf55375c4be16ac1bccdb82021
Parents: cbe9069
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Mon Aug 1 13:35:25 2016 +0200
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Mon Aug 1 13:35:25 2016 +0200

----------------------------------------------------------------------
 .../java/org/apache/camel/CamelContext.java     |  34 ++
 .../apache/camel/FluentProducerTemplate.java    | 266 ++++++++++++++
 .../builder/DefaultFluentProducerTemplate.java  | 334 +++++++++++++++++
 .../camel/builder/FluentProducerTemplate.java   | 361 -------------------
 .../apache/camel/impl/DefaultCamelContext.java  |  19 +
 .../builder/FluentProducerTemplateTest.java     |  45 ++-
 .../jsonpath/JsonPathWithSimpleCBRTest.java     |  10 +-
 .../camel/test/junit4/CamelTestSupport.java     |  20 +-
 .../test/patterns/FilterFluentTemplateTest.java |  74 ++++
 9 files changed, 777 insertions(+), 386 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/687adda1/camel-core/src/main/java/org/apache/camel/CamelContext.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/CamelContext.java b/camel-core/src/main/java/org/apache/camel/CamelContext.java
index e5d13fd..c4d6943 100644
--- a/camel-core/src/main/java/org/apache/camel/CamelContext.java
+++ b/camel-core/src/main/java/org/apache/camel/CamelContext.java
@@ -1070,6 +1070,40 @@ public interface CamelContext extends SuspendableService, RuntimeConfiguration {
     ProducerTemplate createProducerTemplate(int maximumCacheSize);
 
     /**
+     * Creates a new {@link FluentProducerTemplate} which is <b>started</b> and therefore ready to use right away.
+     * <p/>
+     * See this FAQ before use: <a href="http://camel.apache.org/why-does-camel-use-too-many-threads-with-producertemplate.html">
+     * Why does Camel use too many threads with ProducerTemplate?</a>
+     * <p/>
+     * <b>Important:</b> Make sure to call {@link org.apache.camel.FluentProducerTemplate#stop()} when you are done using the template,
+     * to clean up any resources.
+     * <p/>
+     * Will use cache size defined in Camel property with key {@link Exchange#MAXIMUM_CACHE_POOL_SIZE}.
+     * If no key was defined then it will fallback to a default size of 1000.
+     * You can also use the {@link org.apache.camel.FluentProducerTemplate#setMaximumCacheSize(int)} method to use a custom value
+     * before starting the template.
+     *
+     * @return the template
+     * @throws RuntimeCamelException is thrown if error starting the template
+     */
+    FluentProducerTemplate createFluentProducerTemplate();
+
+    /**
+     * Creates a new {@link FluentProducerTemplate} which is <b>started</b> and therefore ready to use right away.
+     * <p/>
+     * See this FAQ before use: <a href="http://camel.apache.org/why-does-camel-use-too-many-threads-with-producertemplate.html">
+     * Why does Camel use too many threads with ProducerTemplate?</a>
+     * <p/>
+     * <b>Important:</b> Make sure to call {@link FluentProducerTemplate#stop()} when you are done using the template,
+     * to clean up any resources.
+     *
+     * @param maximumCacheSize the maximum cache size
+     * @return the template
+     * @throws RuntimeCamelException is thrown if error starting the template
+     */
+    FluentProducerTemplate createFluentProducerTemplate(int maximumCacheSize);
+
+    /**
      * Creates a new {@link ConsumerTemplate} which is <b>started</b> and therefore ready to use right away.
      * <p/>
      * See this FAQ before use: <a href="http://camel.apache.org/why-does-camel-use-too-many-threads-with-producertemplate.html">

http://git-wip-us.apache.org/repos/asf/camel/blob/687adda1/camel-core/src/main/java/org/apache/camel/FluentProducerTemplate.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/FluentProducerTemplate.java b/camel-core/src/main/java/org/apache/camel/FluentProducerTemplate.java
new file mode 100644
index 0000000..c50fbe7
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/FluentProducerTemplate.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+import java.util.concurrent.Future;
+import java.util.function.*;
+
+public interface FluentProducerTemplate extends Service {
+
+    /**
+     * Get the {@link CamelContext}
+     *
+     * @return camelContext the Camel context
+     */
+    CamelContext getCamelContext();
+
+    // Configuration methods
+    // -----------------------------------------------------------------------
+
+    /**
+     * Gets the maximum cache size used in the backing cache pools.
+     *
+     * @return the maximum cache size
+     */
+    int getMaximumCacheSize();
+
+    /**
+     * Sets a custom maximum cache size to use in the backing cache pools.
+     *
+     * @param maximumCacheSize the custom maximum cache size
+     */
+    void setMaximumCacheSize(int maximumCacheSize);
+
+    /**
+     * Gets an approximated size of the current cached resources in the backing cache pools.
+     *
+     * @return the size of current cached resources
+     */
+    int getCurrentCacheSize();
+
+    /**
+     * Get the default endpoint to use if none is specified
+     *
+     * @return the default endpoint instance
+     */
+    Endpoint getDefaultEndpoint();
+
+    /**
+     * Sets the default endpoint to use if none is specified
+     *
+     * @param defaultEndpoint the default endpoint instance
+     */
+    void setDefaultEndpoint(Endpoint defaultEndpoint);
+
+    /**
+     * Sets the default endpoint uri to use if none is specified
+     *
+     *  @param endpointUri the default endpoint uri
+     */
+    void setDefaultEndpointUri(String endpointUri);
+
+    /**
+     * Sets whether the {@link org.apache.camel.spi.EventNotifier} should be
+     * used by this {@link FluentProducerTemplate} to send events about the {@link Exchange}
+     * being sent.
+     * <p/>
+     * By default this is enabled.
+     *
+     * @param enabled <tt>true</tt> to enable, <tt>false</tt> to disable.
+     */
+    void setEventNotifierEnabled(boolean enabled);
+
+    /**
+     * Whether the {@link org.apache.camel.spi.EventNotifier} should be
+     * used by this {@link FluentProducerTemplate} to send events about the {@link Exchange}
+     * being sent.
+     *
+     * @return <tt>true</tt> if enabled, <tt>false</tt> otherwise
+     */
+    boolean isEventNotifierEnabled();
+
+    /**
+     * Cleanup the cache (purging stale entries)
+     */
+    void cleanUp();
+
+    // Fluent methods
+    // -----------------------------------------------------------------------
+
+    /**
+     * Set the header
+     *
+     * @param key the key of the header
+     * @param value the value of the header
+     */
+    FluentProducerTemplate withHeader(String key, Object value);
+
+    /**
+     * Remove the headers.
+     */
+    FluentProducerTemplate clearHeaders();
+
+    /**
+     * Set the message body
+     *
+     * @param body the body
+     */
+    FluentProducerTemplate withBody(Object body);
+
+    /**
+     * Set the message body after converting it to the given type
+     *
+     * @param body the body
+     * @param type the type which the body should be converted to
+     */
+    FluentProducerTemplate withBodyAs(Object body, Class<?> type);
+
+    /**
+     * Remove the body.
+     */
+    FluentProducerTemplate clearBody();
+
+    /**
+     * To customize the producer template for advanced usage like to set the
+     * executor service to use.
+     *
+     * <pre>
+     * {@code
+     * DefaultFluentProducerTemplate.on(context)
+     *     .withTemplateCustomizer(
+     *         template -> {
+     *             template.setExecutorService(myExecutor);
+     *             template.setMaximumCacheSize(10);
+     *         }
+     *      )
+     *     .withBody("the body")
+     *     .to("direct:start")
+     *     .request()
+     * }</pre>
+     *
+     * Note that it is invoked only once.
+     *
+     * @param templateCustomizer the customizer
+     */
+    FluentProducerTemplate withTemplateCustomizer(java.util.function.Consumer<ProducerTemplate> templateCustomizer);
+
+    /**
+     * Set the exchange to use for send.
+     *
+     * @param exchange the exchange
+     */
+    FluentProducerTemplate withExchange(Exchange exchange);
+
+    /**
+     * Set the exchangeSupplier which will be invoked to get the exchange to be
+     * used for send.
+     *
+     * @param exchangeSupplier the supplier
+     */
+    FluentProducerTemplate withExchange(Supplier<Exchange> exchangeSupplier);
+
+    /**
+     * Set the processor to use for send/request.
+     *
+     * <pre>
+     * {@code
+     * DefaultFluentProducerTemplate.on(context)
+     *     .withProcessor(
+     *         exchange -> {
+     *             exchange.getIn().setHeader("Key1", "Val1")
+     *             exchange.getIn().setHeader("Key2", "Val2")
+     *             exchange.getIn().setBody("the body")
+     *         }
+     *      )
+     *     .to("direct:start")
+     *     .request()
+     * }</pre>
+     *
+     * @param processor the processor used to populate the exchange
+     * @return the template, to allow further fluent calls
+     */
+    FluentProducerTemplate withProcessor(Processor processor);
+
+    /**
+     * Set the processorSupplier which will be invoked to get the processor to be
+     * used for send/request.
+     *
+     * @param processorSupplier the supplier
+     */
+    FluentProducerTemplate withProcessor(Supplier<Processor> processorSupplier);
+
+    /**
+     * Set the endpoint to send to, given as an endpoint URI
+     *
+     * @param endpointUri the endpoint URI to send to
+     */
+    FluentProducerTemplate to(String endpointUri);
+
+    /**
+     * Set the endpoint to send to
+     *
+     * @param endpoint the endpoint to send to
+     */
+    FluentProducerTemplate to(Endpoint endpoint);
+
+    /**
+     * Send to an endpoint returning any result output body.
+     *
+     * @return the result
+     * @throws CamelExecutionException is thrown if error occurred
+     */
+    Object request() throws CamelExecutionException;
+
+    /**
+     * Send to an endpoint.
+     *
+     * @param type the expected response type
+     * @return the result
+     * @throws CamelExecutionException is thrown if error occurred
+     */
+    @SuppressWarnings("unchecked")
+    <T> T request(Class<T> type) throws CamelExecutionException;
+
+    /**
+     * Sends asynchronously to the given endpoint.
+     *
+     * @return a handle to be used to get the response in the future
+     */
+    Future<Object> asyncRequest();
+
+    /**
+     * Sends asynchronously to the given endpoint.
+     *
+     * @param type the expected response type
+     * @return a handle to be used to get the response in the future
+     */
+    <T> Future<T> asyncRequest(Class<T> type);
+
+    /**
+     * Send to an endpoint
+     *
+     * @throws CamelExecutionException is thrown if error occurred
+     */
+    Exchange send() throws CamelExecutionException;
+
+    /**
+     * Sends asynchronously to the given endpoint.
+     *
+     * @return a handle to be used to get the response in the future
+     */
+    Future<Exchange> asyncSend();
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/687adda1/camel-core/src/main/java/org/apache/camel/builder/DefaultFluentProducerTemplate.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/builder/DefaultFluentProducerTemplate.java b/camel-core/src/main/java/org/apache/camel/builder/DefaultFluentProducerTemplate.java
new file mode 100644
index 0000000..283e991
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/builder/DefaultFluentProducerTemplate.java
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.builder;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.FluentProducerTemplate;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.processor.ConvertBodyProcessor;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
+
+public class DefaultFluentProducerTemplate extends ServiceSupport implements FluentProducerTemplate {
+    private final CamelContext context;
+    private final ClassValue<ConvertBodyProcessor> resultProcessors;
+    private Map<String, Object> headers;
+    private Object body;
+    private Endpoint endpoint;
+    private Consumer<ProducerTemplate> templateCustomizer;
+    private Supplier<Exchange> exchangeSupplier;
+    private Supplier<Processor> processorSupplier;
+    private volatile ProducerTemplate template;
+    private Endpoint defaultEndpoint;
+    private int maximumCacheSize;
+    private boolean eventNotifierEnabled = true;
+
+    public DefaultFluentProducerTemplate(CamelContext context) {
+        this.context = context;
+        this.headers = null;
+        this.body = null;
+        this.endpoint = null;
+        this.templateCustomizer = null;
+        this.exchangeSupplier = null;
+        this.processorSupplier = () -> this::populateExchange;
+        this.template = null;
+        this.resultProcessors = new ClassValue<ConvertBodyProcessor>() {
+            @Override
+            protected ConvertBodyProcessor computeValue(Class<?> type) {
+                return new ConvertBodyProcessor(type);
+            }
+        };
+    }
+
+    @Override
+    public CamelContext getCamelContext() {
+        return context;
+    }
+
+    @Override
+    public int getCurrentCacheSize() {
+        if (template == null) {
+            return 0;
+        }
+        return template.getCurrentCacheSize();
+    }
+
+    @Override
+    public void cleanUp() {
+        if (template != null) {
+            template.cleanUp();
+        }
+    }
+
+    @Override
+    public void setDefaultEndpointUri(String endpointUri) {
+        setDefaultEndpoint(getCamelContext().getEndpoint(endpointUri));
+    }
+
+    @Override
+    public Endpoint getDefaultEndpoint() {
+        return defaultEndpoint;
+    }
+
+    @Override
+    public void setDefaultEndpoint(Endpoint defaultEndpoint) {
+        this.defaultEndpoint = defaultEndpoint;
+    }
+
+    @Override
+    public int getMaximumCacheSize() {
+        return maximumCacheSize;
+    }
+
+    @Override
+    public void setMaximumCacheSize(int maximumCacheSize) {
+        this.maximumCacheSize = maximumCacheSize;
+    }
+
+    @Override
+    public boolean isEventNotifierEnabled() {
+        return eventNotifierEnabled;
+    }
+
+    @Override
+    public void setEventNotifierEnabled(boolean eventNotifierEnabled) {
+        this.eventNotifierEnabled = eventNotifierEnabled;
+    }
+
+    @Override
+    public FluentProducerTemplate withHeader(String key, Object value) {
+        if (headers == null) {
+            headers = new HashMap<>();
+        }
+
+        headers.put(key, value);
+
+        return this;
+    }
+
+    @Override
+    public FluentProducerTemplate clearHeaders() {
+        if (headers != null) {
+            headers.clear();
+        }
+
+        return this;
+    }
+
+    @Override
+    public FluentProducerTemplate withBody(Object body) {
+        this.body = body;
+
+        return this;
+    }
+
+    @Override
+    public FluentProducerTemplate withBodyAs(Object body, Class<?> type) {
+        this.body = type != null
+            ? context.getTypeConverter().convertTo(type, body)
+            : body;
+
+        return this;
+    }
+
+    @Override
+    public FluentProducerTemplate clearBody() {
+        this.body = null;
+
+        return this;
+    }
+
+    @Override
+    public FluentProducerTemplate withTemplateCustomizer(final Consumer<ProducerTemplate> templateCustomizer) {
+        this.templateCustomizer = templateCustomizer;
+        return this;
+    }
+
+    @Override
+    public FluentProducerTemplate withExchange(final Exchange exchange) {
+        return withExchange(() -> exchange);
+    }
+
+    @Override
+    public FluentProducerTemplate withExchange(final Supplier<Exchange> exchangeSupplier) {
+        this.exchangeSupplier = exchangeSupplier;
+        return this;
+    }
+
+    @Override
+    public FluentProducerTemplate withProcessor(final Processor processor) {
+        return withProcessor(() -> processor);
+    }
+
+    @Override
+    public FluentProducerTemplate withProcessor(final Supplier<Processor> processorSupplier) {
+        this.processorSupplier = processorSupplier;
+        return this;
+    }
+
+    @Override
+    public FluentProducerTemplate to(String endpointUri) {
+        return to(context.getEndpoint(endpointUri));
+    }
+
+    @Override
+    public FluentProducerTemplate to(Endpoint endpoint) {
+        this.endpoint = endpoint;
+        return this;
+    }
+
+    // ************************
+    // REQUEST
+    // ************************
+
+    @Override
+    public Object request() throws CamelExecutionException {
+        return request(Object.class);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T> T request(Class<T> type) throws CamelExecutionException {
+        T result;
+        Endpoint target = endpoint != null ? endpoint : defaultEndpoint;
+
+        if (type == Exchange.class) {
+            result = (T)template().request(target, processorSupplier.get());
+        } else if (type == Message.class) {
+            Exchange exchange = template().request(target, processorSupplier.get());
+            result = exchange.hasOut() ? (T)exchange.getOut() : (T)exchange.getIn();
+        } else {
+            Exchange exchange = template().send(
+                target,
+                ExchangePattern.InOut,
+                processorSupplier.get(),
+                resultProcessors.get(type)
+            );
+
+            result = context.getTypeConverter().convertTo(
+                type,
+                ExchangeHelper.extractResultBody(exchange, exchange.getPattern())
+            );
+        }
+
+        return result;
+    }
+
+    @Override
+    public Future<Object> asyncRequest() {
+        return asyncRequest(Object.class);
+    }
+
+    @Override
+    public <T> Future<T> asyncRequest(Class<T> type) {
+        Endpoint target = endpoint != null ? endpoint : defaultEndpoint;
+        Future<T> result;
+        if (headers != null) {
+            result = template().asyncRequestBodyAndHeaders(target, body, headers, type);
+        } else {
+            result = template().asyncRequestBody(target, body, type);
+        }
+
+        return result;
+    }
+
+    // ************************
+    // SEND
+    // ************************
+
+    @Override
+    public Exchange send() throws CamelExecutionException {
+        Endpoint target = endpoint != null ? endpoint : defaultEndpoint;
+        return exchangeSupplier != null
+            ? template().send(target, exchangeSupplier.get())
+            : template().send(target, processorSupplier.get());
+    }
+
+    @Override
+    public Future<Exchange> asyncSend() {
+        Endpoint target = endpoint != null ? endpoint : defaultEndpoint;
+        return exchangeSupplier != null
+            ? template().asyncSend(target, exchangeSupplier.get())
+            : template().asyncSend(target, processorSupplier.get());
+    }
+
+    // ************************
+    // HELPERS
+    // ************************
+
+    /**
+     * Create the FluentProducerTemplate by setting the camel context
+     *
+     * @param context the camel context
+     */
+    public static FluentProducerTemplate on(CamelContext context) {
+        return new DefaultFluentProducerTemplate(context);
+    }
+
+    private ProducerTemplate template() {
+        ObjectHelper.notNull(context, "CamelContext");
+
+        if (template == null) {
+            template = maximumCacheSize > 0 ? context.createProducerTemplate(maximumCacheSize) : context.createProducerTemplate();
+            if (defaultEndpoint != null) {
+                template.setDefaultEndpoint(defaultEndpoint);
+            }
+            template.setEventNotifierEnabled(eventNotifierEnabled);
+            if (templateCustomizer != null) {
+                templateCustomizer.accept(template);
+            }
+        }
+
+        return template;
+    }
+
+    private void populateExchange(Exchange exchange) throws Exception {
+        if (headers != null && !headers.isEmpty()) {
+            exchange.getIn().getHeaders().putAll(headers);
+        }
+        if (body != null) {
+            exchange.getIn().setBody(body);
+        }
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        if (template == null) {
+            template = template();
+        }
+        ServiceHelper.startService(template);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        ServiceHelper.stopService(template);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/687adda1/camel-core/src/main/java/org/apache/camel/builder/FluentProducerTemplate.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/builder/FluentProducerTemplate.java b/camel-core/src/main/java/org/apache/camel/builder/FluentProducerTemplate.java
deleted file mode 100644
index e582d9f..0000000
--- a/camel-core/src/main/java/org/apache/camel/builder/FluentProducerTemplate.java
+++ /dev/null
@@ -1,361 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.builder;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.Future;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.CamelExecutionException;
-import org.apache.camel.Endpoint;
-import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
-import org.apache.camel.Message;
-import org.apache.camel.Processor;
-import org.apache.camel.ProducerTemplate;
-import org.apache.camel.processor.ConvertBodyProcessor;
-import org.apache.camel.util.ExchangeHelper;
-import org.apache.camel.util.ObjectHelper;
-
-public class FluentProducerTemplate {
-    private final CamelContext context;
-    private final ClassValue<ConvertBodyProcessor> resultProcessors;
-    private Map<String, Object> headers;
-    private Object body;
-    private Endpoint endpoint;
-    private Consumer<ProducerTemplate> templateCustomizer;
-    private Supplier<Exchange> exchangeSupplier;
-    private Supplier<Processor> processorSupplier;
-    private ProducerTemplate template;
-
-    public FluentProducerTemplate(CamelContext context) {
-        this.context = context;
-        this.headers = null;
-        this.body = null;
-        this.endpoint = null;
-        this.templateCustomizer = null;
-        this.exchangeSupplier = null;
-        this.processorSupplier = () -> this::populateExchange;
-        this.template = null;
-        this.resultProcessors = new ClassValue<ConvertBodyProcessor>() {
-            @Override
-            protected ConvertBodyProcessor computeValue(Class<?> type) {
-                return new ConvertBodyProcessor(type);
-            }
-        };
-    }
-
-    /**
-     * Set the header
-     *
-     * @param key the key of the header
-     * @param value the value of the header
-     */
-    public FluentProducerTemplate withHeader(String key, Object value) {
-        if (headers == null) {
-            headers = new HashMap<>();
-        }
-
-        headers.put(key, value);
-
-        return this;
-    }
-
-    /**
-     * Remove the headers.
-     */
-    public FluentProducerTemplate clearHeaders() {
-        if (headers != null) {
-            headers.clear();
-        }
-
-        return this;
-    }
-
-    /**
-     * Set the message body
-     *
-     * @param body the body
-     */
-    public FluentProducerTemplate withBody(Object body) {
-        this.body = body;
-
-        return this;
-    }
-
-    /**
-     * Set the message body after converting it to the given type
-     *
-     * @param body the body
-     * @param type the type which the body should be converted to
-     */
-    public FluentProducerTemplate withBodyAs(Object body, Class<?> type) {
-        this.body = type != null
-            ? context.getTypeConverter().convertTo(type, body)
-            : body;
-
-        return this;
-    }
-
-    /**
-     * Remove the body.
-     */
-    public FluentProducerTemplate clearBody() {
-        this.body = null;
-
-        return this;
-    }
-
-    /**
-     * To customize the producer template for advanced usage like to set the
-     * executor service to use.
-     *
-     * <pre>
-     * {@code
-     * FluentProducerTemplate.on(context)
-     *     .withTemplateCustomizer(
-     *         template -> {
-     *             template.setExecutorService(myExecutor);
-     *             template.setMaximumCacheSize(10);
-     *         }
-     *      )
-     *     .withBody("the body")
-     *     .to("direct:start")
-     *     .request()
-     * </pre>
-     *
-     * Note that it is invoked only once.
-     *
-     * @param templateCustomizer the customizer
-     */
-    public FluentProducerTemplate withTemplateCustomizer(final Consumer<ProducerTemplate> templateCustomizer) {
-        this.templateCustomizer = templateCustomizer;
-        return this;
-    }
-
-    /**
-     * Set the exchange to use for send.
-     *
-     * @param exchange the exchange
-     */
-    public FluentProducerTemplate withExchange(final Exchange exchange) {
-        return withExchange(() -> exchange);
-    }
-
-    /**
-     * Set the exchangeSupplier which will be invoke to get the exchange to be
-     * used for send.
-     *
-     * @param exchangeSupplier the supplier
-     */
-    public FluentProducerTemplate withExchange(final Supplier<Exchange> exchangeSupplier) {
-        this.exchangeSupplier = exchangeSupplier;
-        return this;
-    }
-
-    /**
-     * Set the processor to use for send/request.
-     *
-     * <pre>
-     * {@code
-     * FluentProducerTemplate.on(context)
-     *     .withProcessor(
-     *         exchange -> {
-     *             exchange.getIn().setHeader("Key1", "Val1")
-     *             exchange.getIn().setHeader("Key2", "Val2")
-     *             exchange.getIn().setBody("the body")
-     *         }
-     *      )
-     *     .to("direct:start")
-     *     .request()
-     * </pre>
-     *
-     * @param processor
-     * @return
-     */
-    public FluentProducerTemplate withProcessor(final Processor processor) {
-        return withProcessor(() -> processor);
-    }
-
-    /**
-     * Set the processorSupplier which will be invoke to get the processor to be
-     * used for send/request.
-     *
-     * @param processorSupplier the supplier
-     */
-    public FluentProducerTemplate withProcessor(final Supplier<Processor> processorSupplier) {
-        this.processorSupplier = processorSupplier;
-        return this;
-    }
-
-    /**
-     * Set the message body
-     *
-     * @param endpointUri the endpoint URI to send to
-     */
-    public FluentProducerTemplate to(String endpointUri) {
-        return to(context.getEndpoint(endpointUri));
-    }
-
-    /**
-     * Set the message body
-     *
-     * @param endpoint the endpoint to send to
-     */
-    public FluentProducerTemplate to(Endpoint endpoint) {
-        this.endpoint = endpoint;
-        return this;
-    }
-
-    // ************************
-    // REQUEST
-    // ************************
-
-    /**
-     * Send to an endpoint returning any result output body.
-     *
-     * @return the result
-     * @throws CamelExecutionException is thrown if error occurred
-     */
-    public Object request() throws CamelExecutionException {
-        return request(Object.class);
-    }
-
-    /**
-     * Send to an endpoint.
-     *
-     * @param type the expected response type
-     * @return the result
-     * @throws CamelExecutionException is thrown if error occurred
-     */
-    @SuppressWarnings("unchecked")
-    public <T> T request(Class<T> type) throws CamelExecutionException {
-        T result;
-        if (type == Exchange.class) {
-            result = (T)template().request(endpoint, processorSupplier.get());
-        } else if (type == Message.class) {
-            Exchange exchange = template().request(endpoint, processorSupplier.get());
-            result = exchange.hasOut() ? (T)exchange.getOut() : (T)exchange.getIn();
-        } else {
-            Exchange exchange = template().send(
-                endpoint,
-                ExchangePattern.InOut,
-                processorSupplier.get(),
-                resultProcessors.get(type)
-            );
-
-            result = context.getTypeConverter().convertTo(
-                type,
-                ExchangeHelper.extractResultBody(exchange, exchange.getPattern())
-            );
-        }
-
-        return result;
-    }
-
-    /**
-     * Sends asynchronously to the given endpoint.
-     *
-     * @return a handle to be used to get the response in the future
-     */
-    public Future<Object> asyncRequest() {
-        return asyncRequest(Object.class);
-    }
-
-    /**
-     * Sends asynchronously to the given endpoint.
-     *
-     * @param type the expected response type
-     * @return a handle to be used to get the response in the future
-     */
-    public <T> Future<T> asyncRequest(Class<T> type) {
-        Future<T> result;
-        if (headers != null) {
-            result = template().asyncRequestBodyAndHeaders(endpoint, body, headers, type);
-        } else {
-            result = template().asyncRequestBody(endpoint, body, type);
-        }
-
-        return result;
-    }
-
-    // ************************
-    // SEND
-    // ************************
-
-    /**
-     * Send to an endpoint
-     *
-     * @throws CamelExecutionException is thrown if error occurred
-     */
-    public Exchange send() throws CamelExecutionException {
-        return exchangeSupplier != null
-            ? template().send(endpoint, exchangeSupplier.get())
-            : template().send(endpoint, processorSupplier.get());
-    }
-
-    /**
-     * Sends asynchronously to the given endpoint.
-     *
-     * @return a handle to be used to get the response in the future
-     */
-    public Future<Exchange> asyncSend() {
-        return exchangeSupplier != null
-            ? template().asyncSend(endpoint, exchangeSupplier.get())
-            : template().asyncSend(endpoint, processorSupplier.get());
-    }
-
-    // ************************
-    // HELPERS
-    // ************************
-
-    /**
-     * Create the FluentProducerTemplate by setting the camel context
-     *
-     * @param context the camel context
-     */
-    public static FluentProducerTemplate on(CamelContext context) {
-        return new FluentProducerTemplate(context);
-    }
-
-    private ProducerTemplate template() {
-        ObjectHelper.notNull(context, "camel-context");
-        ObjectHelper.notNull(endpoint, "endpoint");
-
-        if (this.template == null) {
-            template = context.createProducerTemplate();
-            if (templateCustomizer != null) {
-                templateCustomizer.accept(template);
-            }
-        }
-
-        return template;
-    }
-
-    private void populateExchange(Exchange exchange) throws Exception {
-        if (headers != null && !headers.isEmpty()) {
-            exchange.getIn().getHeaders().putAll(headers);
-        }
-        if (body != null) {
-            exchange.getIn().setBody(body);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/687adda1/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
index c325e89..3d0f13b 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
@@ -56,6 +56,7 @@ import org.apache.camel.ConsumerTemplate;
 import org.apache.camel.Endpoint;
 import org.apache.camel.ErrorHandlerFactory;
 import org.apache.camel.FailedToStartRouteException;
+import org.apache.camel.FluentProducerTemplate;
 import org.apache.camel.IsSingleton;
 import org.apache.camel.MultipleConsumersSupport;
 import org.apache.camel.NamedNode;
@@ -82,6 +83,7 @@ import org.apache.camel.VetoCamelContextStartException;
 import org.apache.camel.api.management.mbean.ManagedCamelContextMBean;
 import org.apache.camel.api.management.mbean.ManagedProcessorMBean;
 import org.apache.camel.api.management.mbean.ManagedRouteMBean;
+import org.apache.camel.builder.DefaultFluentProducerTemplate;
 import org.apache.camel.builder.ErrorHandlerBuilder;
 import org.apache.camel.builder.ErrorHandlerBuilderSupport;
 import org.apache.camel.component.properties.PropertiesComponent;
@@ -2705,6 +2707,23 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
         return answer;
     }
 
+    public FluentProducerTemplate createFluentProducerTemplate() {
+        int size = CamelContextHelper.getMaximumCachePoolSize(this);
+        return createFluentProducerTemplate(size);
+    }
+
+    public FluentProducerTemplate createFluentProducerTemplate(int maximumCacheSize) {
+        DefaultFluentProducerTemplate answer = new DefaultFluentProducerTemplate(this);
+        answer.setMaximumCacheSize(maximumCacheSize);
+        // start it so it's ready to use; a startup failure is rethrown via ObjectHelper.wrapRuntimeCamelException
+        try {
+            startService(answer);
+        } catch (Exception e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        }
+        return answer;
+    }
+
     public ConsumerTemplate createConsumerTemplate() {
         int size = CamelContextHelper.getMaximumCachePoolSize(this);
         return createConsumerTemplate(size);

http://git-wip-us.apache.org/repos/asf/camel/blob/687adda1/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java b/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java
index 29d8dc0..412e08d 100644
--- a/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java
+++ b/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java
@@ -20,19 +20,38 @@ import org.apache.camel.CamelExecutionException;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
+import org.apache.camel.FluentProducerTemplate;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.component.mock.MockEndpoint;
 
 /**
- * Unit test for DefaultProducerTemplate
+ * Unit test for FluentProducerTemplate
  */
 public class FluentProducerTemplateTest extends ContextTestSupport {
 
+    public void testFromCamelContext() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Bye World");
+
+        FluentProducerTemplate fluent = context.createFluentProducerTemplate();
+
+        Object result = fluent
+            .withBody("Hello World")
+            .to("direct:in")
+            .request();
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals("Bye World", result);
+
+        assertSame(context, fluent.getCamelContext());
+    }
+
     public void testIn() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedBodiesReceived("Bye World");
 
-        Object result = FluentProducerTemplate.on(context)
+        Object result = DefaultFluentProducerTemplate.on(context)
             .withBody("Hello World")
             .to("direct:in")
             .request();
@@ -48,7 +67,7 @@ public class FluentProducerTemplateTest extends ContextTestSupport {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedBodiesReceived("Bye Bye World");
 
-        Object result = FluentProducerTemplate.on(context)
+        Object result = DefaultFluentProducerTemplate.on(context)
             .withBody("Hello World")
             .to("direct:out")
             .request();
@@ -62,7 +81,7 @@ public class FluentProducerTemplateTest extends ContextTestSupport {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedBodiesReceived(11);
 
-        Object result = FluentProducerTemplate.on(context)
+        Object result = DefaultFluentProducerTemplate.on(context)
             .withBodyAs("10", Integer.class)
             .to("direct:sum")
             .request();
@@ -77,7 +96,7 @@ public class FluentProducerTemplateTest extends ContextTestSupport {
         mock.expectedMessageCount(0);
 
         try {
-            FluentProducerTemplate.on(context)
+            DefaultFluentProducerTemplate.on(context)
                 .withBodyAs("10", Double.class)
                 .to("direct:sum")
                 .request();
@@ -93,7 +112,7 @@ public class FluentProducerTemplateTest extends ContextTestSupport {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMessageCount(0);
 
-        Object result = FluentProducerTemplate.on(context)
+        Object result = DefaultFluentProducerTemplate.on(context)
             .withBody("Hello World")
             .to("direct:fault")
             .request();
@@ -107,7 +126,7 @@ public class FluentProducerTemplateTest extends ContextTestSupport {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMessageCount(0);
 
-        Exchange out = FluentProducerTemplate.on(context)
+        Exchange out = DefaultFluentProducerTemplate.on(context)
             .withBody("Hello World")
             .to("direct:exception")
             .send();
@@ -123,7 +142,7 @@ public class FluentProducerTemplateTest extends ContextTestSupport {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMessageCount(0);
 
-        Exchange out = FluentProducerTemplate.on(context)
+        Exchange out = DefaultFluentProducerTemplate.on(context)
             .withProcessor(exchange -> exchange.getIn().setBody("Hello World"))
             .to("direct:exception")
             .send();
@@ -138,7 +157,7 @@ public class FluentProducerTemplateTest extends ContextTestSupport {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMessageCount(0);
 
-        Exchange out = FluentProducerTemplate.on(context)
+        Exchange out = DefaultFluentProducerTemplate.on(context)
                 .withExchange(() -> {
                     Exchange exchange = context.getEndpoint("direct:exception").createExchange();
                     exchange.getIn().setBody("Hello World");
@@ -158,7 +177,7 @@ public class FluentProducerTemplateTest extends ContextTestSupport {
         mock.expectedMessageCount(0);
 
         try {
-            FluentProducerTemplate.on(context)
+            DefaultFluentProducerTemplate.on(context)
                 .withBody("Hello World")
                 .to("direct:exception")
                 .request();
@@ -176,7 +195,7 @@ public class FluentProducerTemplateTest extends ContextTestSupport {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMessageCount(0);
 
-        Exchange out = FluentProducerTemplate.on(context)
+        Exchange out = DefaultFluentProducerTemplate.on(context)
             .withProcessor(exchange -> exchange.getIn().setBody("Hello World"))
             .to("direct:exception")
             .request(Exchange.class);
@@ -191,7 +210,7 @@ public class FluentProducerTemplateTest extends ContextTestSupport {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMessageCount(0);
 
-        Exchange out = FluentProducerTemplate.on(context)
+        Exchange out = DefaultFluentProducerTemplate.on(context)
             .withExchange(() -> {
                 Exchange exchange = context.getEndpoint("direct:exception").createExchange(ExchangePattern.InOut);
                 exchange.getIn().setBody("Hello World");
@@ -208,7 +227,7 @@ public class FluentProducerTemplateTest extends ContextTestSupport {
 
     public void testRequestBody() throws Exception {
         // with endpoint as string uri
-        FluentProducerTemplate template = FluentProducerTemplate.on(context);
+        FluentProducerTemplate template = DefaultFluentProducerTemplate.on(context);
 
         final Integer expectedResult = new Integer(123);
 

http://git-wip-us.apache.org/repos/asf/camel/blob/687adda1/components/camel-jsonpath/src/test/java/org/apache/camel/jsonpath/JsonPathWithSimpleCBRTest.java
----------------------------------------------------------------------
diff --git a/components/camel-jsonpath/src/test/java/org/apache/camel/jsonpath/JsonPathWithSimpleCBRTest.java b/components/camel-jsonpath/src/test/java/org/apache/camel/jsonpath/JsonPathWithSimpleCBRTest.java
index eb9bbf9..f1b1029 100644
--- a/components/camel-jsonpath/src/test/java/org/apache/camel/jsonpath/JsonPathWithSimpleCBRTest.java
+++ b/components/camel-jsonpath/src/test/java/org/apache/camel/jsonpath/JsonPathWithSimpleCBRTest.java
@@ -18,7 +18,6 @@ package org.apache.camel.jsonpath;
 
 import java.io.File;
 
-import org.apache.camel.builder.FluentProducerTemplate;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.junit.Test;
@@ -48,8 +47,7 @@ public class JsonPathWithSimpleCBRTest extends CamelTestSupport {
         getMockEndpoint("mock:average").expectedMessageCount(0);
         getMockEndpoint("mock:expensive").expectedMessageCount(0);
 
-        FluentProducerTemplate fluent = new FluentProducerTemplate(context);
-        fluent.withHeader("cheap", 10).withHeader("average", 30).withBody(new File("src/test/resources/cheap.json"))
+        fluentTemplate.withHeader("cheap", 10).withHeader("average", 30).withBody(new File("src/test/resources/cheap.json"))
                 .to("direct:start").send();
 
         assertMockEndpointsSatisfied();
@@ -61,8 +59,7 @@ public class JsonPathWithSimpleCBRTest extends CamelTestSupport {
         getMockEndpoint("mock:average").expectedMessageCount(1);
         getMockEndpoint("mock:expensive").expectedMessageCount(0);
 
-        FluentProducerTemplate fluent = new FluentProducerTemplate(context);
-        fluent.withHeader("cheap", 10).withHeader("average", 30).withBody(new File("src/test/resources/average.json"))
+        fluentTemplate.withHeader("cheap", 10).withHeader("average", 30).withBody(new File("src/test/resources/average.json"))
                 .to("direct:start").send();
 
         assertMockEndpointsSatisfied();
@@ -74,8 +71,7 @@ public class JsonPathWithSimpleCBRTest extends CamelTestSupport {
         getMockEndpoint("mock:average").expectedMessageCount(0);
         getMockEndpoint("mock:expensive").expectedMessageCount(1);
 
-        FluentProducerTemplate fluent = new FluentProducerTemplate(context);
-        fluent.withHeader("cheap", 10).withHeader("average", 30).withBody(new File("src/test/resources/expensive.json"))
+        fluentTemplate.withHeader("cheap", 10).withHeader("average", 30).withBody(new File("src/test/resources/expensive.json"))
                 .to("direct:start").send();
 
         assertMockEndpointsSatisfied();

http://git-wip-us.apache.org/repos/asf/camel/blob/687adda1/components/camel-test/src/main/java/org/apache/camel/test/junit4/CamelTestSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-test/src/main/java/org/apache/camel/test/junit4/CamelTestSupport.java b/components/camel-test/src/main/java/org/apache/camel/test/junit4/CamelTestSupport.java
index 03fd1fe..34a18ed 100644
--- a/components/camel-test/src/main/java/org/apache/camel/test/junit4/CamelTestSupport.java
+++ b/components/camel-test/src/main/java/org/apache/camel/test/junit4/CamelTestSupport.java
@@ -44,7 +44,7 @@ import org.apache.camel.Service;
 import org.apache.camel.ServiceStatus;
 import org.apache.camel.api.management.mbean.ManagedCamelContextMBean;
 import org.apache.camel.builder.AdviceWithRouteBuilder;
-import org.apache.camel.builder.FluentProducerTemplate;
+import org.apache.camel.FluentProducerTemplate;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.component.properties.PropertiesComponent;
@@ -79,6 +79,7 @@ public abstract class CamelTestSupport extends TestSupport {
     private static final ThreadLocal<Boolean> INIT = new ThreadLocal<Boolean>();
     private static ThreadLocal<ModelCamelContext> threadCamelContext = new ThreadLocal<ModelCamelContext>();
     private static ThreadLocal<ProducerTemplate> threadTemplate = new ThreadLocal<ProducerTemplate>();
+    private static ThreadLocal<FluentProducerTemplate> threadFluentTemplate = new ThreadLocal<FluentProducerTemplate>();
     private static ThreadLocal<ConsumerTemplate> threadConsumer = new ThreadLocal<ConsumerTemplate>();
     private static ThreadLocal<Service> threadService = new ThreadLocal<Service>();
     protected volatile ModelCamelContext context;
@@ -296,11 +297,13 @@ public abstract class CamelTestSupport extends TestSupport {
 
         template = context.createProducerTemplate();
         template.start();
+        fluentTemplate = context.createFluentProducerTemplate();
+        fluentTemplate.start();
         consumer = context.createConsumerTemplate();
         consumer.start();
-        fluentTemplate = FluentProducerTemplate.on(context());
 
         threadTemplate.set(template);
+        threadFluentTemplate.set(fluentTemplate);
         threadConsumer.set(consumer);
 
         // enable auto mocking if enabled
@@ -404,7 +407,7 @@ public abstract class CamelTestSupport extends TestSupport {
         }
 
         LOG.debug("tearDown test");
-        doStopTemplates(consumer, template);
+        doStopTemplates(consumer, template, fluentTemplate);
         doStopCamelContext(context, camelContextService);
     }
 
@@ -412,7 +415,7 @@ public abstract class CamelTestSupport extends TestSupport {
     public static void tearDownAfterClass() throws Exception {
         INIT.remove();
         LOG.debug("tearDownAfterClass test");
-        doStopTemplates(threadConsumer.get(), threadTemplate.get());
+        doStopTemplates(threadConsumer.get(), threadTemplate.get(), threadFluentTemplate.get());
         doStopCamelContext(threadCamelContext.get(), threadService.get());
     }
 
@@ -489,6 +492,7 @@ public abstract class CamelTestSupport extends TestSupport {
     protected void postProcessTest() throws Exception {
         context = threadCamelContext.get();
         template = threadTemplate.get();
+        fluentTemplate = threadFluentTemplate.get();
         consumer = threadConsumer.get();
         camelContextService = threadService.get();
         applyCamelPostProcessor();
@@ -527,7 +531,7 @@ public abstract class CamelTestSupport extends TestSupport {
         }
     }
 
-    private static void doStopTemplates(ConsumerTemplate consumer, ProducerTemplate template) throws Exception {
+    private static void doStopTemplates(ConsumerTemplate consumer, ProducerTemplate template, FluentProducerTemplate fluentTemplate) throws Exception {
         if (consumer != null) {
             if (consumer == threadConsumer.get()) {
                 threadConsumer.remove();
@@ -540,6 +544,12 @@ public abstract class CamelTestSupport extends TestSupport {
             }
             template.stop();
         }
+        if (fluentTemplate != null) {
+            if (fluentTemplate == threadFluentTemplate.get()) {
+                threadFluentTemplate.remove();
+            }
+            fluentTemplate.stop();
+        }
     }
 
     protected void startCamelContext() throws Exception {

http://git-wip-us.apache.org/repos/asf/camel/blob/687adda1/components/camel-test/src/test/java/org/apache/camel/test/patterns/FilterFluentTemplateTest.java
----------------------------------------------------------------------
diff --git a/components/camel-test/src/test/java/org/apache/camel/test/patterns/FilterFluentTemplateTest.java b/components/camel-test/src/test/java/org/apache/camel/test/patterns/FilterFluentTemplateTest.java
new file mode 100644
index 0000000..26ee527
--- /dev/null
+++ b/components/camel-test/src/test/java/org/apache/camel/test/patterns/FilterFluentTemplateTest.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.test.patterns;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Tests filtering using the {@code fluentTemplate} provided by CamelTestSupport
+ * 
+ * @version 
+ */
+// START SNIPPET: example
+// tag::example[]
+public class FilterFluentTemplateTest extends CamelTestSupport {
+
+    @EndpointInject(uri = "mock:result")
+    protected MockEndpoint resultEndpoint;
+
+    @Override
+    public boolean isDumpRouteCoverage() {
+        return true;
+    }
+
+    @Test
+    public void testSendMatchingMessage() throws Exception {
+        String expectedBody = "<matched/>";
+
+        resultEndpoint.expectedBodiesReceived(expectedBody);
+
+        fluentTemplate.withBody(expectedBody).withHeader("foo", "bar").to("direct:start").send();
+
+        resultEndpoint.assertIsSatisfied();
+    }
+
+    @Test
+    public void testSendNotMatchingMessage() throws Exception {
+        resultEndpoint.expectedMessageCount(0);
+        // header "foo" is not "bar", so the route's filter must not let the message reach mock:result
+        fluentTemplate.withBody("<notMatched/>").withHeader("foo", "notMatchedHeaderValue").to("direct:start").send();
+
+        resultEndpoint.assertIsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:start").filter(header("foo").isEqualTo("bar")).to("mock:result");
+            }
+        };
+    }
+}
+// end::example[]
+// END SNIPPET: example


Mime
View raw message