camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r674036 - in /activemq/camel/trunk: camel-core/src/main/java/org/apache/camel/processor/ components/camel-spring/src/main/java/org/apache/camel/spring/spi/
Date Fri, 04 Jul 2008 12:55:07 GMT
Author: jstrachan
Date: Fri Jul  4 05:55:06 2008
New Revision: 674036

URL: http://svn.apache.org/viewvc?rev=674036&view=rev
Log:
transaction error handler for https://issues.apache.org/activemq/browse/CAMEL-663

Added:
    activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandlerBuilder.java
  (with props)
Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
    activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionInterceptor.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?rev=674036&r1=674035&r2=674036&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
Fri Jul  4 05:55:06 2008
@@ -146,8 +146,7 @@
 
             if (data.redeliveryCounter > 0) {
                 // Figure out how long we should wait to resend this message.
-                data.redeliveryDelay = data.currentRedeliveryPolicy.getRedeliveryDelay(data.redeliveryDelay);
-                sleep(data.redeliveryDelay);
+                data.redeliveryDelay = data.currentRedeliveryPolicy.sleep(data.redeliveryDelay);
             }
 
             exchange.setProperty(EXCEPTION_CAUSE_PROPERTY, exchange.getException());
@@ -265,21 +264,6 @@
         return next;
     }
 
-    protected void sleep(long redeliveryDelay) {
-        if (redeliveryDelay > 0) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Sleeping for: " + redeliveryDelay + " millis until attempting
redelivery");
-            }
-            try {
-                Thread.sleep(redeliveryDelay);
-            } catch (InterruptedException e) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Thread interrupted: " + e, e);
-                }
-            }
-        }
-    }
-
     @Override
     protected void doStart() throws Exception {
         ServiceHelper.startServices(output, deadLetter);

Added: activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandlerBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandlerBuilder.java?rev=674036&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandlerBuilder.java
(added)
+++ activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandlerBuilder.java
Fri Jul  4 05:55:06 2008
@@ -0,0 +1,76 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spring.spi;
+
+import org.apache.camel.builder.ErrorHandlerBuilderSupport;
+import org.apache.camel.builder.ErrorHandlerBuilder;
+import org.apache.camel.Processor;
+import org.apache.camel.processor.RedeliveryPolicy;
+import org.apache.camel.spi.RouteContext;
+import org.apache.camel.util.ObjectHelper;
+import org.springframework.transaction.support.TransactionTemplate;
+import org.springframework.beans.factory.InitializingBean;
+
+/**
+ * An error handler which will roll the exception back if there is an error
+ * rather than using the dead letter channel and retry logic.
+ *
+ * A delay is also used after a rollback 
+ *
+ * @version $Revision: 1.1 $
+ */
+public class TransactionErrorHandlerBuilder extends ErrorHandlerBuilderSupport implements
Cloneable, InitializingBean {
+
+    private TransactionTemplate transactionTemplate;
+    private RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
+
+    public TransactionErrorHandlerBuilder() {
+    }
+
+    public TransactionTemplate getTransactionTemplate() {
+        return transactionTemplate;
+    }
+
+    public void setTransactionTemplate(TransactionTemplate transactionTemplate) {
+        this.transactionTemplate = transactionTemplate;
+    }
+
+    public RedeliveryPolicy getRedeliveryPolicy() {
+        return redeliveryPolicy;
+    }
+
+    public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
+        this.redeliveryPolicy = redeliveryPolicy;
+    }
+
+    public ErrorHandlerBuilder copy() {
+        try {
+            return (ErrorHandlerBuilder) clone();
+        } catch (CloneNotSupportedException e) {
+            throw new Error("Clone should be supported: " + e, e);
+        }
+    }
+
+    public Processor createErrorHandler(RouteContext routeContext, Processor processor) throws
Exception {
+        return new TransactionInterceptor(processor, transactionTemplate, redeliveryPolicy);
+    }
+
+    public void afterPropertiesSet() throws Exception {
+        ObjectHelper.notNull(transactionTemplate, "transactionTemplate");
+    }
+}

Propchange: activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandlerBuilder.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionInterceptor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionInterceptor.java?rev=674036&r1=674035&r2=674036&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionInterceptor.java
(original)
+++ activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionInterceptor.java
Fri Jul  4 05:55:06 2008
@@ -19,7 +19,9 @@
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangeProperty;
 import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.Processor;
 import org.apache.camel.processor.DelegateProcessor;
+import org.apache.camel.processor.RedeliveryPolicy;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.springframework.transaction.TransactionDefinition;
@@ -40,16 +42,47 @@
         new ExchangeProperty<Boolean>("transacted", "org.apache.camel.transacted",
Boolean.class);
     private static final transient Log LOG = LogFactory.getLog(TransactionInterceptor.class);
     private final TransactionTemplate transactionTemplate;
+    private ThreadLocal<RedeliveryData> previousRollback = new ThreadLocal<RedeliveryData>()
{
+        @Override
+        protected RedeliveryData initialValue() {
+            return new RedeliveryData();
+        }
+    };
+    private RedeliveryPolicy redeliveryPolicy;
 
     public TransactionInterceptor(TransactionTemplate transactionTemplate) {
         this.transactionTemplate = transactionTemplate;
     }
 
+    public TransactionInterceptor(Processor processor, TransactionTemplate transactionTemplate)
{
+        super(processor);
+        this.transactionTemplate = transactionTemplate;
+    }
+
+    public TransactionInterceptor(Processor processor, TransactionTemplate transactionTemplate,
RedeliveryPolicy redeliveryPolicy) {
+        this(processor, transactionTemplate);
+        this.redeliveryPolicy = redeliveryPolicy;
+    }
+
+    @Override
+    public String toString() {
+        return "TransactionInterceptor:"
+            + propagationBehaviorToString(transactionTemplate.getPropagationBehavior())
+            + "[" + getProcessor() + "]";
+    }
+
     public void process(final Exchange exchange) {
         LOG.debug("Transaction begin");
 
+        final RedeliveryData redeliveryData = previousRollback.get();
+
         transactionTemplate.execute(new TransactionCallbackWithoutResult() {
             protected void doInTransactionWithoutResult(TransactionStatus status) {
+                if (redeliveryPolicy != null && redeliveryData.previousRollback)
{
+                    // lets delay
+                    redeliveryData.redeliveryDelay = redeliveryPolicy.sleep(redeliveryData.redeliveryDelay);
+                }
+
                 // wrapper exception to throw if the exchange failed
                 // IMPORTANT: Must be a runtime exception to let Spring regard it as to do
"rollback"
                 RuntimeCamelException rce = null;
@@ -92,6 +125,7 @@
 
                 // rehrow exception if the exchange failed
                 if (rce != null) {
+                    redeliveryData.previousRollback = true;
                     if (activeTx) {
                         status.setRollbackOnly();
                         LOG.debug("Transaction rollback");
@@ -101,17 +135,27 @@
             }
         });
 
+        redeliveryData.previousRollback = false;
+        redeliveryData.redeliveryDelay = 0L;
+        
         LOG.debug("Transaction commit");
     }
 
-    @Override
-    public String toString() {
-        return "TransactionInterceptor:"
-            + propagationBehaviorToString(transactionTemplate.getPropagationBehavior())
-            + "[" + getProcessor() + "]";
+
+    public RedeliveryPolicy getRedeliveryPolicy() {
+        return redeliveryPolicy;
+    }
+
+    public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
+        this.redeliveryPolicy = redeliveryPolicy;
+    }
+
+    protected static class RedeliveryData {
+        boolean previousRollback;
+        long redeliveryDelay;
     }
 
-    private String propagationBehaviorToString(int propagationBehavior) {
+    protected String propagationBehaviorToString(int propagationBehavior) {
         String rc;
         switch (propagationBehavior) {
         case TransactionDefinition.PROPAGATION_MANDATORY:



Mime
View raw message