camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [camel] branch master updated: CAMEL-14591: Fixed thread pool for recipient list EIP should only be created when really needed (timeout enabled) and that the pool is also shutdown.
Date Wed, 19 Feb 2020 11:02:21 GMT
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 340407b  CAMEL-14591: Fixed thread pool for recipient list EIP should only be created
when really needed (timeout enabled) and that the pool is also shutdown.
340407b is described below

commit 340407b2556bf3e58178b6d1c749e393a12b6898
Author: Claus Ibsen <claus.ibsen@gmail.com>
AuthorDate: Wed Feb 19 12:01:49 2020 +0100

    CAMEL-14591: Fixed thread pool for recipient list EIP should only be created when really
needed (timeout enabled) and that the pool is also shutdown.
---
 .../apache/camel/processor/MulticastProcessor.java | 16 +++++++++++++--
 .../org/apache/camel/processor/RecipientList.java  | 23 +++++++++++-----------
 .../java/org/apache/camel/processor/Splitter.java  |  1 -
 .../camel/processor/RecipientListNoCacheTest.java  |  5 +++++
 .../RecipientListParallelTimeoutTest.java          | 22 ++++++++++++++++++++-
 5 files changed, 51 insertions(+), 16 deletions(-)

diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index dd2ae22..ccd9d01 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -161,6 +161,7 @@ public class MulticastProcessor extends AsyncProcessorSupport implements
Navigat
     private final ExecutorService executorService;
     private final boolean shutdownExecutorService;
     private ExecutorService aggregateExecutorService;
+    private boolean shutdownAggregateExecutorService;
     private final long timeout;
     private final ConcurrentMap<PreparedErrorHandler, Processor> errorHandlers = new
ConcurrentHashMap<>();
     private final boolean shareUnitOfWork;
@@ -802,12 +803,13 @@ public class MulticastProcessor extends AsyncProcessorSupport implements
Navigat
         if (isParallelProcessing() && executorService == null) {
             throw new IllegalArgumentException("ParallelProcessing is enabled but ExecutorService
has not been set");
         }
-        if (aggregateExecutorService == null) {
+        if (timeout > 0 && aggregateExecutorService == null) {
             // use unbounded thread pool so we ensure the aggregate on-the-fly task always
will have assigned a thread
             // and run the tasks when the task is submitted. If not then the aggregate task
may not be able to run
             // and signal completion during processing, which would lead to what would appear
as a dead-lock or a slow processing
             String name = getClass().getSimpleName() + "-AggregateTask";
             aggregateExecutorService = createAggregateExecutorService(name);
+            shutdownAggregateExecutorService = true;
         }
         if (aggregationStrategy instanceof CamelContextAware) {
             ((CamelContextAware) aggregationStrategy).setCamelContext(camelContext);
@@ -842,7 +844,7 @@ public class MulticastProcessor extends AsyncProcessorSupport implements
Navigat
         if (shutdownExecutorService && executorService != null) {
             getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
         }
-        if (aggregateExecutorService != null) {
+        if (shutdownAggregateExecutorService && aggregateExecutorService != null)
{
             getCamelContext().getExecutorServiceManager().shutdownNow(aggregateExecutorService);
         }
     }
@@ -968,6 +970,16 @@ public class MulticastProcessor extends AsyncProcessorSupport implements
Navigat
         return shareUnitOfWork;
     }
 
+    public ExecutorService getAggregateExecutorService() {
+        return aggregateExecutorService;
+    }
+
+    public void setAggregateExecutorService(ExecutorService aggregateExecutorService) {
+        this.aggregateExecutorService = aggregateExecutorService;
+        // we use a custom executor so do not shutdown
+        this.shutdownAggregateExecutorService = false;
+    }
+
     @Override
     public List<Processor> next() {
         if (!hasNext()) {
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/RecipientList.java b/core/camel-base/src/main/java/org/apache/camel/processor/RecipientList.java
index e913ffd..4903429 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/RecipientList.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/RecipientList.java
@@ -72,7 +72,7 @@ public class RecipientList extends AsyncProcessorSupport implements IdAware,
Rou
     private boolean shareUnitOfWork;
     private ExecutorService executorService;
     private boolean shutdownExecutorService;
-    private ExecutorService aggregateExecutorService;
+    private volatile ExecutorService aggregateExecutorService;
     private AggregationStrategy aggregationStrategy = new UseLatestAggregationStrategy();
 
     public RecipientList(CamelContext camelContext) {
@@ -190,21 +190,13 @@ public class RecipientList extends AsyncProcessorSupport implements
IdAware, Rou
         RecipientListProcessor rlp = new RecipientListProcessor(exchange.getContext(), producerCache,
iter, getAggregationStrategy(),
                 isParallelProcessing(), getExecutorService(), isShutdownExecutorService(),
                 isStreaming(), isStopOnException(), getTimeout(), getOnPrepare(), isShareUnitOfWork(),
isParallelAggregate(),
-                isStopOnAggregateException()) {
-            @Override
-            protected synchronized ExecutorService createAggregateExecutorService(String
name) {
-                // use a shared executor service to avoid creating new thread pools
-                if (aggregateExecutorService == null) {
-                    aggregateExecutorService = super.createAggregateExecutorService("RecipientList-AggregateTask");
-                }
-                return aggregateExecutorService;
-            }
-        };
+                isStopOnAggregateException());
+        rlp.setAggregateExecutorService(aggregateExecutorService);
         rlp.setIgnoreInvalidEndpoints(isIgnoreInvalidEndpoints());
         rlp.setId(getId());
         rlp.setRouteId(getRouteId());
 
-        // start the service
+        // start ourselves
         try {
             ServiceHelper.startService(rlp);
         } catch (Exception e) {
@@ -232,6 +224,10 @@ public class RecipientList extends AsyncProcessorSupport implements IdAware,
Rou
                 LOG.debug("RecipientList {} using ProducerCache with cacheSize={}", this,
cacheSize);
             }
         }
+        if (timeout > 0) {
+            // use a cached thread pool so that each on-the-fly task has a dedicated thread
to process completions as they come in
+            aggregateExecutorService = camelContext.getExecutorServiceManager().newScheduledThreadPool(this,
"RecipientList-AggregateTask", 0);
+        }
         ServiceHelper.startService(aggregationStrategy, producerCache);
     }
 
@@ -244,6 +240,9 @@ public class RecipientList extends AsyncProcessorSupport implements IdAware,
Rou
     protected void doShutdown() throws Exception {
         ServiceHelper.stopAndShutdownServices(producerCache, aggregationStrategy);
 
+        if (aggregateExecutorService != null) {
+            camelContext.getExecutorServiceManager().shutdownNow(aggregateExecutorService);
+        }
         if (shutdownExecutorService && executorService != null) {
             camelContext.getExecutorServiceManager().shutdownNow(executorService);
         }
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Splitter.java b/core/camel-base/src/main/java/org/apache/camel/processor/Splitter.java
index 0f09d12..491358c 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/Splitter.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/Splitter.java
@@ -81,7 +81,6 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor,
Trac
     public boolean process(Exchange exchange, final AsyncCallback callback) {
         AggregationStrategy strategy = getAggregationStrategy();
 
-
         // set original exchange if not already pre-configured
         if (strategy instanceof UseOriginalAggregationStrategy) {
             // need to create a new private instance, as we can also have concurrency issue
so we cannot store state
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
index 2660405..f4a9575 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
@@ -17,6 +17,7 @@
 package org.apache.camel.processor;
 
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Processor;
@@ -52,6 +53,10 @@ public class RecipientListNoCacheTest extends ContextTestSupport {
         Object pc = ReflectionHelper.getField(rl.getClass().getDeclaredField("producerCache"),
rl);
         assertNotNull(pc);
         assertIsInstanceOf(EmptyProducerCache.class, pc);
+
+        // and no thread pool is in use as timeout is 0
+        pc = ReflectionHelper.getField(rl.getClass().getDeclaredField("aggregateExecutorService"),
rl);
+        assertNull(pc);
     }
 
     protected void sendBody(String body) {
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelTimeoutTest.java
b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelTimeoutTest.java
index 84458dd..b93b50d 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelTimeoutTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelTimeoutTest.java
@@ -16,11 +16,16 @@
  */
 package org.apache.camel.processor;
 
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
 import org.apache.camel.AggregationStrategy;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.util.ReflectionHelper;
 import org.junit.Test;
 
 public class RecipientListParallelTimeoutTest extends ContextTestSupport {
@@ -34,6 +39,21 @@ public class RecipientListParallelTimeoutTest extends ContextTestSupport
{
         template.sendBodyAndHeader("direct:start", "Hello", "slip", "direct:a,direct:b,direct:c");
 
         assertMockEndpointsSatisfied();
+
+        // make sure that the thread pool will be shutdown
+        List<Processor> list = context.getRoute("route1").filter("foo");
+        RecipientList rl = (RecipientList) list.get(0);
+        assertNotNull(rl);
+
+        Object pc = ReflectionHelper.getField(rl.getClass().getDeclaredField("aggregateExecutorService"),
rl);
+        assertNotNull(pc);
+        ExecutorService es = assertIsInstanceOf(ExecutorService.class, pc);
+
+        assertFalse(es.isShutdown());
+
+        // now stop camel and ensure the thread pool is stopped
+        context.stop();
+        assertTrue(es.isShutdown());
     }
 
     @Override
@@ -51,7 +71,7 @@ public class RecipientListParallelTimeoutTest extends ContextTestSupport
{
                         oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class));
                         return oldExchange;
                     }
-                }).parallelProcessing().timeout(500).to("mock:result");
+                }).parallelProcessing().timeout(500).id("foo").to("mock:result");
 
                 from("direct:a").delay(1000).setBody(constant("A"));
 


Mime
View raw message