camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r1038128 - in /camel/trunk/camel-core/src/main/java/org/apache/camel: impl/InterceptSendToEndpoint.java processor/Pipeline.java processor/PipelineHelper.java processor/RoutingSlip.java
Date Tue, 23 Nov 2010 14:21:36 GMT
Author: davsclaus
Date: Tue Nov 23 14:21:35 2010
New Revision: 1038128

URL: http://svn.apache.org/viewvc?rev=1038128&view=rev
Log:
CAMEL-3352: Reusing logic for pipes and filters in the various EIP patterns in Camel.

Added:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PipelineHelper.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpoint.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpoint.java?rev=1038128&r1=1038127&r2=1038128&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpoint.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpoint.java
Tue Nov 23 14:21:35 2010
@@ -30,6 +30,8 @@ import org.apache.camel.util.ServiceHelp
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import static org.apache.camel.processor.PipelineHelper.continueProcessing;
+
 /**
  * This is an endpoint when sending to it, is intercepted and is routed in a detour
  *
@@ -121,29 +123,9 @@ public class InterceptSendToEndpoint imp
                     exchange.setException(e);
                 }
 
+                // Decide whether to continue or not; similar logic to the Pipeline
                 // check for error if so we should break out
-                boolean exceptionHandled = hasExceptionBeenHandledByErrorHandler(exchange);
-                if (exchange.isFailed() || exchange.isRollbackOnly() || exceptionHandled)
{
-                    // The Exchange.ERRORHANDLED_HANDLED property is only set if satisfactory
handling was done
-                    // by the error handler. It's still an exception, the exchange still
failed.
-                    if (LOG.isDebugEnabled()) {
-                        StringBuilder sb = new StringBuilder();
-                        sb.append("Message exchange has failed so skip sending to original
intended destination: ").append(getEndpointUri());
-                        sb.append(" for Exchange: ").append(exchange);
-                        if (exchange.isRollbackOnly()) {
-                            sb.append(" Marked as rollback only.");
-                        }
-                        if (exchange.getException() != null) {
-                            sb.append(" Exception: ").append(exchange.getException());
-                        }
-                        if (exchange.hasOut() && exchange.getOut().isFault()) {
-                            sb.append(" Fault: ").append(exchange.getOut());
-                        }
-                        if (exceptionHandled) {
-                            sb.append(" Handled by the error handler.");
-                        }
-                        LOG.debug(sb.toString());
-                    }
+                if (!continueProcessing(exchange, "skip sending to original intended destination:
" + getEndpointUri(), LOG)) {
                     return;
                 }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java?rev=1038128&r1=1038127&r2=1038128&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java Tue Nov
23 14:21:35 2010
@@ -32,6 +32,8 @@ import org.apache.camel.util.ExchangeHel
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import static org.apache.camel.processor.PipelineHelper.continueProcessing;
+
 /**
  * Creates a Pipeline pattern where the output of the previous step is sent as
  * input to the next step, reusing the same message exchanges
@@ -92,27 +94,7 @@ public class Pipeline extends MulticastP
             }
 
             // check for error if so we should break out
-            boolean exceptionHandled = hasExceptionBeenHandledByErrorHandler(nextExchange);
-            if (nextExchange.isFailed() || nextExchange.isRollbackOnly() || exceptionHandled)
{
-                // The Exchange.ERRORHANDLED_HANDLED property is only set if satisfactory
handling was done
-                // by the error handler. It's still an exception, the exchange still failed.
-                if (LOG.isDebugEnabled()) {
-                    StringBuilder sb = new StringBuilder();
-                    sb.append("Message exchange has failed so breaking out of pipeline: ").append(nextExchange);
-                    if (nextExchange.isRollbackOnly()) {
-                        sb.append(" Marked as rollback only.");
-                    }
-                    if (nextExchange.getException() != null) {
-                        sb.append(" Exception: ").append(nextExchange.getException());
-                    }
-                    if (nextExchange.hasOut() && nextExchange.getOut().isFault())
{
-                        sb.append(" Fault: ").append(nextExchange.getOut());
-                    }
-                    if (exceptionHandled) {
-                        sb.append(" Handled by the error handler.");
-                    }
-                    LOG.debug(sb.toString());
-                }
+            if (!continueProcessing(nextExchange, "so breaking out of pipeline", LOG)) {
                 break;
             }
         }
@@ -153,27 +135,7 @@ public class Pipeline extends MulticastP
                     AsyncProcessor processor = AsyncProcessorTypeConverter.convert(processors.next());
 
                     // check for error if so we should break out
-                    boolean exceptionHandled = hasExceptionBeenHandledByErrorHandler(nextExchange);
-                    if (nextExchange.isFailed() || nextExchange.isRollbackOnly() || exceptionHandled)
{
-                        // The Exchange.ERRORHANDLED_HANDLED property is only set if satisfactory
handling was done
-                        // by the error handler. It's still an exception, the exchange still
failed.
-                        if (LOG.isDebugEnabled()) {
-                            StringBuilder sb = new StringBuilder();
-                            sb.append("Message exchange has failed so breaking out of pipeline:
").append(nextExchange);
-                            if (nextExchange.isRollbackOnly()) {
-                                sb.append(" Marked as rollback only.");
-                            }
-                            if (nextExchange.getException() != null) {
-                                sb.append(" Exception: ").append(nextExchange.getException());
-                            }
-                            if (nextExchange.hasOut() && nextExchange.getOut().isFault())
{
-                                sb.append(" Fault: ").append(nextExchange.getOut());
-                            }
-                            if (exceptionHandled) {
-                                sb.append(" Handled by the error handler.");
-                            }
-                            LOG.debug(sb.toString());
-                        }
+                    if (!continueProcessing(nextExchange, "so breaking out of pipeline",
LOG)) {
                         break;
                     }
 
@@ -198,10 +160,6 @@ public class Pipeline extends MulticastP
         return sync;
     }
 
-    private static boolean hasExceptionBeenHandledByErrorHandler(Exchange nextExchange) {
-        return Boolean.TRUE.equals(nextExchange.getProperty(Exchange.ERRORHANDLER_HANDLED));
-    }
-
     /**
      * Strategy method to create the next exchange from the previous exchange.
      * <p/>

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PipelineHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PipelineHelper.java?rev=1038128&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PipelineHelper.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PipelineHelper.java Tue
Nov 23 14:21:35 2010
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.Exchange;
+import org.apache.commons.logging.Log;
+
+/**
+ * Helper for processing {@link org.apache.camel.Exchange} in a
+ * <a href="http://camel.apache.org/pipes-and-filters.html">pipeline</a>.
+ *
+ * @version $Revision$
+ */
+public final class PipelineHelper {
+
+    private PipelineHelper() {
+    }
+
+    private static boolean hasExceptionBeenHandledByErrorHandler(Exchange nextExchange) {
+        return Boolean.TRUE.equals(nextExchange.getProperty(Exchange.ERRORHANDLER_HANDLED));
+    }
+
+    /**
+     * Should we continue processing the next exchange?
+     *
+     * @param nextExchange the next exchange
+     * @param message a message to use when logging that we should not continue processing
+     * @param log a logger
+     * @return <tt>true</tt> to continue processing, <tt>false</tt>
to break out, for example if an exception occurred.
+     */
+    public static boolean continueProcessing(Exchange nextExchange, String message, Log log)
{
+        // check for error if so we should break out
+        boolean exceptionHandled = hasExceptionBeenHandledByErrorHandler(nextExchange);
+        if (nextExchange.isFailed() || nextExchange.isRollbackOnly() || exceptionHandled)
{
+            // The Exchange.ERRORHANDLED_HANDLED property is only set if satisfactory handling
was done
+            // by the error handler. It's still an exception, the exchange still failed.
+            if (log.isDebugEnabled()) {
+                StringBuilder sb = new StringBuilder();
+                sb.append("Message exchange has failed " + message + " for exchange: ").append(nextExchange);
+                if (nextExchange.isRollbackOnly()) {
+                    sb.append(" Marked as rollback only.");
+                }
+                if (nextExchange.getException() != null) {
+                    sb.append(" Exception: ").append(nextExchange.getException());
+                }
+                if (nextExchange.hasOut() && nextExchange.getOut().isFault()) {
+                    sb.append(" Fault: ").append(nextExchange.getOut());
+                }
+                if (exceptionHandled) {
+                    sb.append(" Handled by the error handler.");
+                }
+                log.debug(sb.toString());
+            }
+
+            return false;
+        }
+
+        return true;
+    }
+
+}

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java?rev=1038128&r1=1038127&r2=1038128&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java Tue Nov
23 14:21:35 2010
@@ -40,6 +40,7 @@ import org.apache.camel.util.ServiceHelp
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import static org.apache.camel.processor.PipelineHelper.continueProcessing;
 import static org.apache.camel.util.ObjectHelper.notNull;
 
 /**
@@ -212,27 +213,8 @@ public class RoutingSlip extends Service
             }
 
             // Decide whether to continue with the recipients or not; similar logic to the
Pipeline
-            boolean exceptionHandled = hasExceptionBeenHandledByErrorHandler(current);
-            if (current.isFailed() || current.isRollbackOnly() || exceptionHandled) {
-                // The Exchange.ERRORHANDLED_HANDLED property is only set if satisfactory
handling was done
-                // by the error handler. It's still an exception, the exchange still failed.
-                if (log.isDebugEnabled()) {
-                    StringBuilder sb = new StringBuilder();
-                    sb.append("Message exchange has failed so breaking out of the routing
slip: ").append(current);
-                    if (current.isRollbackOnly()) {
-                        sb.append(" Marked as rollback only.");
-                    }
-                    if (current.getException() != null) {
-                        sb.append(" Exception: ").append(current.getException());
-                    }
-                    if (current.hasOut() && current.getOut().isFault()) {
-                        sb.append(" Fault: ").append(current.getOut());
-                    }
-                    if (exceptionHandled) {
-                        sb.append(" Handled by the error handler.");
-                    }
-                    log.debug(sb.toString());
-                }
+            // check for error if so we should break out
+            if (!continueProcessing(current, "so breaking out of the routing slip", log))
{
                 break;
             }
         }
@@ -315,27 +297,8 @@ public class RoutingSlip extends Service
                             }
 
                             // Decide whether to continue with the recipients or not; similar
logic to the Pipeline
-                            boolean exceptionHandled = hasExceptionBeenHandledByErrorHandler(current);
-                            if (current.isFailed() || current.isRollbackOnly() || exceptionHandled)
{
-                                // The Exchange.ERRORHANDLED_HANDLED property is only set
if satisfactory handling was done
-                                // by the error handler. It's still an exception, the exchange
still failed.
-                                if (log.isDebugEnabled()) {
-                                    StringBuilder sb = new StringBuilder();
-                                    sb.append("Message exchange has failed so breaking out
of the routing slip: ").append(current);
-                                    if (current.isRollbackOnly()) {
-                                        sb.append(" Marked as rollback only.");
-                                    }
-                                    if (current.getException() != null) {
-                                        sb.append(" Exception: ").append(current.getException());
-                                    }
-                                    if (current.hasOut() && current.getOut().isFault())
{
-                                        sb.append(" Fault: ").append(current.getOut());
-                                    }
-                                    if (exceptionHandled) {
-                                        sb.append(" Handled by the error handler.");
-                                    }
-                                    log.debug(sb.toString());
-                                }
+                            // check for error if so we should break out
+                            if (!continueProcessing(current, "so breaking out of the routing
slip", log)) {
                                 break;
                             }
 



Mime
View raw message