camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lburgazz...@apache.org
Subject camel git commit: CAMEL-9602 - ProducerTemplateBuilder
Date Wed, 27 Apr 2016 08:15:36 GMT
Repository: camel
Updated Branches:
  refs/heads/master c66552d99 -> 473935b53


CAMEL-9602 - ProducerTemplateBuilder


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/473935b5
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/473935b5
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/473935b5

Branch: refs/heads/master
Commit: 473935b535c972b03562d2e4656bb294b5ce0a14
Parents: c66552d
Author: lburgazzoli <lburgazzoli@gmail.com>
Authored: Tue Mar 1 14:00:59 2016 +0100
Committer: lburgazzoli <lburgazzoli@gmail.com>
Committed: Wed Apr 27 10:14:58 2016 +0200

----------------------------------------------------------------------
 .../camel/builder/FluentProducerTemplate.java   | 351 +++++++++++++++++++
 .../builder/FluentProducerTemplateTest.java     | 291 +++++++++++++++
 2 files changed, 642 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/473935b5/camel-core/src/main/java/org/apache/camel/builder/FluentProducerTemplate.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/builder/FluentProducerTemplate.java
b/camel-core/src/main/java/org/apache/camel/builder/FluentProducerTemplate.java
new file mode 100644
index 0000000..bb8ae98
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/builder/FluentProducerTemplate.java
@@ -0,0 +1,351 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.builder;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.ObjectHelper;
+
+public class FluentProducerTemplate {
+    private final CamelContext context;
+    private Map<String, Object> headers;
+    private Object body;
+    private Endpoint endpoint;
+    private Consumer<ProducerTemplate> templateCustomizer;
+    private Supplier<Exchange> exchangeSupplier;
+    private Supplier<Processor> processorSupplier;
+    private ProducerTemplate template;
+
+    public FluentProducerTemplate(CamelContext context) {
+        this.context = context;
+        this.headers = null;
+        this.body = null;
+        this.endpoint = null;
+        this.templateCustomizer = null;
+        this.exchangeSupplier = null;
+        this.processorSupplier = () -> this::populateExchange;
+        this.template = null;
+    }
+
+    /**
+     * Set the header
+     *
+     * @param key the key of the header
+     * @param value the value of the header
+     * @return ProducerTemplate builder
+     */
+    public FluentProducerTemplate withHeader(String key, Object value) {
+        if (headers == null) {
+            headers = new HashMap<>();
+        }
+
+        headers.put(key, value);
+
+        return this;
+    }
+
+    /**
+     * Remove the headers.
+     *
+     * @return ProducerTemplate builder
+     */
+    public FluentProducerTemplate clearHeaders() {
+        if (headers != null) {
+            headers.clear();
+        }
+
+        return this;
+    }
+
+    /**
+     * Set the message body
+     *
+     * @param body the body
+     * @return ProducerTemplate builder
+     */
+    public FluentProducerTemplate withBody(Object body) {
+        this.body = body;
+
+        return this;
+    }
+
+    /**
+     * Remove the body.
+     *
+     * @return ProducerTemplate builder
+     */
+    public FluentProducerTemplate clearBody() {
+        this.body = null;
+
+        return this;
+    }
+
+    /**
+     * To customize the producer template for advanced usage like to set the
+     * executor service to use.
+     *
+     * <pre>
+     * {@code
+     * FluentProducerTemplate.on(context)
+     *     .withTemplateCustomizer(
+     *         template -> {
+     *             template.setExecutorService(myExecutor);
+     *             template.setMaximumCacheSize(10);
+     *         }
+     *      )
+     *     .withBody("the body")
+     *     .to("direct:start")
+     *     .request()
+     * </pre>
+     *
+     * Note that it is invoked only once.
+     *
+     * @param templateCustomizer
+     * @return
+     */
+    public FluentProducerTemplate withTemplateCustomizer(final Consumer<ProducerTemplate>
templateCustomizer) {
+        this.templateCustomizer = templateCustomizer;
+        return this;
+    }
+
+    /**
+     * Set the exchange to use for send.
+     *
+     * @param exchange
+     * @return
+     */
+    public FluentProducerTemplate withExchange(final Exchange exchange) {
+        return withExchange(() -> exchange);
+    }
+
+    /**
+     * Set the exchangeSupplier which will be invoke to get the exchange to be
+     * used for send.
+     *
+     * @param exchangeSupplier
+     * @return
+     */
+    public FluentProducerTemplate withExchange(final Supplier<Exchange> exchangeSupplier)
{
+        this.exchangeSupplier = exchangeSupplier;
+        return this;
+    }
+
+    /**
+     * Set the processor to use for send/request.
+     *
+     * <pre>
+     * {@code
+     * FluentProducerTemplate.on(context)
+     *     .withProcessor(
+     *         exchange -> {
+     *             exchange.getIn().setHeader("Key1", "Val1")
+     *             exchange.getIn().setHeader("Key2", "Val2")
+     *             exchange.getIn().setBody("the body")
+     *         }
+     *      )
+     *     .to("direct:start")
+     *     .request()
+     * </pre>
+     *
+     * @param processor
+     * @return
+     */
+    public FluentProducerTemplate withProcessor(final Processor processor) {
+        return withProcessor(() -> processor);
+    }
+
+    /**
+     * Set the processorSupplier which will be invoke to get the processor to be
+     * used for send/request.
+     *
+     * @param processorSupplier
+     * @return
+     */
+    public FluentProducerTemplate withProcessor(final Supplier<Processor> processorSupplier)
{
+        this.processorSupplier = processorSupplier;
+        return this;
+    }
+
+    /**
+     * Set the message body
+     *
+     * @param endpointUri the endpoint URI to send to
+     * @return ProducerTemplate builder
+     */
+    public FluentProducerTemplate to(String endpointUri) {
+        return to(context.getEndpoint(endpointUri));
+    }
+
+    /**
+     * Set the message body
+     *
+     * @param endpoint the endpoint to send to
+     * @return ProducerTemplate builder
+     */
+    public FluentProducerTemplate to(Endpoint endpoint) {
+        this.endpoint = endpoint;
+        return this;
+    }
+
+    // ************************
+    // REQUEST
+    // ************************
+
+    /**
+     * Send to an endpoint returning any result output body.
+     *
+     * @return the result
+     * @throws CamelExecutionException
+     */
+    public Object request() throws CamelExecutionException {
+        return request(Object.class);
+    }
+
+    /**
+     * Send to an endpoint.
+     *
+     * @param type the expected response type
+     * @return the result
+     * @throws CamelExecutionException
+     */
+    @SuppressWarnings("unchecked")
+    public <T> T request(Class<T> type) throws CamelExecutionException {
+        T result;
+        if (type == Exchange.class) {
+            result = (T)template().request(endpoint, processorSupplier.get());
+        } else if (type == Message.class) {
+            Exchange exchange = template().request(endpoint, processorSupplier.get());
+            result = exchange.hasOut() ? (T)exchange.getOut() : (T)exchange.getIn();
+        } else {
+            Exchange exchange = template().send(endpoint, ExchangePattern.InOut, processorSupplier.get());
+            result = context.getTypeConverter().convertTo(
+                type,
+                ExchangeHelper.extractResultBody(exchange, exchange.getPattern())
+            );
+        }
+
+        return result;
+    }
+
+    /**
+     * Sends asynchronously to the given endpoint.
+     *
+     * @return a handle to be used to get the response in the future
+     */
+    public Future<Object> asyncRequest() {
+        return asyncRequest(Object.class);
+    }
+
+    /**
+     * Sends asynchronously to the given endpoint.
+     *
+     * @param type the expected response type
+     * @return a handle to be used to get the response in the future
+     */
+    public <T> Future<T> asyncRequest(Class<T> type) {
+        Future<T> result;
+        if (headers != null) {
+            result = template().asyncRequestBodyAndHeaders(endpoint, body, headers, type);
+        } else {
+            result = template().asyncRequestBody(endpoint, body, type);
+        }
+
+        return result;
+    }
+
+    // ************************
+    // SEND
+    // ************************
+
+    /**
+     * Send to an endpoint
+     *
+     * @throws CamelExecutionException
+     */
+    public Exchange send() throws CamelExecutionException {
+        Exchange result =  exchangeSupplier != null
+            ? template().send(endpoint, exchangeSupplier.get())
+            : template().send(endpoint, processorSupplier.get());
+
+        // TODO: validate
+        // must invoke extract result body in case of exception to be rethrown
+        //ExchangeHelper.extractResultBody(result, null);
+
+        return result;
+    }
+
+    /**
+     * Sends asynchronously to the given endpoint.
+     *
+     * @return a handle to be used to get the response in the future
+     */
+    public Future<Exchange> asyncSend() {
+        return exchangeSupplier != null
+            ? template().asyncSend(endpoint, exchangeSupplier.get())
+            : template().asyncSend(endpoint, processorSupplier.get());
+    }
+
+    // ************************
+    // HELPERS
+    // ************************
+
+    /**
+     * Create the FluentProducerTemplate by setting the camel context
+     *
+     * @param context the camel context
+     * @return ProducerTemplate builder
+     */
+    public static FluentProducerTemplate on(CamelContext context) {
+        return new FluentProducerTemplate(context);
+    }
+
+    private ProducerTemplate template() {
+        ObjectHelper.notNull(context, "camel-context");
+        ObjectHelper.notNull(endpoint, "endpoint");
+
+        if (this.template == null) {
+            template = context.createProducerTemplate();
+            if (templateCustomizer != null) {
+                templateCustomizer.accept(template);
+            }
+        }
+
+        return template;
+    }
+
+    private void populateExchange(Exchange exchange) throws Exception {
+        if (headers != null && !headers.isEmpty()) {
+            exchange.getIn().getHeaders().putAll(headers);
+        }
+        if (body != null) {
+            exchange.getIn().setBody(body);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/473935b5/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java
b/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java
new file mode 100644
index 0000000..27504f3
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/builder/FluentProducerTemplateTest.java
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.builder;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * Unit test for DefaultProducerTemplate
+ */
+public class FluentProducerTemplateTest extends ContextTestSupport {
+
+    public void testIn() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Bye World");
+
+        Object result = FluentProducerTemplate.on(context)
+            .withBody("Hello World")
+            .to("direct:in")
+            .request();
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals("Bye World", result);
+
+        assertSame(context, template.getCamelContext());
+    }
+
+    public void testInOut() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Bye Bye World");
+
+        Object result = FluentProducerTemplate.on(context)
+            .withBody("Hello World")
+            .to("direct:out")
+            .request();
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals("Bye Bye World", result);
+    }
+
+    public void testFault() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(0);
+
+        Object result = FluentProducerTemplate.on(context)
+            .withBody("Hello World")
+            .to("direct:fault")
+            .request();
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals("Faulty World", result);
+    }
+
+    // TODO: to review
+    public void testExceptionUsingBody() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(0);
+
+        Exchange out =  FluentProducerTemplate.on(context)
+            .withBody("Hello World")
+            .to("direct:exception")
+            .send();
+
+        assertTrue(out.isFailed());
+        assertTrue(out.getException() instanceof IllegalArgumentException);
+        assertEquals("Forced exception by unit test", out.getException().getMessage());
+
+        /*
+        try {
+            Exchange out =  FluentProducerTemplate.on(context)
+                .withBody("Hello World")
+                .to("direct:exception")
+                .send();
+
+            assertTrue(out.isFailed());
+            fail("Should have thrown RuntimeCamelException");
+        } catch (RuntimeCamelException e) {
+            assertTrue(e.getCause() instanceof IllegalArgumentException);
+            assertEquals("Forced exception by unit test", e.getCause().getMessage());
+        }
+        */
+
+        assertMockEndpointsSatisfied();
+    }
+
+    // TODO: to review
+    public void testExceptionUsingProcessor() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(0);
+
+        Exchange out = FluentProducerTemplate.on(context)
+            .withProcessor(exchange -> exchange.getIn().setBody("Hello World"))
+            .to("direct:exception")
+            .send();
+
+        assertTrue(out.isFailed());
+        assertEquals("Forced exception by unit test", out.getException().getMessage());
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testExceptionUsingExchange() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(0);
+
+        Exchange out = FluentProducerTemplate.on(context)
+            .withExchange(() -> {
+                Exchange exchange = context.getEndpoint("direct:exception").createExchange();
+                exchange.getIn().setBody("Hello World");
+
+                return exchange;
+            })
+            .to("direct:exception")
+            .send();
+
+        assertTrue(out.isFailed());
+        assertEquals("Forced exception by unit test", out.getException().getMessage());
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testRequestExceptionUsingBody() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(0);
+
+        try {
+            FluentProducerTemplate.on(context)
+                .withBody("Hello World")
+                .to("direct:exception")
+                .request();
+
+            fail("Should have thrown RuntimeCamelException");
+        } catch (RuntimeCamelException e) {
+            assertTrue(e.getCause() instanceof IllegalArgumentException);
+            assertEquals("Forced exception by unit test", e.getCause().getMessage());
+        }
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testRequestExceptionUsingProcessor() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(0);
+
+        Exchange out = FluentProducerTemplate.on(context)
+            .withProcessor(exchange -> exchange.getIn().setBody("Hello World"))
+            .to("direct:exception")
+            .request(Exchange.class);
+
+        assertTrue(out.isFailed());
+        assertEquals("Forced exception by unit test", out.getException().getMessage());
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testRequestExceptionUsingExchange() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(0);
+
+        Exchange out = FluentProducerTemplate.on(context)
+            .withExchange(() -> {
+                Exchange exchange = context.getEndpoint("direct:exception").createExchange(ExchangePattern.InOut);
+                exchange.getIn().setBody("Hello World");
+
+                return exchange;
+            })
+            .to("direct:exception")
+            .send();
+
+        assertTrue(out.isFailed());
+        assertEquals("Forced exception by unit test", out.getException().getMessage());
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testRequestBody() throws Exception {
+        // with endpoint as string uri
+        FluentProducerTemplate template = FluentProducerTemplate.on(context);
+
+        final Integer expectedResult = new Integer(123);
+
+        assertEquals(
+            expectedResult,
+            template.clearBody()
+                .clearHeaders()
+                .withBody("Hello")
+                .to("direct:inout")
+                .request(Integer.class)
+        );
+
+        assertEquals(
+            expectedResult,
+            template.clearBody()
+                .clearHeaders()
+                .withHeader("foo", "bar")
+                .withBody("Hello")
+                .to("direct:inout")
+                .request(Integer.class)
+        );
+
+        assertEquals(
+            expectedResult,
+            template.clearBody()
+                .clearHeaders()
+                .withBody("Hello")
+                .to("direct:inout")
+                .request(Integer.class)
+        );
+
+        assertEquals(
+            expectedResult,
+            template.clearBody()
+                .clearHeaders()
+                .withBody("Hello")
+                .to(context.getEndpoint("direct:inout"))
+                .request(Integer.class)
+        );
+
+        assertEquals(
+            expectedResult,
+            template.clearBody()
+                .clearHeaders()
+                .withHeader("foo", "bar")
+                .withBody("Hello")
+                .to(context.getEndpoint("direct:inout"))
+                .request(Integer.class)
+        );
+
+        assertEquals(
+            expectedResult,
+            template.clearBody()
+                .clearHeaders()
+                .withBody("Hello")
+                .to(context.getEndpoint("direct:inout"))
+                .request(Integer.class)
+        );
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                // for faster unit test
+                errorHandler(noErrorHandler());
+
+                from("direct:in").process(
+                        exchange -> exchange.getIn().setBody("Bye World"))
+                    .to("mock:result");
+
+                from("direct:out").process(
+                        exchange -> exchange.getOut().setBody("Bye Bye World"))
+                    .to("mock:result");
+
+                from("direct:fault").process(
+                        exchange -> {
+                            exchange.getOut().setFault(true);
+                            exchange.getOut().setBody("Faulty World");
+                        })
+                    .to("mock:result");
+
+                from("direct:exception").process(
+                        exchange -> {
+                            throw new IllegalArgumentException("Forced exception by unit
test");
+                        })
+                    .to("mock:result");
+
+                from("direct:inout").transform(constant(123));
+            }
+        };
+    }
+}


Mime
View raw message