camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r1170215 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/model/ main/java/org/apache/camel/model/config/ main/java/org/apache/camel/processor/ main/java/org/apache/camel/processor/resequencer/ test/java/org/apache/camel/process...
Date Tue, 13 Sep 2011 15:42:49 GMT
Author: davsclaus
Date: Tue Sep 13 15:42:48 2011
New Revision: 1170215

URL: http://svn.apache.org/viewvc?rev=1170215&view=rev
Log:
CAMEL-4401: Resequencer validate exchange before accepting. Added option ignoreInvalidExchanges.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceBatchIgnoreInvalidExchangesTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceBatchNotIgnoreInvalidExchangesTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamIgnoreInvalidExchangesTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamNotIgnoreInvalidExchangesTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/BatchResequencerConfig.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/DefaultExchangeComparator.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ElementComparator.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/SequenceElementComparator.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/IntegerComparator.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/TestComparator.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java?rev=1170215&r1=1170214&r2=1170215&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java Tue Sep 13 15:42:48 2011
@@ -137,10 +137,14 @@ public class ResequenceDefinition extend
      * @return the builder
      */
     public ResequenceDefinition timeout(long timeout) {
-        if (batchConfig != null) {
-            batchConfig.setBatchTimeout(timeout);
-        } else {
+        if (streamConfig != null) {
             streamConfig.setTimeout(timeout);
+        } else {
+            // initialize batch mode as its default mode
+            if (batchConfig == null) {
+                batch();
+            }
+            batchConfig.setBatchTimeout(timeout);
         }
         return this;
     }
@@ -214,6 +218,24 @@ public class ResequenceDefinition extend
     }
 
     /**
+     * If an incoming {@link org.apache.camel.Exchange} is invalid, then it will be ignored.
+     *
+     * @return builder
+     */
+    public ResequenceDefinition ignoreInvalidExchanges() {
+        if (streamConfig != null) {
+            streamConfig.setIgnoreInvalidExchanges(true);
+        } else {
+            // initialize batch mode as its default mode
+            if (batchConfig == null) {
+                batch();
+            }
+            batchConfig.setIgnoreInvalidExchanges(true);
+        }
+        return this;
+    }
+
+    /**
      * Sets the comparator to use for stream resequencer
      *
      * @param comparator  the comparator
@@ -318,6 +340,9 @@ public class ResequenceDefinition extend
                 config.isAllowDuplicates(), config.isReverse());
         resequencer.setBatchSize(config.getBatchSize());
         resequencer.setBatchTimeout(config.getBatchTimeout());
+        if (config.getIgnoreInvalidExchanges() != null) {
+            resequencer.setIgnoreInvalidExchanges(config.getIgnoreInvalidExchanges());
+        }
         return resequencer;
     }
 
@@ -343,6 +368,9 @@ public class ResequenceDefinition extend
         StreamResequencer resequencer = new StreamResequencer(routeContext.getCamelContext(), processor, comparator);
         resequencer.setTimeout(config.getTimeout());
         resequencer.setCapacity(config.getCapacity());
+        if (config.getIgnoreInvalidExchanges() != null) {
+            resequencer.setIgnoreInvalidExchanges(config.getIgnoreInvalidExchanges());
+        }
         return resequencer;
     }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/BatchResequencerConfig.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/BatchResequencerConfig.java?rev=1170215&r1=1170214&r2=1170215&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/BatchResequencerConfig.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/BatchResequencerConfig.java Tue Sep 13 15:42:48 2011
@@ -36,6 +36,8 @@ public class BatchResequencerConfig exte
     private Boolean allowDuplicates;
     @XmlAttribute
     private Boolean reverse;
+    @XmlAttribute
+    private Boolean ignoreInvalidExchanges;
 
     /**
      * Creates a new {@link BatchResequencerConfig} instance using default
@@ -108,4 +110,12 @@ public class BatchResequencerConfig exte
     public void setReverse(Boolean reverse) {
         this.reverse = reverse;
     }
+
+    public Boolean getIgnoreInvalidExchanges() {
+        return ignoreInvalidExchanges;
+    }
+
+    public void setIgnoreInvalidExchanges(Boolean ignoreInvalidExchanges) {
+        this.ignoreInvalidExchanges = ignoreInvalidExchanges;
+    }
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java?rev=1170215&r1=1170214&r2=1170215&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java Tue Sep 13 15:42:48 2011
@@ -37,6 +37,8 @@ public class StreamResequencerConfig ext
     private Integer capacity;
     @XmlAttribute
     private Long timeout;
+    @XmlAttribute
+    private Boolean ignoreInvalidExchanges;
     @XmlTransient
     private ExpressionResultComparator comparator;
 
@@ -106,6 +108,14 @@ public class StreamResequencerConfig ext
         this.timeout = timeout;
     }
 
+    public Boolean getIgnoreInvalidExchanges() {
+        return ignoreInvalidExchanges;
+    }
+
+    public void setIgnoreInvalidExchanges(Boolean ignoreInvalidExchanges) {
+        this.ignoreInvalidExchanges = ignoreInvalidExchanges;
+    }
+
     public ExpressionResultComparator getComparator() {
         return comparator;
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java?rev=1170215&r1=1170214&r2=1170215&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java Tue Sep 13 15:42:48 2011
@@ -29,7 +29,9 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.camel.CamelContext;
+import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
+import org.apache.camel.Expression;
 import org.apache.camel.Navigate;
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
@@ -60,7 +62,9 @@ public class BatchProcessor extends Serv
     private int outBatchSize;
     private boolean groupExchanges;
     private boolean batchConsumer;
+    private boolean ignoreInvalidExchanges;
     private Predicate completionPredicate;
+    private Expression expression;
 
     private final CamelContext camelContext;
     private final Processor processor;
@@ -69,15 +73,17 @@ public class BatchProcessor extends Serv
 
     private final BatchSender sender;
 
-    public BatchProcessor(CamelContext camelContext, Processor processor, Collection<Exchange> collection) {
+    public BatchProcessor(CamelContext camelContext, Processor processor, Collection<Exchange> collection, Expression expression) {
         ObjectHelper.notNull(camelContext, "camelContext");
         ObjectHelper.notNull(processor, "processor");
         ObjectHelper.notNull(collection, "collection");
+        ObjectHelper.notNull(expression, "expression");
 
         // wrap processor in UnitOfWork so what we send out of the batch runs in a UoW
         this.camelContext = camelContext;
         this.processor = new UnitOfWorkProcessor(processor);
         this.collection = collection;
+        this.expression = expression;
         this.sender = new BatchSender();
     }
 
@@ -159,6 +165,14 @@ public class BatchProcessor extends Serv
         this.batchConsumer = batchConsumer;
     }
 
+    public boolean isIgnoreInvalidExchanges() {
+        return ignoreInvalidExchanges;
+    }
+
+    public void setIgnoreInvalidExchanges(boolean ignoreInvalidExchanges) {
+        this.ignoreInvalidExchanges = ignoreInvalidExchanges;
+    }
+
     public Predicate getCompletionPredicate() {
         return completionPredicate;
     }
@@ -242,10 +256,37 @@ public class BatchProcessor extends Serv
             }
         }
 
+        // validate that the exchange can be used
+        if (!isValid(exchange)) {
+            if (isIgnoreInvalidExchanges()) {
+                LOG.debug("Invalid Exchange. This Exchange will be ignored: {}", exchange);
+                return;
+            } else {
+                throw new CamelExchangeException("Exchange is not valid to be used by the BatchProcessor", exchange);
+            }
+        }
+
+        // exchange is valid so enqueue the exchange
         sender.enqueueExchange(exchange);
     }
 
     /**
+     * Is the given exchange valid to be used.
+     *
+     * @param exchange the given exchange
+     * @return <tt>true</tt> if valid, <tt>false</tt> otherwise
+     */
+    private boolean isValid(Exchange exchange) {
+        Object result = null;
+        try {
+            result = expression.evaluate(exchange, Object.class);
+        } catch (Exception e) {
+            // ignore
+        }
+        return result != null;
+    }
+
+    /**
      * Sender thread for queued-up exchanges.
      */
     private class BatchSender extends Thread {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java?rev=1170215&r1=1170214&r2=1170215&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java Tue Sep 13 15:42:48 2011
@@ -40,16 +40,16 @@ public class Resequencer extends BatchPr
     // TODO: Rework to avoid using BatchProcessor
 
     public Resequencer(CamelContext camelContext, Processor processor, Expression expression) {
-        this(camelContext, processor, createSet(expression, false, false));
+        this(camelContext, processor, createSet(expression, false, false), expression);
     }
 
     public Resequencer(CamelContext camelContext, Processor processor, Expression expression,
                        boolean allowDuplicates, boolean reverse) {
-        this(camelContext, processor, createSet(expression, allowDuplicates, reverse));
+        this(camelContext, processor, createSet(expression, allowDuplicates, reverse), expression);
     }
 
-    public Resequencer(CamelContext camelContext, Processor processor, Set<Exchange> collection) {
-        super(camelContext, processor, collection);
+    public Resequencer(CamelContext camelContext, Processor processor, Set<Exchange> collection, Expression expression) {
+        super(camelContext, processor, collection, expression);
     }
 
     @Override
@@ -68,13 +68,6 @@ public class Resequencer extends BatchPr
         return createSet(new ExpressionComparator(expression), allowDuplicates, reverse);
     }
 
-    protected static Set<Exchange> createSet(List<Expression> expressions, boolean allowDuplicates, boolean reverse) {
-        if (expressions.size() == 1) {
-            return createSet(expressions.get(0), allowDuplicates, reverse);
-        }
-        return createSet(new ExpressionListComparator(expressions), allowDuplicates, reverse);
-    }
-
     protected static Set<Exchange> createSet(final Comparator<? super Exchange> comparator, boolean allowDuplicates, boolean reverse) {
         Comparator<? super Exchange> answer = comparator;
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java?rev=1170215&r1=1170214&r2=1170215&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java Tue Sep 13 15:42:48 2011
@@ -24,6 +24,7 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.camel.CamelContext;
+import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
@@ -35,6 +36,8 @@ import org.apache.camel.spi.ExceptionHan
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ServiceHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A resequencer that re-orders a (continuous) stream of {@link Exchange}s. The
@@ -61,6 +64,7 @@ import org.apache.camel.util.ServiceHelp
 public class StreamResequencer extends ServiceSupport implements SequenceSender<Exchange>, Processor, Navigate<Processor>, Traceable {
 
     private static final long DELIVERY_ATTEMPT_INTERVAL = 1000L;
+    private static final Logger LOG = LoggerFactory.getLogger(StreamResequencer.class);
 
     private final CamelContext camelContext;
     private final ExceptionHandler exceptionHandler;
@@ -68,6 +72,7 @@ public class StreamResequencer extends S
     private final Processor processor;
     private Delivery delivery;
     private int capacity;
+    private boolean ignoreInvalidExchanges;
     
     /**
      * Creates a new {@link StreamResequencer} instance.
@@ -131,6 +136,20 @@ public class StreamResequencer extends S
         engine.setTimeout(timeout);
     }
 
+    public boolean isIgnoreInvalidExchanges() {
+        return ignoreInvalidExchanges;
+    }
+
+    /**
+     * Sets whether to ignore invalid exchanges which cannot be used by this stream resequencer.
+     * <p/>
+     * Default is <tt>false</tt>, by which an {@link CamelExchangeException} is thrown if the {@link Exchange}
+     * is invalid.
+     */
+    public void setIgnoreInvalidExchanges(boolean ignoreInvalidExchanges) {
+        this.ignoreInvalidExchanges = ignoreInvalidExchanges;
+    }
+
     @Override
     public String toString() {
         return "StreamResequencer[to: " + processor + "]";
@@ -169,8 +188,18 @@ public class StreamResequencer extends S
         while (engine.size() >= capacity) {
             Thread.sleep(getTimeout());
         }
-        engine.insert(exchange);
-        delivery.request();
+
+        try {
+            engine.insert(exchange);
+            delivery.request();
+        } catch (Exception e) {
+            if (isIgnoreInvalidExchanges()) {
+                LOG.debug("Invalid Exchange. This Exchange will be ignored: {}", exchange);
+                return;
+            } else {
+                throw new CamelExchangeException("Error processing Exchange in StreamResequencer", exchange, e);
+            }
+        }
     }
 
     public boolean hasNext() {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/DefaultExchangeComparator.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/DefaultExchangeComparator.java?rev=1170215&r1=1170214&r2=1170215&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/DefaultExchangeComparator.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/DefaultExchangeComparator.java Tue Sep 13 15:42:48 2011
@@ -30,29 +30,49 @@ public class DefaultExchangeComparator i
 
     private Expression expression;
 
+    @Override
     public void setExpression(Expression expression) {
         this.expression = expression;
     }
 
+    @Override
     public boolean predecessor(Exchange o1, Exchange o2) {
         long n1 = getSequenceNumber(o1);
         long n2 = getSequenceNumber(o2);
         return n1 == (n2 - 1L);
     }
 
+    @Override
     public boolean successor(Exchange o1, Exchange o2) {
         long n1 = getSequenceNumber(o1);
         long n2 = getSequenceNumber(o2);
         return n2 == (n1 - 1L);
     }
 
+    @Override
     public int compare(Exchange o1, Exchange o2) {
         Long n1 = getSequenceNumber(o1);
         Long n2 = getSequenceNumber(o2);
         return n1.compareTo(n2);
     }
 
-    private long getSequenceNumber(Exchange exchange) {
+    private Long getSequenceNumber(Exchange exchange) {
         return expression.evaluate(exchange, Long.class);
     }
+
+    @Override
+    public boolean isValid(Exchange exchange) {
+        Long num = null;
+        try {
+            num = expression.evaluate(exchange, Long.class);
+        } catch (Exception e) {
+            // ignore
+        }
+        return num != null;
+    }
+
+    @Override
+    public String toString() {
+        return "Comparator[" + expression + "]";
+    }
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ElementComparator.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ElementComparator.java?rev=1170215&r1=1170214&r2=1170215&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ElementComparator.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ElementComparator.java Tue Sep 13 15:42:48 2011
@@ -28,7 +28,7 @@ class ElementComparator<E> implements Se
     /**
      * A sequence element comparator this comparator delegates to.
      */
-    private SequenceElementComparator<E> comparator;
+    private final SequenceElementComparator<E> comparator;
     
     /**
      * Creates a new element comparator instance.
@@ -39,16 +39,28 @@ class ElementComparator<E> implements Se
         this.comparator = comparator;
     }
     
+    @Override
     public boolean predecessor(Element<E> o1, Element<E> o2) {
         return comparator.predecessor(o1.getObject(), o2.getObject());
     }
 
+    @Override
     public boolean successor(Element<E> o1, Element<E> o2) {
         return comparator.successor(o1.getObject(), o2.getObject());
     }
 
+    @Override
     public int compare(Element<E> o1, Element<E> o2) {
         return comparator.compare(o1.getObject(), o2.getObject());
     }
 
+    @Override
+    public boolean isValid(Element<E> o1) {
+        return comparator.isValid(o1.getObject());
+    }
+
+    @Override
+    public String toString() {
+        return comparator.toString();
+    }
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java?rev=1170215&r1=1170214&r2=1170215&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java Tue Sep 13 15:42:48 2011
@@ -18,6 +18,7 @@ package org.apache.camel.processor.reseq
 
 import java.util.Timer;
 
+import org.apache.camel.CamelExchangeException;
 import org.apache.camel.util.concurrent.ThreadHelper;
 
 /**
@@ -182,10 +183,17 @@ public class ResequencerEngine<E> {
      * scheduled for timing out. After being timed out it is ready for delivery.
      *
      * @param o an element.
+     * @throws IllegalArgumentException if the element cannot be used with this resequencer engine
      */
     public synchronized void insert(E o) {
         // wrap object into internal element
         Element<E> element = new Element<E>(o);
+
+        // validate the exchange has no problem
+        if (!sequence.comparator().isValid(element)) {
+            throw new IllegalArgumentException("Element cannot be used in comparator: " + sequence.comparator());
+        }
+
         // add element to sequence in proper order
         sequence.add(element);
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/SequenceElementComparator.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/SequenceElementComparator.java?rev=1170215&r1=1170214&r2=1170215&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/SequenceElementComparator.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/SequenceElementComparator.java Tue Sep 13 15:42:48 2011
@@ -44,5 +44,13 @@ public interface SequenceElementComparat
      * @return true if its an immediate successor
      */
     boolean successor(E o1, E o2);
+
+    /**
+     * Returns <tt>true</tt> if the <code>o1</code> can be used in this comparator.
+     *
+     * @param o1 a sequence element
+     * @return true if its usable for this comparator
+     */
+    boolean isValid(E o1);
     
 }

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceBatchIgnoreInvalidExchangesTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceBatchIgnoreInvalidExchangesTest.java?rev=1170215&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceBatchIgnoreInvalidExchangesTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceBatchIgnoreInvalidExchangesTest.java Tue Sep 13 15:42:48 2011
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ *
+ */
+public class ResequenceBatchIgnoreInvalidExchangesTest extends ResequenceStreamIgnoreInvalidExchangesTest {
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // START SNIPPET: e1
+                from("direct:start")
+                    .resequence(header("seqno")).batch().timeout(1000)
+                        // ignore invalid exchanges (they are discarded)
+                        .ignoreInvalidExchanges()
+                    .to("mock:result");
+                // END SNIPPET: e1
+            }
+        };
+    }
+}

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceBatchNotIgnoreInvalidExchangesTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceBatchNotIgnoreInvalidExchangesTest.java?rev=1170215&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceBatchNotIgnoreInvalidExchangesTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceBatchNotIgnoreInvalidExchangesTest.java Tue Sep 13 15:42:48 2011
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ *
+ */
+public class ResequenceBatchNotIgnoreInvalidExchangesTest extends ResequenceStreamNotIgnoreInvalidExchangesTest {
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .resequence(header("seqno")).batch().timeout(1000)
+                    .to("mock:result");
+            }
+        };
+    }
+}

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamIgnoreInvalidExchangesTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamIgnoreInvalidExchangesTest.java?rev=1170215&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamIgnoreInvalidExchangesTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamIgnoreInvalidExchangesTest.java Tue Sep 13 15:42:48 2011
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ *
+ */
+public class ResequenceStreamIgnoreInvalidExchangesTest extends ContextTestSupport {
+
+    public void testBadFirstMessage() throws Exception {
+        // bad messages is ignored
+        getMockEndpoint("mock:result").expectedBodiesReceived("B", "C", "D");
+
+        template.sendBody("direct:start", "A");
+        template.sendBodyAndHeader("direct:start", "D", "seqno", 4);
+        template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
+        template.sendBodyAndHeader("direct:start", "B", "seqno", 2);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testBadSecondMessage() throws Exception {
+        // bad messages is ignored
+        getMockEndpoint("mock:result").expectedBodiesReceived("B", "C", "D");
+
+        template.sendBodyAndHeader("direct:start", "D", "seqno", 4);
+        template.sendBody("direct:start", "A");
+        template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
+        template.sendBodyAndHeader("direct:start", "B", "seqno", 2);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testBadThirdMessage() throws Exception {
+        // bad messages is ignored
+        getMockEndpoint("mock:result").expectedBodiesReceived("B", "C", "D");
+
+        template.sendBodyAndHeader("direct:start", "D", "seqno", 4);
+        template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
+        template.sendBody("direct:start", "A");
+        template.sendBodyAndHeader("direct:start", "B", "seqno", 2);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testBadForthMessage() throws Exception {
+        // bad messages is ignored
+        getMockEndpoint("mock:result").expectedBodiesReceived("B", "C", "D");
+
+        template.sendBodyAndHeader("direct:start", "D", "seqno", 4);
+        template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
+        template.sendBodyAndHeader("direct:start", "B", "seqno", 2);
+        template.sendBody("direct:start", "A");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // START SNIPPET: e1
+                from("direct:start")
+                    .resequence(header("seqno")).stream().timeout(1000)
+                        // ignore invalid exchanges (they are discarded)
+                        .ignoreInvalidExchanges()
+                    .to("mock:result");
+                // END SNIPPET: e1
+            }
+        };
+    }
+}

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamNotIgnoreInvalidExchangesTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamNotIgnoreInvalidExchangesTest.java?rev=1170215&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamNotIgnoreInvalidExchangesTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamNotIgnoreInvalidExchangesTest.java Tue Sep 13 15:42:48 2011
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
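+ * Verifies that the stream resequencer fails an exchange sent without the seqno header when ignoreInvalidExchanges is not enabled.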
+ */
+public class ResequenceStreamNotIgnoreInvalidExchangesTest extends ContextTestSupport {
+
+    public void testBadFirstMessage() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("B", "C", "D");
+
+        try {
+            template.sendBody("direct:start", "A");
+            fail("Should fail");
+        } catch (CamelExecutionException e) {
+            // expected
+        }
+        template.sendBodyAndHeader("direct:start", "D", "seqno", 4);
+        template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
+        template.sendBodyAndHeader("direct:start", "B", "seqno", 2);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testBadSecondMessage() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("B", "C", "D");
+
+        template.sendBodyAndHeader("direct:start", "D", "seqno", 4);
+        try {
+            template.sendBody("direct:start", "A");
+            fail("Should fail");
+        } catch (CamelExecutionException e) {
+            // expected
+        }
+        template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
+        template.sendBodyAndHeader("direct:start", "B", "seqno", 2);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testBadThirdMessage() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("B", "C", "D");
+
+        template.sendBodyAndHeader("direct:start", "D", "seqno", 4);
+        template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
+        try {
+            template.sendBody("direct:start", "A");
+            fail("Should fail");
+        } catch (CamelExecutionException e) {
+            // expected
+        }
+        template.sendBodyAndHeader("direct:start", "B", "seqno", 2);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testBadForthMessage() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("B", "C", "D");
+
+        template.sendBodyAndHeader("direct:start", "D", "seqno", 4);
+        template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
+        template.sendBodyAndHeader("direct:start", "B", "seqno", 2);
+        try {
+            template.sendBody("direct:start", "A");
+            fail("Should fail");
+        } catch (CamelExecutionException e) {
+            // expected
+        }
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .resequence(header("seqno")).stream().timeout(1000)
+                    .to("mock:result");
+            }
+        };
+    }
+}

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/IntegerComparator.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/IntegerComparator.java?rev=1170215&r1=1170214&r2=1170215&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/IntegerComparator.java
(original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/IntegerComparator.java
Tue Sep 13 15:42:48 2011
@@ -18,16 +18,24 @@ package org.apache.camel.processor.reseq
 
 public class IntegerComparator implements SequenceElementComparator<Integer> {
 
+    @Override
     public boolean predecessor(Integer o1, Integer o2) {
         return o1.intValue() == (o2.intValue() - 1);
     }
 
+    @Override
     public boolean successor(Integer o1, Integer o2) {
         return o2.intValue() == (o1.intValue() - 1);
     }
 
+    @Override
     public int compare(Integer o1, Integer o2) {
         return o1.compareTo(o2);
     }
 
+    @Override
+    public boolean isValid(Integer o1) {
+        return o1 != null;
+    }
+
 }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/TestComparator.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/TestComparator.java?rev=1170215&r1=1170214&r2=1170215&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/TestComparator.java
(original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/TestComparator.java
Tue Sep 13 15:42:48 2011
@@ -18,16 +18,24 @@ package org.apache.camel.processor.reseq
 
 public class TestComparator implements SequenceElementComparator<TestObject> {
 
+    @Override
     public boolean predecessor(TestObject o1, TestObject o2) {
         return (o2.getValue() - 1) == o1.getValue();
     }
 
+    @Override
     public boolean successor(TestObject o1, TestObject o2) {
         return (o1.getValue() - 1) == o2.getValue();
     }
 
+    @Override
     public int compare(TestObject o1, TestObject o2) {
         return Integer.valueOf(o1.getValue()).compareTo(o2.getValue());
     }
 
+    @Override
+    public boolean isValid(TestObject o1) {
+        return o1 != null;
+    }
+
 }



Mime
View raw message