camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject camel git commit: CAMEL-9239: camel-sjms - Add completionInterval to batch consumer. Polished the code and fixed some mistakes.
Date Mon, 22 Feb 2016 08:20:52 GMT
Repository: camel
Updated Branches:
  refs/heads/master f97ac2787 -> ee84c1816


CAMEL-9239: camel-sjms - Add completionInterval to batch consumer. Polished the code and fixed some mistakes.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ee84c181
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ee84c181
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ee84c181

Branch: refs/heads/master
Commit: ee84c1816213dab372cfb2efe336ea6ec926d2a3
Parents: f97ac27
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Mon Feb 22 09:20:06 2016 +0100
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Mon Feb 22 09:20:06 2016 +0100

----------------------------------------------------------------------
 .../component/sjms/batch/SjmsBatchConsumer.java | 29 ++++++++++++++------
 1 file changed, 20 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/ee84c181/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
index b5d72a2..4ab7437 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
@@ -16,7 +16,9 @@
  */
 package org.apache.camel.component.sjms.batch;
 
+import java.util.ArrayList;
 import java.util.Date;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
@@ -57,7 +59,6 @@ public class SjmsBatchConsumer extends DefaultConsumer {
 
     private ScheduledExecutorService timeoutCheckerExecutorService;
     private boolean shutdownTimeoutCheckerExecutorService;
-    private final AtomicBoolean completionTimeoutTrigger = new AtomicBoolean();
 
     private final SjmsBatchEndpoint sjmsBatchEndpoint;
     private final AggregationStrategy aggregationStrategy;
@@ -134,10 +135,13 @@ public class SjmsBatchConsumer extends DefaultConsumer {
         }
         consumersShutdownLatchRef.set(new CountDownLatch(consumerCount));
 
-        jmsConsumerExecutors = getEndpoint().getCamelContext().getExecutorServiceManager()
-                .newFixedThreadPool(this, "SjmsBatchConsumer", consumerCount);
+        jmsConsumerExecutors = getEndpoint().getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, "SjmsBatchConsumer", consumerCount);
+
+        final List<AtomicBoolean> triggers = new ArrayList<>();
         for (int i = 0; i < consumerCount; i++) {
-            jmsConsumerExecutors.execute(new BatchConsumptionLoop());
+            BatchConsumptionLoop loop = new BatchConsumptionLoop();
+            triggers.add(loop.getCompletionTimeoutTrigger());
+            jmsConsumerExecutors.execute(loop);
         }
 
         if (completionInterval > 0) {
@@ -147,7 +151,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                 shutdownTimeoutCheckerExecutorService = true;
             }
             // trigger completion based on interval
-            timeoutCheckerExecutorService.scheduleAtFixedRate(new CompletionIntervalTask(completionTimeoutTrigger), completionInterval, completionInterval, TimeUnit.MILLISECONDS);
+            timeoutCheckerExecutorService.scheduleAtFixedRate(new CompletionIntervalTask(triggers), completionInterval, completionInterval, TimeUnit.MILLISECONDS);
         }
 
     }
@@ -190,10 +194,10 @@ public class SjmsBatchConsumer extends DefaultConsumer {
      */
     private final class CompletionIntervalTask implements Runnable {
 
-        private final AtomicBoolean timeoutInterval;
+        private final List<AtomicBoolean> triggers;
 
-        public CompletionIntervalTask(AtomicBoolean timeoutInterval) {
-            this.timeoutInterval = timeoutInterval;
+        public CompletionIntervalTask(List<AtomicBoolean> triggers) {
+            this.triggers = triggers;
         }
 
         public void run() {
@@ -204,14 +208,21 @@ public class SjmsBatchConsumer extends DefaultConsumer {
             }
 
             // signal
-            timeoutInterval.set(true);
+            for (AtomicBoolean trigger : triggers) {
+                trigger.set(true);
+            }
         }
     }
 
     private class BatchConsumptionLoop implements Runnable {
 
+        private final AtomicBoolean completionTimeoutTrigger = new AtomicBoolean();
         private final BatchConsumptionTask task = new BatchConsumptionTask(completionTimeoutTrigger);
 
+        public AtomicBoolean getCompletionTimeoutTrigger() {
+            return completionTimeoutTrigger;
+        }
+
         @Override
         public void run() {
             try {


Mime
View raw message