camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r1442135 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/processor/ main/java/org/apache/camel/support/ test/java/org/apache/camel/processor/ test/resources/
Date Mon, 04 Feb 2013 14:32:04 GMT
Author: davsclaus
Date: Mon Feb  4 14:32:04 2013
New Revision: 1442135

URL: http://svn.apache.org/viewvc?rev=1442135&view=rev
Log:
CAMEL-6003: Using allowRedeliveryWhileStopping=false on Dead Letter Channel now moves the
message to the DLC (when stopping) instead of rejecting the message.

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/support/ServiceSupport.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryDeadLetterErrorHandlerNoRedeliveryOnShutdownTest.java
    camel/trunk/camel-core/src/test/resources/log4j.properties

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?rev=1442135&r1=1442134&r2=1442135&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
Mon Feb  4 14:32:04 2013
@@ -79,4 +79,11 @@ public class DeadLetterChannel extends R
         // DeadLetterChannel handles errors before sending to DLQ
         return ExpressionToPredicateAdapter.toPredicate(ExpressionBuilder.constantExpression(true));
     }
+
+    @Override
+    protected boolean isRunAllowedOnPreparingShutdown() {
+        // allow to run as we want to move the message to DLC, instead of rejecting the message
+        return true;
+    }
+
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java?rev=1442135&r1=1442134&r2=1442135&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
Mon Feb  4 14:32:04 2013
@@ -214,18 +214,46 @@ public abstract class RedeliveryErrorHan
                 log.trace("isRunAllowed() -> true (Run allowed as RedeliverWhileStopping
is enabled)");
                 return true;
             } else if (preparingShutdown) {
-                // do not allow redelivery as we are preparing for shutdown
-                log.trace("isRunAllowed() -> false (Run not allowed as we are preparing
for shutdown)");
-                return false;
+                // we are preparing for shutdown, now determine if we can still run
+                boolean answer = isRunAllowedOnPreparingShutdown();
+                log.trace("isRunAllowed() -> {} (Run not allowed as we are preparing for
shutdown)", answer);
+                return answer;
             }
         }
 
-        // fallback and use code from super
-        boolean answer = super.isRunAllowed();
+        // we cannot run if we are stopping/stopped
+        boolean answer = !isStoppingOrStopped();
         log.trace("isRunAllowed() -> {} (Run allowed if we are not stopped/stopping)",
answer);
         return answer;
     }
 
+    protected boolean isRunAllowedOnPreparingShutdown() {
+        return false;
+    }
+
+    protected boolean isRedeliveryAllowed(RedeliveryData data) {
+        // redelivery policy can control if redelivery is allowed during stopping/shutdown
+        // but this only applies during a redelivery (counter must be > 0)
+        if (data.redeliveryCounter > 0) {
+            boolean stopping = isStoppingOrStopped();
+            if (!preparingShutdown && !stopping) {
+                log.trace("isRedeliveryAllowed() -> true (we are not stopping/stopped)");
+                return true;
+            } else {
+                // we are stopping or preparing to shutdown
+                if (data.currentRedeliveryPolicy.allowRedeliveryWhileStopping) {
+                    log.trace("isRedeliveryAllowed() -> true (Redelivery allowed as RedeliverWhileStopping
is enabled)");
+                    return true;
+                } else {
+                    log.trace("isRedeliveryAllowed() -> false (Redelivery not allowed
as RedeliverWhileStopping is disabled)");
+                    return false;
+                }
+            }
+        }
+
+        return true;
+    }
+
     @Override
     public void prepareShutdown(boolean forced) {
         // prepare for shutdown, eg do not allow redelivery if configured
@@ -274,9 +302,12 @@ public abstract class RedeliveryErrorHan
                 handleException(exchange, data);
             }
 
-            // compute if we are exhausted or not
+            // compute if we are exhausted, and whether redelivery is allowed
             boolean exhausted = isExhausted(exchange, data);
-            if (exhausted) {
+            boolean redeliverAllowed = isRedeliveryAllowed(data);
+
+            // if we are exhausted or redelivery is not allowed, then deliver to failure processor (e.g. the DLC)
+            if (!redeliverAllowed || exhausted) {
                 Processor target = null;
                 boolean deliver = true;
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/support/ServiceSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/support/ServiceSupport.java?rev=1442135&r1=1442134&r2=1442135&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/support/ServiceSupport.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/support/ServiceSupport.java Mon
Feb  4 14:32:04 2013
@@ -219,7 +219,11 @@ public abstract class ServiceSupport imp
 
     @Override
     public boolean isRunAllowed() {
-        return !(stopping.get() || stopped.get());
+        return !isStoppingOrStopped();
+    }
+
+    public boolean isStoppingOrStopped() {
+        return stopping.get() || stopped.get();
     }
 
     /**

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryDeadLetterErrorHandlerNoRedeliveryOnShutdownTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryDeadLetterErrorHandlerNoRedeliveryOnShutdownTest.java?rev=1442135&r1=1442134&r2=1442135&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryDeadLetterErrorHandlerNoRedeliveryOnShutdownTest.java
(original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryDeadLetterErrorHandlerNoRedeliveryOnShutdownTest.java
Mon Feb  4 14:32:04 2013
@@ -16,18 +16,23 @@
  */
 package org.apache.camel.processor;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
 import org.apache.camel.LoggingLevel;
+import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.util.StopWatch;
-import org.junit.Ignore;
 
-@Ignore
 public class RedeliveryDeadLetterErrorHandlerNoRedeliveryOnShutdownTest extends ContextTestSupport
{
 
+    private final AtomicInteger counter = new AtomicInteger();
+
     public void testRedeliveryErrorHandlerNoRedeliveryOnShutdown() throws Exception {
         getMockEndpoint("mock:foo").expectedMessageCount(1);
         getMockEndpoint("mock:deadLetter").expectedMessageCount(1);
+        getMockEndpoint("mock:deadLetter").setResultWaitTime(25000);
 
         template.sendBody("seda:foo", "Hello World");
 
@@ -35,13 +40,26 @@ public class RedeliveryDeadLetterErrorHa
 
         // should not take long to stop the route
         StopWatch watch = new StopWatch();
+        // sleep 3 seconds to do some redeliveries before we stop
+        Thread.sleep(3000);
+        log.info("==== stopping route foo ====");
         context.stopRoute("foo");
         watch.stop();
 
-        getMockEndpoint("mock:deadLetter").setResultWaitTime(25000);
         getMockEndpoint("mock:deadLetter").assertIsSatisfied();
 
-        assertTrue("Should stop route faster, was " + watch.taken(), watch.taken() < 4000);
+        log.info("OnRedelivery processor counter {}", counter.get());
+
+        assertTrue("Should stop route faster, was " + watch.taken(), watch.taken() < 7000);
+        assertTrue("Redelivery counter should be >= 2 and < 12, was: " + counter.get(),
counter.get() >= 2 && counter.get() < 12);
+    }
+
+    private final class MyRedeliverProcessor implements Processor {
+
+        @Override
+        public void process(Exchange exchange) throws Exception {
+            counter.incrementAndGet();
+        }
     }
 
     @Override
@@ -51,6 +69,7 @@ public class RedeliveryDeadLetterErrorHa
             public void configure() throws Exception {
                 errorHandler(deadLetterChannel("mock:deadLetter")
                         .allowRedeliveryWhileStopping(false)
+                        .onRedelivery(new MyRedeliverProcessor())
                         .maximumRedeliveries(20).redeliveryDelay(1000).retryAttemptedLogLevel(LoggingLevel.INFO));
 
                 from("seda:foo").routeId("foo")

Modified: camel/trunk/camel-core/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/resources/log4j.properties?rev=1442135&r1=1442134&r2=1442135&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/resources/log4j.properties (original)
+++ camel/trunk/camel-core/src/test/resources/log4j.properties Mon Feb  4 14:32:04 2013
@@ -37,6 +37,7 @@ log4j.logger.org.apache.camel.impl.Defau
 #log4j.logger.org.apache.camel.component.mock=DEBUG
 #log4j.logger.org.apache.camel.component.file=TRACE
 #log4j.logger.org.apache.camel.processor.DefaultErrorHandler=TRACE
+#log4j.logger.org.apache.camel.processor.DeadLetterChannel=TRACE
 #log4j.logger.org.apache.camel.processor.Pipeline=TRACE
 #log4j.logger.org.apache.camel.processor.MulticastProcessor=TRACE
 #log4j.logger.org.apache.camel.processor.RecipientList=TRACE



Mime
View raw message