camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject camel git commit: CAMEL-11515: Add option to configure how frequently to run the timeout checker in the aggregate EIP. Also add better docs about completion timeout.
Date Wed, 05 Jul 2017 19:26:18 GMT
Repository: camel
Updated Branches:
  refs/heads/master abea861b4 -> d4e01d7a6


CAMEL-11515: Add option to configure how frequently to run the timeout checker in the aggregate
EIP. Also add better docs about completion timeout.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d4e01d7a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d4e01d7a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d4e01d7a

Branch: refs/heads/master
Commit: d4e01d7a6c1af9ddd21e8774a75932529bc6c9ab
Parents: abea861
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Wed Jul 5 21:23:14 2017 +0200
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Wed Jul 5 21:23:14 2017 +0200

----------------------------------------------------------------------
 .../src/main/docs/eips/aggregate-eip.adoc       |  7 ++--
 .../mbean/ManagedAggregateProcessorMBean.java   |  3 ++
 .../mbean/ManagedAggregateProcessor.java        |  4 +++
 .../apache/camel/model/AggregateDefinition.java | 38 ++++++++++++++++++++
 .../processor/aggregate/AggregateProcessor.java | 12 +++++--
 5 files changed, 59 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/d4e01d7a/camel-core/src/main/docs/eips/aggregate-eip.adoc
----------------------------------------------------------------------
diff --git a/camel-core/src/main/docs/eips/aggregate-eip.adoc b/camel-core/src/main/docs/eips/aggregate-eip.adoc
index 19c5d9d..5cdbb64 100644
--- a/camel-core/src/main/docs/eips/aggregate-eip.adoc
+++ b/camel-core/src/main/docs/eips/aggregate-eip.adoc
@@ -19,7 +19,7 @@ single correlation key into a single message exchange.
 ### Aggregator options
 
 // eip options: START
-The Aggregate EIP supports 25 options which are listed below:
+The Aggregate EIP supports 26 options which are listed below:
 
 
 [width="100%",cols="3,1m,6",options="header"]
@@ -27,7 +27,7 @@ The Aggregate EIP supports 25 options which are listed below:
 | Name | Java Type | Description
 | correlationExpression | NamespaceAwareExpression | *Required* The expression used to calculate
the correlation key to use for aggregation. The Exchange which has the same correlation key
is aggregated together. If the correlation key could not be evaluated an Exception is thrown.
You can disable this by using the ignoreBadCorrelationKeys option.
 | completionPredicate | NamespaceAwareExpression | A Predicate to indicate when an aggregated
exchange is complete. If this is not specified and the AggregationStrategy object implements
Predicate the aggregationStrategy object will be used as the completionPredicate.
-| completionTimeout | NamespaceAwareExpression | Time in millis that an aggregated exchange
should be inactive before its complete (timeout). This option can be set as either a fixed
value or using an Expression which allows you to evaluate a timeout dynamically - will use
Long as result. If both are set Camel will fallback to use the fixed value if the Expression
result was null or 0. You cannot use this option together with completionInterval only one
of the two can be used.
+| completionTimeout | NamespaceAwareExpression | Time in millis that an aggregated exchange
should be inactive before its complete (timeout). This option can be set as either a fixed
value or using an Expression which allows you to evaluate a timeout dynamically - will use
Long as result. If both are set Camel will fallback to use the fixed value if the Expression
result was null or 0. You cannot use this option together with completionInterval only one
of the two can be used. By default the timeout checker runs every second; you can use the completionTimeoutCheckerInterval
option to configure how frequently to run the checker. The timeout is an approximation and
there is no guarantee that a timeout is triggered exactly after the timeout value. It
is not recommended to use very low timeout values or checker intervals.
 | completionSize | NamespaceAwareExpression | Number of messages aggregated before the aggregation
is complete. This option can be set as either a fixed value or using an Expression which allows
you to evaluate a size dynamically - will use Integer as result. If both are set Camel will
fallback to use the fixed value if the Expression result was null or 0.
 | optimisticLockRetryPolicy | OptimisticLockRetryPolicyDefinition | Allows to configure retry
settings when using optimistic locking.
 | parallelProcessing | Boolean | When aggregated are completed they are being send out of
the aggregator. This option indicates whether or not Camel should use a thread pool with multiple
threads for concurrency. If no custom thread pool has been specified then Camel creates a
default pool with 10 concurrent threads.
@@ -40,7 +40,8 @@ The Aggregate EIP supports 25 options which are listed below:
 | strategyMethodAllowNull | Boolean | If this option is false then the aggregate method is
not used for the very first aggregation. If this option is true then null values is used as
the oldExchange (at the very first aggregation) when using POJOs as the AggregationStrategy.
 | completionSize | Integer | Number of messages aggregated before the aggregation is complete.
This option can be set as either a fixed value or using an Expression which allows you to
evaluate a size dynamically - will use Integer as result. If both are set Camel will fallback
to use the fixed value if the Expression result was null or 0.
 | completionInterval | Long | A repeating period in millis by which the aggregator will complete
all current aggregated exchanges. Camel has a background task which is triggered every period.
You cannot use this option together with completionTimeout only one of them can be used.
-| completionTimeout | Long | Time in millis that an aggregated exchange should be inactive
before its complete (timeout). This option can be set as either a fixed value or using an
Expression which allows you to evaluate a timeout dynamically - will use Long as result. If
both are set Camel will fallback to use the fixed value if the Expression result was null
or 0. You cannot use this option together with completionInterval only one of the two can
be used.
+| completionTimeout | Long | Time in millis that an aggregated exchange should be inactive
before its complete (timeout). This option can be set as either a fixed value or using an
Expression which allows you to evaluate a timeout dynamically - will use Long as result. If
both are set Camel will fallback to use the fixed value if the Expression result was null
or 0. You cannot use this option together with completionInterval only one of the two can
be used. By default the timeout checker runs every second; you can use the completionTimeoutCheckerInterval
option to configure how frequently to run the checker. The timeout is an approximation and
there is no guarantee that a timeout is triggered exactly after the timeout value. It
is not recommended to use very low timeout values or checker intervals.
+| completionTimeoutCheckerInterval | Long | Interval in millis that is used by the background
task that checks for timeouts (org.apache.camel.TimeoutMap). By default the timeout checker
runs every second. The timeout is an approximation and there is no guarantee that a timeout
is triggered exactly after the timeout value. It is not recommended to use very low timeout
values or checker intervals.
 | completionFromBatchConsumer | Boolean | Enables the batch completion mode where we aggregate
from a org.apache.camel.BatchConsumer and aggregate the total number of exchanges the org.apache.camel.BatchConsumer
has reported as total by checking the exchange property org.apache.camel.Exchange#BATCH_COMPLETE
when its complete.
 | groupExchanges | Boolean | Enables grouped exchanges so the aggregator will group all aggregated
exchanges into a single combined Exchange holding all the aggregated exchanges in a java.util.List.
 | eagerCheckCompletion | Boolean | Use eager completion checking which means that the completionPredicate
will use the incoming Exchange. As opposed to without eager completion checking the completionPredicate
will use the aggregated Exchange.

http://git-wip-us.apache.org/repos/asf/camel/blob/d4e01d7a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
index 4944422..fa978c5 100644
--- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
@@ -39,6 +39,9 @@ public interface ManagedAggregateProcessorMBean extends ManagedProcessorMBean
{
     @ManagedAttribute(description = "Completion interval in millis")
     long getCompletionInterval();
 
+    @ManagedAttribute(description = "Completion timeout checker interval in millis")
+    long getCompletionTimeoutCheckerInterval();
+
     @ManagedAttribute(description = "Completion size")
     int getCompletionSize();
 

http://git-wip-us.apache.org/repos/asf/camel/blob/d4e01d7a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
index e4c167e..1f68652 100644
--- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
@@ -90,6 +90,10 @@ public class ManagedAggregateProcessor extends ManagedProcessor implements
Manag
         return processor.getCompletionInterval();
     }
 
+    public long getCompletionTimeoutCheckerInterval() {
+        return processor.getCompletionTimeoutCheckerInterval();
+    }
+
     public int getCompletionSize() {
         return processor.getCompletionSize();
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/d4e01d7a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
index d5044a0..86b4032 100644
--- a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
@@ -103,6 +103,8 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
     private Long completionInterval;
     @XmlAttribute
     private Long completionTimeout;
+    @XmlAttribute @Metadata(defaultValue = "1000")
+    private Long completionTimeoutCheckerInterval = 1000L;
     @XmlAttribute
     private Boolean completionFromBatchConsumer;
     @XmlAttribute
@@ -283,6 +285,9 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
         if (getAggregateController() != null) {
             answer.setAggregateController(getAggregateController());
         }
+        if (getCompletionTimeoutCheckerInterval() != null) {
+            answer.setCompletionTimeoutCheckerInterval(getCompletionTimeoutCheckerInterval());
+        }
         return answer;
     }
 
@@ -488,6 +493,14 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
         this.completionTimeout = completionTimeout;
     }
 
+    public Long getCompletionTimeoutCheckerInterval() {
+        return completionTimeoutCheckerInterval;
+    }
+
+    public void setCompletionTimeoutCheckerInterval(Long completionTimeoutCheckerInterval)
{
+        this.completionTimeoutCheckerInterval = completionTimeoutCheckerInterval;
+    }
+
     public ExpressionSubElementDefinition getCompletionPredicate() {
         return completionPredicate;
     }
@@ -768,6 +781,11 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
      * a timeout dynamically - will use Long as result.
      * If both are set Camel will fallback to use the fixed value if the Expression result
was null or 0.
      * You cannot use this option together with completionInterval, only one of the two can
be used.
+     * <p/>
+     * By default the timeout checker runs every second, you can use the completionTimeoutCheckerInterval
option
+     * to configure how frequently to run the checker.
+     * The timeout is an approximation and there is no guarantee that a timeout is triggered
exactly after the timeout value.
+     * It is not recommended to use very low timeout values or checker intervals.
      *
      * @param completionTimeout  the timeout in millis, must be a positive value
      * @return the builder
@@ -783,6 +801,11 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
      * a timeout dynamically - will use Long as result.
      * If both are set Camel will fallback to use the fixed value if the Expression result
was null or 0.
      * You cannot use this option together with completionInterval, only one of the two can
be used.
+     * <p/>
+     * By default the timeout checker runs every second, you can use the completionTimeoutCheckerInterval
option
+     * to configure how frequently to run the checker.
+     * The timeout is an approximation and there is no guarantee that a timeout is triggered
exactly after the timeout value.
+     * It is not recommended to use very low timeout values or checker intervals.
      *
      * @param completionTimeout  the timeout as an {@link Expression} which is evaluated
as a {@link Long} type
      * @return the builder
@@ -793,6 +816,21 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
     }
 
     /**
+     * Interval in millis that is used by the background task that checks for timeouts ({@link
org.apache.camel.TimeoutMap}).
+     * <p/>
+     * By default the timeout checker runs every second.
+     * The timeout is an approximation and there is no guarantee that a timeout is triggered
exactly after the timeout value.
+     * It is not recommended to use very low timeout values or checker intervals.
+     *
+     * @param completionTimeoutCheckerInterval  the interval in millis, must be a positive
value
+     * @return the builder
+     */
+    public AggregateDefinition completionTimeoutCheckerInterval(long completionTimeoutCheckerInterval)
{
+        setCompletionTimeoutCheckerInterval(completionTimeoutCheckerInterval);
+        return this;
+    }
+
+    /**
      * Sets the AggregationStrategy to use with a fluent builder.
      */
     public AggregationStrategyClause<AggregateDefinition> aggregationStrategy() {

http://git-wip-us.apache.org/repos/asf/camel/blob/d4e01d7a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 53d048d..50978a0 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -61,7 +61,6 @@ import org.apache.camel.support.LoggingExceptionHandler;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.camel.util.ExchangeHelper;
-import org.apache.camel.util.LRUCache;
 import org.apache.camel.util.LRUCacheFactory;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ServiceHelper;
@@ -210,6 +209,7 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
     private boolean discardOnCompletionTimeout;
     private boolean forceCompletionOnStop;
     private boolean completeAllOnStop;
+    private long completionTimeoutCheckerInterval = 1000;
 
     private ProducerTemplate deadLetterProducerTemplate;
 
@@ -933,6 +933,14 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
         return completeAllOnStop;
     }
 
+    public long getCompletionTimeoutCheckerInterval() {
+        return completionTimeoutCheckerInterval;
+    }
+
+    public void setCompletionTimeoutCheckerInterval(long completionTimeoutCheckerInterval)
{
+        this.completionTimeoutCheckerInterval = completionTimeoutCheckerInterval;
+    }
+
     public ExceptionHandler getExceptionHandler() {
         return exceptionHandler;
     }
@@ -1383,7 +1391,7 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
                 shutdownTimeoutCheckerExecutorService = true;
             }
             // check for timed out aggregated messages once every second
-            timeoutMap = new AggregationTimeoutMap(getTimeoutCheckerExecutorService(), 1000L);
+            timeoutMap = new AggregationTimeoutMap(getTimeoutCheckerExecutorService(), getCompletionTimeoutCheckerInterval());
             // fill in existing timeout values from the aggregation repository, for example
if a restart occurred, then we
             // need to re-establish the timeout map so timeout can trigger
             restoreTimeoutMapFromAggregationRepository();


Mime
View raw message