camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject camel git commit: CAMEL-11351: Optimize - ProducerCache - Avoid creating new processor for each message sent.
Date Sun, 28 May 2017 10:05:12 GMT
Repository: camel
Updated Branches:
  refs/heads/master 72727a5e6 -> 30917aba7


CAMEL-11351: Optimize - ProducerCache - Avoid creating new processor for each message sent.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/30917aba
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/30917aba
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/30917aba

Branch: refs/heads/master
Commit: 30917aba7d23be2a408660f3e4dd503a9cad5015
Parents: 72727a5
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Sun May 28 11:16:25 2017 +0200
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Sun May 28 11:16:25 2017 +0200

----------------------------------------------------------------------
 .../org/apache/camel/impl/ProducerCache.java    |  38 +--
 .../processor/SharedCamelInternalProcessor.java | 286 +++++++++++++++++++
 .../camel/processor/SimpleMockSampleTest.java   |  71 +++++
 3 files changed, 371 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/30917aba/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java b/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
index 574f7cd..285f1ab 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
@@ -16,8 +16,6 @@
  */
 package org.apache.camel.impl;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
@@ -34,7 +32,7 @@ import org.apache.camel.Producer;
 import org.apache.camel.ProducerCallback;
 import org.apache.camel.ServicePoolAware;
 import org.apache.camel.processor.CamelInternalProcessor;
-import org.apache.camel.processor.Pipeline;
+import org.apache.camel.processor.SharedCamelInternalProcessor;
 import org.apache.camel.spi.EndpointUtilizationStatistics;
 import org.apache.camel.spi.ServicePool;
 import org.apache.camel.support.ServiceSupport;
@@ -59,6 +57,7 @@ public class ProducerCache extends ServiceSupport {
     private final ServicePool<Endpoint, Producer> pool;
     private final Map<String, Producer> producers;
     private final Object source;
+    private final SharedCamelInternalProcessor internalProcessor;
 
     private EndpointUtilizationStatistics statistics;
     private boolean eventNotifierEnabled = true;
@@ -100,6 +99,9 @@ public class ProducerCache extends ServiceSupport {
         } else {
             this.extendedStatistics = false;
         }
+
+        // internal processor used for sending
+        internalProcessor = new SharedCamelInternalProcessor(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(null));
     }
 
     public boolean isEventNotifierEnabled() {
@@ -486,9 +488,9 @@ public class ProducerCache extends ServiceSupport {
             if (eventNotifierEnabled) {
                 callback = new EventNotifierCallback(callback, exchange, endpoint);
             }
-            CamelInternalProcessor internal = prepareInternalProcessor(producer, resultProcessor);
-
-            return internal.process(exchange, callback);
+            AsyncProcessor target = prepareProducer(producer);
+            // invoke the asynchronous method
+            return internalProcessor.process(exchange, callback, target, resultProcessor);
         } catch (Throwable e) {
             // ensure exceptions is caught and set on the exchange
             exchange.setException(e);
@@ -533,8 +535,10 @@ public class ProducerCache extends ServiceSupport {
                         }
                     }
 
-                    CamelInternalProcessor internal = prepareInternalProcessor(producer, resultProcessor);
-                    internal.process(exchange);
+                    AsyncProcessor target = prepareProducer(producer);
+                    // invoke the synchronous method
+                    internalProcessor.process(exchange, target, resultProcessor);
+
                 } catch (Throwable e) {
                     // ensure exceptions is caught and set on the exchange
                     exchange.setException(e);
@@ -550,22 +554,8 @@ public class ProducerCache extends ServiceSupport {
         });
     }
 
-    protected CamelInternalProcessor prepareInternalProcessor(Producer producer, Processor resultProcessor) {
-        // if we have a result processor then wrap in pipeline to execute both of them in sequence
-        Processor target;
-        if (resultProcessor != null) {
-            List<Processor> processors = new ArrayList<Processor>(2);
-            processors.add(producer);
-            processors.add(resultProcessor);
-            target = Pipeline.newInstance(getCamelContext(), processors);
-        } else {
-            target = producer;
-        }
-
-        // wrap in unit of work
-        CamelInternalProcessor internal = new CamelInternalProcessor(target);
-        internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(null));
-        return internal;
+    protected AsyncProcessor prepareProducer(Producer producer) {
+        return AsyncProcessorConverterHelper.convert(producer);
     }
 
     protected synchronized Producer doGetProducer(Endpoint endpoint, boolean pooled) {

http://git-wip-us.apache.org/repos/asf/camel/blob/30917aba/camel-core/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
new file mode 100644
index 0000000..3dc4086
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.RejectedExecutionException;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.Exchange;
+import org.apache.camel.Ordered;
+import org.apache.camel.Processor;
+import org.apache.camel.Service;
+import org.apache.camel.spi.AsyncProcessorAwaitManager;
+import org.apache.camel.spi.RoutePolicy;
+import org.apache.camel.spi.Transformer;
+import org.apache.camel.spi.UnitOfWork;
+import org.apache.camel.util.OrderedComparator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Shared (thread safe) internal {@link Processor} that Camel routing engine used during routing for cross cutting functionality such as:
+ * <ul>
+ *     <li>Execute {@link UnitOfWork}</li>
+ *     <li>Keeping track which route currently is being routed</li>
+ *     <li>Execute {@link RoutePolicy}</li>
+ *     <li>Gather JMX performance statics</li>
+ *     <li>Tracing</li>
+ *     <li>Debugging</li>
+ *     <li>Message History</li>
+ *     <li>Stream Caching</li>
+ *     <li>{@link Transformer}</li>
+ * </ul>
+ * ... and more.
+ * <p/>
+ * This implementation executes this cross cutting functionality as a {@link CamelInternalProcessorAdvice} advice (before and after advice)
+ * by executing the {@link CamelInternalProcessorAdvice#before(Exchange)} and
+ * {@link CamelInternalProcessorAdvice#after(Exchange, Object)} callbacks in correct order during routing.
+ * This reduces number of stack frames needed during routing, and reduce the number of lines in stacktraces, as well
+ * makes debugging the routing engine easier for end users.
+ * <p/>
+ * <b>Debugging tips:</b> Camel end users whom want to debug their Camel applications with the Camel source code, then make sure to
+ * read the source code of this class about the debugging tips, which you can find in the
+ * {@link #process(Exchange, AsyncCallback, AsyncProcessor, Processor)} method.
+ * <p/>
+ * The added advices can implement {@link Ordered} to control in which order the advices are executed.
+ */
+public class SharedCamelInternalProcessor {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SharedCamelInternalProcessor.class);
+    private final List<CamelInternalProcessorAdvice> advices = new ArrayList<CamelInternalProcessorAdvice>();
+
+    public SharedCamelInternalProcessor(CamelInternalProcessorAdvice... advices) {
+        if (advices != null) {
+            this.advices.addAll(Arrays.asList(advices));
+            // ensure advices are sorted so they are in the order we want
+            this.advices.sort(OrderedComparator.get());
+        }
+    }
+
+    /**
+     * Synchronous API
+     */
+    public void process(Exchange exchange, AsyncProcessor processor, Processor resultProcessor) {
+        final AsyncProcessorAwaitManager awaitManager = exchange.getContext().getAsyncProcessorAwaitManager();
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        boolean sync = process(exchange, new AsyncCallback() {
+            public void done(boolean doneSync) {
+                if (!doneSync) {
+                    awaitManager.countDown(exchange, latch);
+                }
+            }
+
+            @Override
+            public String toString() {
+                return "Done " + processor;
+            }
+        }, processor, resultProcessor);
+
+        if (!sync) {
+            awaitManager.await(exchange, latch);
+        }
+    }
+
+    /**
+     * Asynchronous API
+     */
+    public boolean process(Exchange exchange, AsyncCallback callback, AsyncProcessor processor, Processor resultProcessor) {
+        // ----------------------------------------------------------
+        // CAMEL END USER - READ ME FOR DEBUGGING TIPS
+        // ----------------------------------------------------------
+        // If you want to debug the Camel routing engine, then there is a lot of internal functionality
+        // the routing engine executes during routing messages. You can skip debugging this internal
+        // functionality and instead debug where the routing engine continues routing to the next node
+        // in the routes. The CamelInternalProcessor is a vital part of the routing engine, as its
+        // being used in between the nodes. As an end user you can just debug the code in this class
+        // in between the:
+        //   CAMEL END USER - DEBUG ME HERE +++ START +++
+        //   CAMEL END USER - DEBUG ME HERE +++ END +++
+        // you can see in the code below.
+        // ----------------------------------------------------------
+
+        if (processor == null || !continueProcessing(exchange, processor)) {
+            // no processor or we should not continue then we are done
+            callback.done(true);
+            return true;
+        }
+
+        final List<Object> states = new ArrayList<Object>(advices.size());
+        for (CamelInternalProcessorAdvice task : advices) {
+            try {
+                Object state = task.before(exchange);
+                states.add(state);
+            } catch (Throwable e) {
+                exchange.setException(e);
+                callback.done(true);
+                return true;
+            }
+        }
+
+        // create internal callback which will execute the advices in reverse order when done
+        callback = new InternalCallback(states, exchange, callback, resultProcessor);
+
+        // UNIT_OF_WORK_PROCESS_SYNC is @deprecated and we should remove it from Camel 3.0
+        Object synchronous = exchange.removeProperty(Exchange.UNIT_OF_WORK_PROCESS_SYNC);
+        if (exchange.isTransacted() || synchronous != null) {
+            // must be synchronized for transacted exchanges
+            if (LOG.isTraceEnabled()) {
+                if (exchange.isTransacted()) {
+                    LOG.trace("Transacted Exchange must be routed synchronously for exchangeId: {} -> {}", exchange.getExchangeId(), exchange);
+                } else {
+                    LOG.trace("Synchronous UnitOfWork Exchange must be routed synchronously for exchangeId: {} -> {}", exchange.getExchangeId(), exchange);
+                }
+            }
+            // ----------------------------------------------------------
+            // CAMEL END USER - DEBUG ME HERE +++ START +++
+            // ----------------------------------------------------------
+            try {
+                processor.process(exchange);
+            } catch (Throwable e) {
+                exchange.setException(e);
+            }
+            // ----------------------------------------------------------
+            // CAMEL END USER - DEBUG ME HERE +++ END +++
+            // ----------------------------------------------------------
+            callback.done(true);
+            return true;
+        } else {
+            final UnitOfWork uow = exchange.getUnitOfWork();
+
+            // allow unit of work to wrap callback in case it need to do some special work
+            // for example the MDCUnitOfWork
+            AsyncCallback async = callback;
+            if (uow != null) {
+                async = uow.beforeProcess(processor, exchange, callback);
+            }
+
+            // ----------------------------------------------------------
+            // CAMEL END USER - DEBUG ME HERE +++ START +++
+            // ----------------------------------------------------------
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Processing exchange for exchangeId: {} -> {}", exchange.getExchangeId(), exchange);
+            }
+            boolean sync = processor.process(exchange, async);
+            // ----------------------------------------------------------
+            // CAMEL END USER - DEBUG ME HERE +++ END +++
+            // ----------------------------------------------------------
+
+            // execute any after processor work (in current thread, not in the callback)
+            if (uow != null) {
+                uow.afterProcess(processor, exchange, callback, sync);
+            }
+
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Exchange processed and is continued routed {} for exchangeId: {} -> {}",
+                        new Object[]{sync ? "synchronously" : "asynchronously", exchange.getExchangeId(), exchange});
+            }
+            return sync;
+        }
+    }
+
+    /**
+     * Internal callback that executes the after advices.
+     */
+    private final class InternalCallback implements AsyncCallback {
+
+        private final List<Object> states;
+        private final Exchange exchange;
+        private final AsyncCallback callback;
+        private final Processor resultProcessor;
+
+        private InternalCallback(List<Object> states, Exchange exchange, AsyncCallback callback, Processor resultProcessor) {
+            this.states = states;
+            this.exchange = exchange;
+            this.callback = callback;
+            this.resultProcessor = resultProcessor;
+        }
+
+        @Override
+        public void done(boolean doneSync) {
+            // NOTE: if you are debugging Camel routes, then all the code in the for loop below is internal only
+            // so you can step straight to the finally block and invoke the callback
+
+            if (resultProcessor != null) {
+                try {
+                    resultProcessor.process(exchange);
+                } catch (Throwable e) {
+                    exchange.setException(e);
+                }
+            }
+
+            // we should call after in reverse order
+            try {
+                for (int i = advices.size() - 1; i >= 0; i--) {
+                    CamelInternalProcessorAdvice task = advices.get(i);
+                    Object state = states.get(i);
+                    try {
+                        task.after(exchange, state);
+                    } catch (Throwable e) {
+                        exchange.setException(e);
+                        // allow all advices to complete even if there was an exception
+                    }
+                }
+            } finally {
+                // ----------------------------------------------------------
+                // CAMEL END USER - DEBUG ME HERE +++ START +++
+                // ----------------------------------------------------------
+                // callback must be called
+                callback.done(doneSync);
+                // ----------------------------------------------------------
+                // CAMEL END USER - DEBUG ME HERE +++ END +++
+                // ----------------------------------------------------------
+            }
+        }
+    }
+
+    /**
+     * Strategy to determine if we should continue processing the {@link Exchange}.
+     */
+    protected boolean continueProcessing(Exchange exchange, AsyncProcessor processor) {
+        Object stop = exchange.getProperty(Exchange.ROUTE_STOP);
+        if (stop != null) {
+            boolean doStop = exchange.getContext().getTypeConverter().convertTo(Boolean.class, stop);
+            if (doStop) {
+                LOG.debug("Exchange is marked to stop routing: {}", exchange);
+                return false;
+            }
+        }
+
+        // determine if we can still run, or the camel context is forcing a shutdown
+        if (processor instanceof Service) {
+            boolean forceShutdown = exchange.getContext().getShutdownStrategy().forceShutdown((Service) processor);
+            if (forceShutdown) {
+                String msg = "Run not allowed as ShutdownStrategy is forcing shutting down, will reject executing exchange: " + exchange;
+                LOG.debug(msg);
+                if (exchange.getException() == null) {
+                    exchange.setException(new RejectedExecutionException(msg));
+                }
+                return false;
+            }
+        }
+
+        // yes we can continue
+        return true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/30917aba/camel-core/src/test/java/org/apache/camel/processor/SimpleMockSampleTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/SimpleMockSampleTest.java b/camel-core/src/test/java/org/apache/camel/processor/SimpleMockSampleTest.java
new file mode 100644
index 0000000..e90c347
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/SimpleMockSampleTest.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.SimpleRegistry;
+
+/**
+ * @version 
+ */
+public class SimpleMockSampleTest extends ContextTestSupport {
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        // use simple register which is faster than jndi
+        CamelContext context = new DefaultCamelContext(new SimpleRegistry());
+        // can be used to optimise camel
+        context.getRuntimeEndpointRegistry().setEnabled(false);
+        return context;
+    }
+
+    public void testSimpleThousandMessages() throws Exception {
+        int count = 1000;
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(count);
+
+        for (int i = 0; i < count; i++) {
+            template.sendBody("direct:start", "Hello World");
+        }
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testSimpleTwoMessages() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Hello World", "Bye World");
+
+        template.sendBody("direct:start", "Hello World");
+        template.sendBody("direct:start", "Bye World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").to("log:foo").to("log:bar").to("mock:result");
+            }
+        };
+    }
+}


Mime
View raw message