Return-Path: Delivered-To: apmail-camel-commits-archive@www.apache.org Received: (qmail 93774 invoked from network); 4 Apr 2010 13:48:11 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 4 Apr 2010 13:48:11 -0000 Received: (qmail 6630 invoked by uid 500); 4 Apr 2010 13:48:11 -0000 Delivered-To: apmail-camel-commits-archive@camel.apache.org Received: (qmail 6600 invoked by uid 500); 4 Apr 2010 13:48:11 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 6593 invoked by uid 99); 4 Apr 2010 13:48:11 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 04 Apr 2010 13:48:11 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 04 Apr 2010 13:48:05 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 0FA3C2388A32; Sun, 4 Apr 2010 13:47:43 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r930692 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/builder/ camel-core/src/main/java/org/apache/camel/model/ camel-core/src/main/java/org/apache/camel/processor/ camel-core/src/main/java/org/apache/camel/processor/aggregat... Date: Sun, 04 Apr 2010 13:47:42 -0000 To: commits@camel.apache.org From: davsclaus@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100404134743.0FA3C2388A32@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: davsclaus Date: Sun Apr 4 13:47:42 2010 New Revision: 930692 URL: http://svn.apache.org/viewvc?rev=930692&view=rev Log: CAMEL-2568: Preparing for redelivery policy for aggregator. Added: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverWithRedeliveryPolicyTest.java - copied, changed from r930663, camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBSpringAggregateRecoverWithRedeliveryPolicyTest.java (with props) camel/trunk/components/camel-hawtdb/src/test/resources/org/apache/camel/component/hawtdb/HawtDBSpringAggregateRecoverWithRedeliveryPolicyTest.xml - copied, changed from r930635, camel/trunk/components/camel-hawtdb/src/test/resources/org/apache/camel/component/hawtdb/HawtDBSpringAggregateTest.xml Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DefaultErrorHandlerBuilder.java camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnExceptionDefinition.java camel/trunk/camel-core/src/main/java/org/apache/camel/model/RedeliveryPolicyDefinition.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/exceptionpolicy/DefaultExceptionPolicyStrategyUsingOnlyWhenTest.java camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostRemovedWhenConfirmedTest.java camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DefaultErrorHandlerBuilder.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DefaultErrorHandlerBuilder.java?rev=930692&r1=930691&r2=930692&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DefaultErrorHandlerBuilder.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DefaultErrorHandlerBuilder.java Sun Apr 4 13:47:42 2010 @@ -387,7 +387,7 @@ public class DefaultErrorHandlerBuilder protected RedeliveryPolicy createRedeliveryPolicy() { RedeliveryPolicy policy = new RedeliveryPolicy(); policy.disableRedelivery(); - policy.setRedeliverDelay(0); + policy.setRedeliveryDelay(0); return policy; } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnExceptionDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnExceptionDefinition.java?rev=930692&r1=930691&r2=930692&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnExceptionDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnExceptionDefinition.java Sun Apr 4 13:47:42 2010 @@ -102,7 +102,7 @@ public class OnExceptionDefinition exten public String toString() { return "OnException[" + getExceptionClasses() + (onWhen != null ? " " + onWhen : "") + " -> " + getOutputs() + "]"; } - + @Override public boolean isAbstract() { return true; @@ -110,10 +110,11 @@ public class OnExceptionDefinition exten /** * Allows an exception handler to create a new redelivery policy for this exception type - * @param context the camel context + * + * @param context the camel context * @param parentPolicy the current redelivery policy * @return a newly created redelivery policy, or return the original policy if no customization is required - * for this exception handler. + * for this exception handler. */ public RedeliveryPolicy createRedeliveryPolicy(CamelContext context, RedeliveryPolicy parentPolicy) { if (redeliveryPolicy != null) { @@ -171,29 +172,29 @@ public class OnExceptionDefinition exten /** * Sets whether the exchange should be marked as handled or not. * - * @param handled handled or not + * @param handled handled or not * @return the builder */ public OnExceptionDefinition handled(boolean handled) { Expression expression = ExpressionBuilder.constantExpression(Boolean.toString(handled)); return handled(expression); } - + /** * Sets whether the exchange should be marked as handled or not. * - * @param handled predicate that determines true or false + * @param handled predicate that determines true or false * @return the builder */ public OnExceptionDefinition handled(Predicate handled) { setHandledPolicy(handled); return this; } - + /** * Sets whether the exchange should be marked as handled or not. * - * @param handled expression that determines true or false + * @param handled expression that determines true or false * @return the builder */ public OnExceptionDefinition handled(Expression handled) { @@ -207,7 +208,7 @@ public class OnExceptionDefinition exten * To be used for fine grained controlling whether a thrown exception should be intercepted * by this exception type or not. * - * @param predicate predicate that determines true or false + * @param predicate predicate that determines true or false * @return the builder */ public OnExceptionDefinition onWhen(Predicate predicate) { @@ -254,11 +255,13 @@ public class OnExceptionDefinition exten } /** - * Sets the delay + * Sets the initial redelivery delay * - * @param delay the redeliver delay + * @param delay the initial redelivery delay * @return the builder + * @deprecated */ + @Deprecated public OnExceptionDefinition redeliverDelay(long delay) { getOrCreateRedeliveryPolicy().redeliveryDelay(delay); return this; @@ -267,7 +270,7 @@ public class OnExceptionDefinition exten /** * Sets the back off multiplier * - * @param backOffMultiplier the back off multiplier + * @param backOffMultiplier the back off multiplier * @return the builder */ public OnExceptionDefinition backOffMultiplier(double backOffMultiplier) { @@ -278,7 +281,7 @@ public class OnExceptionDefinition exten /** * Sets the collision avoidance factor * - * @param collisionAvoidanceFactor the factor + * @param collisionAvoidanceFactor the factor * @return the builder */ public OnExceptionDefinition collisionAvoidanceFactor(double collisionAvoidanceFactor) { @@ -289,7 +292,7 @@ public class OnExceptionDefinition exten /** * Sets the collision avoidance percentage * - * @param collisionAvoidancePercent the percentage + * @param collisionAvoidancePercent the percentage * @return the builder */ public OnExceptionDefinition collisionAvoidancePercent(double collisionAvoidancePercent) { @@ -298,9 +301,9 @@ public class OnExceptionDefinition exten } /** - * Sets the fixed delay between redeliveries + * Sets the initial redelivery delay * - * @param delay delay in millis + * @param delay delay in millis * @return the builder */ public OnExceptionDefinition redeliveryDelay(long delay) { @@ -311,7 +314,7 @@ public class OnExceptionDefinition exten /** * Sets the logging level to use when retries has exhausted * - * @param retriesExhaustedLogLevel the logging level + * @param retriesExhaustedLogLevel the logging level * @return the builder */ public OnExceptionDefinition retriesExhaustedLogLevel(LoggingLevel retriesExhaustedLogLevel) { @@ -322,7 +325,7 @@ public class OnExceptionDefinition exten /** * Sets the logging level to use for logging retry attempts * - * @param retryAttemptedLogLevel the logging level + * @param retryAttemptedLogLevel the logging level * @return the builder */ public OnExceptionDefinition retryAttemptedLogLevel(LoggingLevel retryAttemptedLogLevel) { @@ -373,12 +376,12 @@ public class OnExceptionDefinition exten /** * Sets the maximum redeliveries *
    - *
  • 5 = default value
  • - *
  • 0 = no redeliveries
  • - *
  • -1 = redeliver forever
  • + *
  • 5 = default value
  • + *
  • 0 = no redeliveries
  • + *
  • -1 = redeliver forever
  • *
* - * @param maximumRedeliveries the value + * @param maximumRedeliveries the value * @return the builder */ public OnExceptionDefinition maximumRedeliveries(int maximumRedeliveries) { @@ -409,7 +412,7 @@ public class OnExceptionDefinition exten /** * Sets the maximum delay between redelivery * - * @param maximumRedeliveryDelay the delay in millis + * @param maximumRedeliveryDelay the delay in millis * @return the builder */ public OnExceptionDefinition maximumRedeliveryDelay(long maximumRedeliveryDelay) { @@ -418,6 +421,28 @@ public class OnExceptionDefinition exten } /** + * Sets a reference to a {@link RedeliveryPolicy} to lookup in the {@link org.apache.camel.spi.Registry} to be used. + * + * @param redeliveryPolicyRef reference to use for lookup + * @return the builder + */ + public OnExceptionDefinition redeliveryPolicyRef(String redeliveryPolicyRef) { + getOrCreateRedeliveryPolicy().setRef(redeliveryPolicyRef); + return this; + } + + /** + * Sets the delay pattern with delay intervals. + * + * @param delayPattern the delay pattern + * @return the builder + */ + public OnExceptionDefinition delayPattern(String delayPattern) { + getOrCreateRedeliveryPolicy().setDelayPattern(delayPattern); + return this; + } + + /** * Will use the original input body when an {@link org.apache.camel.Exchange} is moved to the dead letter queue. *

* Notice: this only applies when all redeliveries attempt have failed and the {@link org.apache.camel.Exchange} is doomed for failure. @@ -438,7 +463,7 @@ public class OnExceptionDefinition exten } /** - * Sets a processor that should be processed before a redelivey attempt. + * Sets a processor that should be processed before a redelivery attempt. *

* Can be used to change the {@link org.apache.camel.Exchange} before its being redelivered. */ @@ -449,6 +474,7 @@ public class OnExceptionDefinition exten // Properties //------------------------------------------------------------------------- + public List getOutputs() { return outputs; } @@ -498,7 +524,7 @@ public class OnExceptionDefinition exten public ExpressionSubElementDefinition getHandled() { return handled; - } + } public void setHandledPolicy(Predicate handledPolicy) { this.handledPolicy = handledPolicy; @@ -554,6 +580,7 @@ public class OnExceptionDefinition exten // Implementation methods //------------------------------------------------------------------------- + protected RedeliveryPolicyDefinition getOrCreateRedeliveryPolicy() { if (redeliveryPolicy == null) { redeliveryPolicy = new RedeliveryPolicyDefinition(); Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/RedeliveryPolicyDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/RedeliveryPolicyDefinition.java?rev=930692&r1=930691&r2=930692&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/RedeliveryPolicyDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/RedeliveryPolicyDefinition.java Sun Apr 4 13:47:42 2010 @@ -66,6 +66,8 @@ public class RedeliveryPolicyDefinition private Boolean logExhausted; @XmlAttribute private Boolean disableRedelivery; + @XmlAttribute + private String delayPattern; public RedeliveryPolicy createRedeliveryPolicy(CamelContext context, RedeliveryPolicy parentPolicy) { if (ref != null) { @@ -73,14 +75,19 @@ public class RedeliveryPolicyDefinition return CamelContextHelper.mandatoryLookup(context, ref, RedeliveryPolicy.class); } - RedeliveryPolicy answer = parentPolicy.copy(); + RedeliveryPolicy answer; + if (parentPolicy != null) { + answer = parentPolicy.copy(); + } else { + answer = new RedeliveryPolicy(); + } // copy across the properties - if they are set if (maximumRedeliveries != null) { answer.setMaximumRedeliveries(maximumRedeliveries); } if (redeliveryDelay != null) { - answer.setRedeliverDelay(redeliveryDelay); + answer.setRedeliveryDelay(redeliveryDelay); } if (retriesExhaustedLogLevel != null) { answer.setRetriesExhaustedLogLevel(retriesExhaustedLogLevel); @@ -121,6 +128,9 @@ public class RedeliveryPolicyDefinition if (disableRedelivery != null && disableRedelivery) { answer.setMaximumRedeliveries(0); } + if (delayPattern != null) { + answer.setDelayPattern(delayPattern); + } return answer; } @@ -260,7 +270,7 @@ public class RedeliveryPolicyDefinition /** * Sets the maximum redeliveries *

    - *
  • 5 = default value
  • + *
  • x = redeliver at most x times
  • *
  • 0 = no redeliveries
  • *
  • -1 = redeliver forever
  • *
@@ -315,6 +325,17 @@ public class RedeliveryPolicyDefinition return this; } + /** + * Sets the delay pattern with delay intervals. + * + * @param delayPattern the delay pattern + * @return the builder + */ + public RedeliveryPolicyDefinition delayPattern(String delayPattern) { + setDelayPattern(delayPattern); + return this; + } + // Properties //------------------------------------------------------------------------- @@ -445,4 +466,12 @@ public class RedeliveryPolicyDefinition public void setLogExhausted(Boolean logExhausted) { this.logExhausted = logExhausted; } + + public String getDelayPattern() { + return delayPattern; + } + + public void setDelayPattern(String delayPattern) { + this.delayPattern = delayPattern; + } } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java?rev=930692&r1=930691&r2=930692&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java Sun Apr 4 13:47:42 2010 @@ -149,7 +149,9 @@ public class RedeliveryPolicy implements /** - * Calculates the new redelivery delay based on the last one then sleeps for the necessary amount of time + * Calculates the new redelivery delay based on the last one and then sleeps for the necessary amount of time. + *

+ * This implementation will block while sleeping. * * @param redeliveryDelay previous redelivery delay * @param redeliveryCounter number of previous redelivery attempts @@ -168,7 +170,14 @@ public class RedeliveryPolicy implements return redeliveryDelay; } - protected long calculateRedeliveryDelay(long previousDelay, int redeliveryCounter) { + /** + * Calculates the new redelivery delay based on the last one + * + * @param previousDelay previous redelivery delay + * @param redeliveryCounter number of previous redelivery attempts + * @return the calculate delay + */ + public long calculateRedeliveryDelay(long previousDelay, int redeliveryCounter) { if (ObjectHelper.isNotEmpty(delayPattern)) { // calculate delay using the pattern return calculateRedeliverDelayUsingPattern(delayPattern, redeliveryCounter); Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=930692&r1=930691&r2=930692&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Sun Apr 4 13:47:42 2010 @@ -37,11 +37,12 @@ import org.apache.camel.Predicate; import org.apache.camel.Processor; import org.apache.camel.impl.LoggingExceptionHandler; import org.apache.camel.impl.ServiceSupport; -import org.apache.camel.impl.SynchronizationAdapter; +import org.apache.camel.processor.RedeliveryPolicy; import org.apache.camel.processor.Traceable; import org.apache.camel.spi.AggregationRepository; import org.apache.camel.spi.ExceptionHandler; import org.apache.camel.spi.RecoverableAggregationRepository; +import org.apache.camel.spi.Synchronization; import org.apache.camel.util.DefaultTimeoutMap; import org.apache.camel.util.ExchangeHelper; import org.apache.camel.util.LRUCache; @@ -328,7 +329,7 @@ public class AggregateProcessor extends LOG.debug("Aggregation complete for correlation key " + key + " sending aggregated exchange: " + exchange); } - // add this as in progress + // add this as in progress before we submit the task inProgressCompleteExchanges.add(exchange.getExchangeId()); // send this exchange @@ -461,17 +462,18 @@ public class AggregateProcessor extends /** * On completion task which keeps the booking of the in progress up to date */ - private class AggregateOnCompletion extends SynchronizationAdapter { + private class AggregateOnCompletion implements Synchronization { - @Override - public void onDone(Exchange exchange) { - // must remember to remove when we are done (done = success or failure) + public void onFailure(Exchange exchange) { + // must remember to remove in progress when we failed inProgressCompleteExchanges.remove(exchange.getExchangeId()); + // do not remove redelivery state as we need it when we redeliver again later } - @Override public void onComplete(Exchange exchange) { - // remove redelivery state when it was processed successfully + // must remember to remove in progress when we are complete + inProgressCompleteExchanges.remove(exchange.getExchangeId()); + // and remove redelivery state as well redeliveryState.remove(exchange.getExchangeId()); } @@ -556,9 +558,6 @@ public class AggregateProcessor extends } data.redeliveryCounter++; - // TODO: support delay and have a DelayQueue to avoid blocking - // if so we need to pre add in progress so we wont add again to delay queue - // set redelivery counter exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter); Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/exceptionpolicy/DefaultExceptionPolicyStrategyUsingOnlyWhenTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/exceptionpolicy/DefaultExceptionPolicyStrategyUsingOnlyWhenTest.java?rev=930692&r1=930691&r2=930692&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/exceptionpolicy/DefaultExceptionPolicyStrategyUsingOnlyWhenTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/exceptionpolicy/DefaultExceptionPolicyStrategyUsingOnlyWhenTest.java Sun Apr 4 13:47:42 2010 @@ -68,7 +68,7 @@ public class DefaultExceptionPolicyStrat errorHandler(deadLetterChannel(ERROR_QUEUE).maximumRedeliveries(0).redeliveryDelay(100)); onException(MyUserException.class).onWhen(header("user").isNotNull()) - .maximumRedeliveries(1).backOffMultiplier(2).redeliverDelay(0) + .maximumRedeliveries(1).backOffMultiplier(2).redeliveryDelay(0) .to(ERROR_USER_QUEUE); from("direct:a").process(new Processor() { Modified: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostRemovedWhenConfirmedTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostRemovedWhenConfirmedTest.java?rev=930692&r1=930691&r2=930692&view=diff ============================================================================== --- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostRemovedWhenConfirmedTest.java (original) +++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostRemovedWhenConfirmedTest.java Sun Apr 4 13:47:42 2010 @@ -48,6 +48,9 @@ public class HawtDBAggregateNotLostRemov assertMockEndpointsSatisfied(); + // sleep a bit since the completed signal is done async + Thread.sleep(2000); + String exchangeId = getMockEndpoint("mock:result").getReceivedExchanges().get(0).getExchangeId(); // the exchange should NOT be in the completed repo as it was confirmed Modified: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostTest.java?rev=930692&r1=930691&r2=930692&view=diff ============================================================================== --- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostTest.java (original) +++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostTest.java Sun Apr 4 13:47:42 2010 @@ -49,6 +49,9 @@ public class HawtDBAggregateNotLostTest assertMockEndpointsSatisfied(); + // sleep a bit since the completed signal is done async + Thread.sleep(2000); + String exchangeId = getMockEndpoint("mock:aggregated").getReceivedExchanges().get(0).getExchangeId(); // the exchange should be in the completed repo where we should be able to find it Copied: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverWithRedeliveryPolicyTest.java (from r930663, camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverWithRedeliveryPolicyTest.java?p2=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverWithRedeliveryPolicyTest.java&p1=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java&r1=930663&r2=930692&rev=930692&view=diff ============================================================================== --- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java (original) +++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverWithRedeliveryPolicyTest.java Sun Apr 4 13:47:42 2010 @@ -26,7 +26,7 @@ import org.apache.camel.processor.aggreg import org.apache.camel.test.junit4.CamelTestSupport; import org.junit.Test; -public class HawtDBAggregateRecoverTest extends CamelTestSupport { +public class HawtDBAggregateRecoverWithRedeliveryPolicyTest extends CamelTestSupport { private HawtDBAggregationRepository repo; private static AtomicInteger counter = new AtomicInteger(0); @@ -44,13 +44,16 @@ public class HawtDBAggregateRecoverTest @Test public void testHawtDBAggregateRecover() throws Exception { - // should fail the first 2 times and then recover - getMockEndpoint("mock:aggregated").expectedMessageCount(3); + getMockEndpoint("mock:aggregated").setResultWaitTime(20000); + getMockEndpoint("mock:result").setResultWaitTime(20000); + + // should fail the first 3 times and then recover + getMockEndpoint("mock:aggregated").expectedMessageCount(4); getMockEndpoint("mock:result").expectedBodiesReceived("ABCDE"); // should be marked as redelivered getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE); // on the 2nd redelivery attempt we success - getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(2); + getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(3); template.sendBodyAndHeader("direct:start", "A", "id", 123); template.sendBodyAndHeader("direct:start", "B", "id", 123); @@ -69,14 +72,14 @@ public class HawtDBAggregateRecoverTest from("direct:start") .aggregate(header("id"), new MyAggregationStrategy()) .completionSize(5).aggregationRepository(repo) + // this is the output from the aggregator .log("aggregated exchange id ${exchangeId} with ${body}") .to("mock:aggregated") - .delay(2000) - // simulate errors the first two times + // simulate errors the first three times .process(new Processor() { public void process(Exchange exchange) throws Exception { int count = counter.incrementAndGet(); - if (count <= 2) { + if (count <= 3) { throw new IllegalArgumentException("Damn"); } } Added: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBSpringAggregateRecoverWithRedeliveryPolicyTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBSpringAggregateRecoverWithRedeliveryPolicyTest.java?rev=930692&view=auto ============================================================================== --- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBSpringAggregateRecoverWithRedeliveryPolicyTest.java (added) +++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBSpringAggregateRecoverWithRedeliveryPolicyTest.java Sun Apr 4 13:47:42 2010 @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.hawtdb; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.processor.aggregate.AggregationStrategy; +import org.apache.camel.test.junit4.CamelSpringTestSupport; +import org.junit.Test; +import org.springframework.context.support.AbstractXmlApplicationContext; +import org.springframework.context.support.ClassPathXmlApplicationContext; + +public class HawtDBSpringAggregateRecoverWithRedeliveryPolicyTest extends CamelSpringTestSupport { + + private static AtomicInteger counter = new AtomicInteger(0); + + @Override + protected AbstractXmlApplicationContext createApplicationContext() { + return new ClassPathXmlApplicationContext("org/apache/camel/component/hawtdb/HawtDBSpringAggregateRecoverWithRedeliveryPolicyTest.xml"); + } + + @Override + public void setUp() throws Exception { + deleteDirectory("target/data"); + super.setUp(); + } + + @Test + public void testHawtDBAggregateRecover() throws Exception { + getMockEndpoint("mock:aggregated").setResultWaitTime(20000); + getMockEndpoint("mock:result").setResultWaitTime(20000); + + // should fail the first 3 times and then recover + getMockEndpoint("mock:aggregated").expectedMessageCount(4); + getMockEndpoint("mock:result").expectedBodiesReceived("ABCDE"); + // should be marked as redelivered + getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE); + // on the 2nd redelivery attempt we success + getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(3); + + template.sendBodyAndHeader("direct:start", "A", "id", 123); + template.sendBodyAndHeader("direct:start", "B", "id", 123); + template.sendBodyAndHeader("direct:start", "C", "id", 123); + template.sendBodyAndHeader("direct:start", "D", "id", 123); + template.sendBodyAndHeader("direct:start", "E", "id", 123); + + assertMockEndpointsSatisfied(); + } + + public static class MyFailProcessor implements Processor { + + public void process(Exchange exchange) throws Exception { + int count = counter.incrementAndGet(); + if (count <= 3) { + throw new IllegalArgumentException("Damn"); + } + } + } + + public static class MyAggregationStrategy implements AggregationStrategy { + + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + if (oldExchange == null) { + return newExchange; + } + String body1 = oldExchange.getIn().getBody(String.class); + String body2 = newExchange.getIn().getBody(String.class); + + oldExchange.getIn().setBody(body1 + body2); + return oldExchange; + } + } +} \ No newline at end of file Propchange: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBSpringAggregateRecoverWithRedeliveryPolicyTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBSpringAggregateRecoverWithRedeliveryPolicyTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Copied: camel/trunk/components/camel-hawtdb/src/test/resources/org/apache/camel/component/hawtdb/HawtDBSpringAggregateRecoverWithRedeliveryPolicyTest.xml (from r930635, camel/trunk/components/camel-hawtdb/src/test/resources/org/apache/camel/component/hawtdb/HawtDBSpringAggregateTest.xml) URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/resources/org/apache/camel/component/hawtdb/HawtDBSpringAggregateRecoverWithRedeliveryPolicyTest.xml?p2=camel/trunk/components/camel-hawtdb/src/test/resources/org/apache/camel/component/hawtdb/HawtDBSpringAggregateRecoverWithRedeliveryPolicyTest.xml&p1=camel/trunk/components/camel-hawtdb/src/test/resources/org/apache/camel/component/hawtdb/HawtDBSpringAggregateTest.xml&r1=930635&r2=930692&rev=930692&view=diff ============================================================================== --- camel/trunk/components/camel-hawtdb/src/test/resources/org/apache/camel/component/hawtdb/HawtDBSpringAggregateTest.xml (original) +++ camel/trunk/components/camel-hawtdb/src/test/resources/org/apache/camel/component/hawtdb/HawtDBSpringAggregateRecoverWithRedeliveryPolicyTest.xml Sun Apr 4 13:47:42 2010 @@ -30,10 +30,17 @@ + + + + - + + + + @@ -44,8 +51,12 @@

id
- + + + + +