Return-Path: X-Original-To: apmail-camel-commits-archive@www.apache.org Delivered-To: apmail-camel-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A963587DB for ; Tue, 13 Sep 2011 15:43:19 +0000 (UTC) Received: (qmail 30290 invoked by uid 500); 13 Sep 2011 15:43:19 -0000 Delivered-To: apmail-camel-commits-archive@camel.apache.org Received: (qmail 30152 invoked by uid 500); 13 Sep 2011 15:43:18 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 30145 invoked by uid 99); 13 Sep 2011 15:43:18 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 13 Sep 2011 15:43:18 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 13 Sep 2011 15:43:12 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 9B5FE2388A02 for ; Tue, 13 Sep 2011 15:42:50 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1170215 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/model/ main/java/org/apache/camel/model/config/ main/java/org/apache/camel/processor/ main/java/org/apache/camel/processor/resequencer/ test/java/org/apache/camel/process... Date: Tue, 13 Sep 2011 15:42:49 -0000 To: commits@camel.apache.org From: davsclaus@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110913154250.9B5FE2388A02@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: davsclaus Date: Tue Sep 13 15:42:48 2011 New Revision: 1170215 URL: http://svn.apache.org/viewvc?rev=1170215&view=rev Log: CAMEL-4401: Resequencer validate exchange before accepting. Added option ignoreInvalidExchanges. Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceBatchIgnoreInvalidExchangesTest.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceBatchNotIgnoreInvalidExchangesTest.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamIgnoreInvalidExchangesTest.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamNotIgnoreInvalidExchangesTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/BatchResequencerConfig.java camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/DefaultExchangeComparator.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ElementComparator.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/SequenceElementComparator.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/IntegerComparator.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/TestComparator.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java?rev=1170215&r1=1170214&r2=1170215&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java Tue Sep 13 15:42:48 2011 @@ -137,10 +137,14 @@ public class ResequenceDefinition extend * @return the builder */ public ResequenceDefinition timeout(long timeout) { - if (batchConfig != null) { - batchConfig.setBatchTimeout(timeout); - } else { + if (streamConfig != null) { streamConfig.setTimeout(timeout); + } else { + // initialize batch mode as its default mode + if (batchConfig == null) { + batch(); + } + batchConfig.setBatchTimeout(timeout); } return this; } @@ -214,6 +218,24 @@ public class ResequenceDefinition extend } /** + * If an incoming {@link org.apache.camel.Exchange} is invalid, then it will be ignored. + * + * @return builder + */ + public ResequenceDefinition ignoreInvalidExchanges() { + if (streamConfig != null) { + streamConfig.setIgnoreInvalidExchanges(true); + } else { + // initialize batch mode as its default mode + if (batchConfig == null) { + batch(); + } + batchConfig.setIgnoreInvalidExchanges(true); + } + return this; + } + + /** * Sets the comparator to use for stream resequencer * * @param comparator the comparator @@ -318,6 +340,9 @@ public class ResequenceDefinition extend config.isAllowDuplicates(), config.isReverse()); resequencer.setBatchSize(config.getBatchSize()); resequencer.setBatchTimeout(config.getBatchTimeout()); + if (config.getIgnoreInvalidExchanges() != null) { + resequencer.setIgnoreInvalidExchanges(config.getIgnoreInvalidExchanges()); + } return resequencer; } @@ -343,6 +368,9 @@ public class ResequenceDefinition extend StreamResequencer resequencer = new StreamResequencer(routeContext.getCamelContext(), processor, comparator); resequencer.setTimeout(config.getTimeout()); resequencer.setCapacity(config.getCapacity()); + if (config.getIgnoreInvalidExchanges() != null) { + resequencer.setIgnoreInvalidExchanges(config.getIgnoreInvalidExchanges()); + } return resequencer; } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/BatchResequencerConfig.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/BatchResequencerConfig.java?rev=1170215&r1=1170214&r2=1170215&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/BatchResequencerConfig.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/BatchResequencerConfig.java Tue Sep 13 15:42:48 2011 @@ -36,6 +36,8 @@ public class BatchResequencerConfig exte private Boolean allowDuplicates; @XmlAttribute private Boolean reverse; + @XmlAttribute + private Boolean ignoreInvalidExchanges; /** * Creates a new {@link BatchResequencerConfig} instance using default @@ -108,4 +110,12 @@ public class BatchResequencerConfig exte public void setReverse(Boolean reverse) { this.reverse = reverse; } + + public Boolean getIgnoreInvalidExchanges() { + return ignoreInvalidExchanges; + } + + public void setIgnoreInvalidExchanges(Boolean ignoreInvalidExchanges) { + this.ignoreInvalidExchanges = ignoreInvalidExchanges; + } } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java?rev=1170215&r1=1170214&r2=1170215&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java Tue Sep 13 15:42:48 2011 @@ -37,6 +37,8 @@ public class StreamResequencerConfig ext private Integer capacity; @XmlAttribute private Long timeout; + @XmlAttribute + private Boolean ignoreInvalidExchanges; @XmlTransient private ExpressionResultComparator comparator; @@ -106,6 +108,14 @@ public class StreamResequencerConfig ext this.timeout = timeout; } + public Boolean getIgnoreInvalidExchanges() { + return ignoreInvalidExchanges; + } + + public void setIgnoreInvalidExchanges(Boolean ignoreInvalidExchanges) { + this.ignoreInvalidExchanges = ignoreInvalidExchanges; + } + public ExpressionResultComparator getComparator() { return comparator; } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java?rev=1170215&r1=1170214&r2=1170215&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java Tue Sep 13 15:42:48 2011 @@ -29,7 +29,9 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.apache.camel.CamelContext; +import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; +import org.apache.camel.Expression; import org.apache.camel.Navigate; import org.apache.camel.Predicate; import org.apache.camel.Processor; @@ -60,7 +62,9 @@ public class BatchProcessor extends Serv private int outBatchSize; private boolean groupExchanges; private boolean batchConsumer; + private boolean ignoreInvalidExchanges; private Predicate completionPredicate; + private Expression expression; private final CamelContext camelContext; private final Processor processor; @@ -69,15 +73,17 @@ public class BatchProcessor extends Serv private final BatchSender sender; - public BatchProcessor(CamelContext camelContext, Processor processor, Collection collection) { + public BatchProcessor(CamelContext camelContext, Processor processor, Collection collection, Expression expression) { ObjectHelper.notNull(camelContext, "camelContext"); ObjectHelper.notNull(processor, "processor"); ObjectHelper.notNull(collection, "collection"); + ObjectHelper.notNull(expression, "expression"); // wrap processor in UnitOfWork so what we send out of the batch runs in a UoW this.camelContext = camelContext; this.processor = new UnitOfWorkProcessor(processor); this.collection = collection; + this.expression = expression; this.sender = new BatchSender(); } @@ -159,6 +165,14 @@ public class BatchProcessor extends Serv this.batchConsumer = batchConsumer; } + public boolean isIgnoreInvalidExchanges() { + return ignoreInvalidExchanges; + } + + public void setIgnoreInvalidExchanges(boolean ignoreInvalidExchanges) { + this.ignoreInvalidExchanges = ignoreInvalidExchanges; + } + public Predicate getCompletionPredicate() { return completionPredicate; } @@ -242,10 +256,37 @@ public class BatchProcessor extends Serv } } + // validate that the exchange can be used + if (!isValid(exchange)) { + if (isIgnoreInvalidExchanges()) { + LOG.debug("Invalid Exchange. This Exchange will be ignored: {}", exchange); + return; + } else { + throw new CamelExchangeException("Exchange is not valid to be used by the BatchProcessor", exchange); + } + } + + // exchange is valid so enqueue the exchange sender.enqueueExchange(exchange); } /** + * Is the given exchange valid to be used. + * + * @param exchange the given exchange + * @return true if valid, false otherwise + */ + private boolean isValid(Exchange exchange) { + Object result = null; + try { + result = expression.evaluate(exchange, Object.class); + } catch (Exception e) { + // ignore + } + return result != null; + } + + /** * Sender thread for queued-up exchanges. */ private class BatchSender extends Thread { Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java?rev=1170215&r1=1170214&r2=1170215&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java Tue Sep 13 15:42:48 2011 @@ -40,16 +40,16 @@ public class Resequencer extends BatchPr // TODO: Rework to avoid using BatchProcessor public Resequencer(CamelContext camelContext, Processor processor, Expression expression) { - this(camelContext, processor, createSet(expression, false, false)); + this(camelContext, processor, createSet(expression, false, false), expression); } public Resequencer(CamelContext camelContext, Processor processor, Expression expression, boolean allowDuplicates, boolean reverse) { - this(camelContext, processor, createSet(expression, allowDuplicates, reverse)); + this(camelContext, processor, createSet(expression, allowDuplicates, reverse), expression); } - public Resequencer(CamelContext camelContext, Processor processor, Set collection) { - super(camelContext, processor, collection); + public Resequencer(CamelContext camelContext, Processor processor, Set collection, Expression expression) { + super(camelContext, processor, collection, expression); } @Override @@ -68,13 +68,6 @@ public class Resequencer extends BatchPr return createSet(new ExpressionComparator(expression), allowDuplicates, reverse); } - protected static Set createSet(List expressions, boolean allowDuplicates, boolean reverse) { - if (expressions.size() == 1) { - return createSet(expressions.get(0), allowDuplicates, reverse); - } - return createSet(new ExpressionListComparator(expressions), allowDuplicates, reverse); - } - protected static Set createSet(final Comparator comparator, boolean allowDuplicates, boolean reverse) { Comparator answer = comparator; Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java?rev=1170215&r1=1170214&r2=1170215&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java Tue Sep 13 15:42:48 2011 @@ -24,6 +24,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.apache.camel.CamelContext; +import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; import org.apache.camel.Navigate; import org.apache.camel.Processor; @@ -35,6 +36,8 @@ import org.apache.camel.spi.ExceptionHan import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.ServiceHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A resequencer that re-orders a (continuous) stream of {@link Exchange}s. The @@ -61,6 +64,7 @@ import org.apache.camel.util.ServiceHelp public class StreamResequencer extends ServiceSupport implements SequenceSender, Processor, Navigate, Traceable { private static final long DELIVERY_ATTEMPT_INTERVAL = 1000L; + private static final Logger LOG = LoggerFactory.getLogger(StreamResequencer.class); private final CamelContext camelContext; private final ExceptionHandler exceptionHandler; @@ -68,6 +72,7 @@ public class StreamResequencer extends S private final Processor processor; private Delivery delivery; private int capacity; + private boolean ignoreInvalidExchanges; /** * Creates a new {@link StreamResequencer} instance. @@ -131,6 +136,20 @@ public class StreamResequencer extends S engine.setTimeout(timeout); } + public boolean isIgnoreInvalidExchanges() { + return ignoreInvalidExchanges; + } + + /** + * Sets whether to ignore invalid exchanges which cannot be used by this stream resequencer. + *

+ * Default is false, by which an {@link CamelExchangeException} is thrown if the {@link Exchange} + * is invalid. + */ + public void setIgnoreInvalidExchanges(boolean ignoreInvalidExchanges) { + this.ignoreInvalidExchanges = ignoreInvalidExchanges; + } + @Override public String toString() { return "StreamResequencer[to: " + processor + "]"; @@ -169,8 +188,18 @@ public class StreamResequencer extends S while (engine.size() >= capacity) { Thread.sleep(getTimeout()); } - engine.insert(exchange); - delivery.request(); + + try { + engine.insert(exchange); + delivery.request(); + } catch (Exception e) { + if (isIgnoreInvalidExchanges()) { + LOG.debug("Invalid Exchange. This Exchange will be ignored: {}", exchange); + return; + } else { + throw new CamelExchangeException("Error processing Exchange in StreamResequencer", exchange, e); + } + } } public boolean hasNext() { Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/DefaultExchangeComparator.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/DefaultExchangeComparator.java?rev=1170215&r1=1170214&r2=1170215&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/DefaultExchangeComparator.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/DefaultExchangeComparator.java Tue Sep 13 15:42:48 2011 @@ -30,29 +30,49 @@ public class DefaultExchangeComparator i private Expression expression; + @Override public void setExpression(Expression expression) { this.expression = expression; } + @Override public boolean predecessor(Exchange o1, Exchange o2) { long n1 = getSequenceNumber(o1); long n2 = getSequenceNumber(o2); return n1 == (n2 - 1L); } + @Override public boolean successor(Exchange o1, Exchange o2) { long n1 = getSequenceNumber(o1); long n2 = getSequenceNumber(o2); return n2 == (n1 - 1L); } + @Override public int compare(Exchange o1, Exchange o2) { Long n1 = getSequenceNumber(o1); Long n2 = getSequenceNumber(o2); return n1.compareTo(n2); } - private long getSequenceNumber(Exchange exchange) { + private Long getSequenceNumber(Exchange exchange) { return expression.evaluate(exchange, Long.class); } + + @Override + public boolean isValid(Exchange exchange) { + Long num = null; + try { + num = expression.evaluate(exchange, Long.class); + } catch (Exception e) { + // ignore + } + return num != null; + } + + @Override + public String toString() { + return "Comparator[" + expression + "]"; + } } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ElementComparator.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ElementComparator.java?rev=1170215&r1=1170214&r2=1170215&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ElementComparator.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ElementComparator.java Tue Sep 13 15:42:48 2011 @@ -28,7 +28,7 @@ class ElementComparator implements Se /** * A sequence element comparator this comparator delegates to. */ - private SequenceElementComparator comparator; + private final SequenceElementComparator comparator; /** * Creates a new element comparator instance. @@ -39,16 +39,28 @@ class ElementComparator implements Se this.comparator = comparator; } + @Override public boolean predecessor(Element o1, Element o2) { return comparator.predecessor(o1.getObject(), o2.getObject()); } + @Override public boolean successor(Element o1, Element o2) { return comparator.successor(o1.getObject(), o2.getObject()); } + @Override public int compare(Element o1, Element o2) { return comparator.compare(o1.getObject(), o2.getObject()); } + @Override + public boolean isValid(Element o1) { + return comparator.isValid(o1.getObject()); + } + + @Override + public String toString() { + return comparator.toString(); + } } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java?rev=1170215&r1=1170214&r2=1170215&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java Tue Sep 13 15:42:48 2011 @@ -18,6 +18,7 @@ package org.apache.camel.processor.reseq import java.util.Timer; +import org.apache.camel.CamelExchangeException; import org.apache.camel.util.concurrent.ThreadHelper; /** @@ -182,10 +183,17 @@ public class ResequencerEngine { * scheduled for timing out. After being timed out it is ready for delivery. * * @param o an element. + * @throws IllegalArgumentException if the element cannot be used with this resequencer engine */ public synchronized void insert(E o) { // wrap object into internal element Element element = new Element(o); + + // validate the exchange has no problem + if (!sequence.comparator().isValid(element)) { + throw new IllegalArgumentException("Element cannot be used in comparator: " + sequence.comparator()); + } + // add element to sequence in proper order sequence.add(element); Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/SequenceElementComparator.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/SequenceElementComparator.java?rev=1170215&r1=1170214&r2=1170215&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/SequenceElementComparator.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/SequenceElementComparator.java Tue Sep 13 15:42:48 2011 @@ -44,5 +44,13 @@ public interface SequenceElementComparat * @return true if its an immediate successor */ boolean successor(E o1, E o2); + + /** + * Returns true if the o1 can be used in this comparator. + * + * @param o1 a sequence element + * @return true if its usable for this comparator + */ + boolean isValid(E o1); } Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceBatchIgnoreInvalidExchangesTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceBatchIgnoreInvalidExchangesTest.java?rev=1170215&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceBatchIgnoreInvalidExchangesTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceBatchIgnoreInvalidExchangesTest.java Tue Sep 13 15:42:48 2011 @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.builder.RouteBuilder; + +/** + * + */ +public class ResequenceBatchIgnoreInvalidExchangesTest extends ResequenceStreamIgnoreInvalidExchangesTest { + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + // START SNIPPET: e1 + from("direct:start") + .resequence(header("seqno")).batch().timeout(1000) + // ignore invalid exchanges (they are discarded) + .ignoreInvalidExchanges() + .to("mock:result"); + // END SNIPPET: e1 + } + }; + } +} Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceBatchNotIgnoreInvalidExchangesTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceBatchNotIgnoreInvalidExchangesTest.java?rev=1170215&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceBatchNotIgnoreInvalidExchangesTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceBatchNotIgnoreInvalidExchangesTest.java Tue Sep 13 15:42:48 2011 @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.builder.RouteBuilder; + +/** + * + */ +public class ResequenceBatchNotIgnoreInvalidExchangesTest extends ResequenceStreamNotIgnoreInvalidExchangesTest { + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .resequence(header("seqno")).batch().timeout(1000) + .to("mock:result"); + } + }; + } +} Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamIgnoreInvalidExchangesTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamIgnoreInvalidExchangesTest.java?rev=1170215&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamIgnoreInvalidExchangesTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamIgnoreInvalidExchangesTest.java Tue Sep 13 15:42:48 2011 @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; + +/** + * + */ +public class ResequenceStreamIgnoreInvalidExchangesTest extends ContextTestSupport { + + public void testBadFirstMessage() throws Exception { + // bad messages is ignored + getMockEndpoint("mock:result").expectedBodiesReceived("B", "C", "D"); + + template.sendBody("direct:start", "A"); + template.sendBodyAndHeader("direct:start", "D", "seqno", 4); + template.sendBodyAndHeader("direct:start", "C", "seqno", 3); + template.sendBodyAndHeader("direct:start", "B", "seqno", 2); + + assertMockEndpointsSatisfied(); + } + + public void testBadSecondMessage() throws Exception { + // bad messages is ignored + getMockEndpoint("mock:result").expectedBodiesReceived("B", "C", "D"); + + template.sendBodyAndHeader("direct:start", "D", "seqno", 4); + template.sendBody("direct:start", "A"); + template.sendBodyAndHeader("direct:start", "C", "seqno", 3); + template.sendBodyAndHeader("direct:start", "B", "seqno", 2); + + assertMockEndpointsSatisfied(); + } + + public void testBadThirdMessage() throws Exception { + // bad messages is ignored + getMockEndpoint("mock:result").expectedBodiesReceived("B", "C", "D"); + + template.sendBodyAndHeader("direct:start", "D", "seqno", 4); + template.sendBodyAndHeader("direct:start", "C", "seqno", 3); + template.sendBody("direct:start", "A"); + template.sendBodyAndHeader("direct:start", "B", "seqno", 2); + + assertMockEndpointsSatisfied(); + } + + public void testBadForthMessage() throws Exception { + // bad messages is ignored + getMockEndpoint("mock:result").expectedBodiesReceived("B", "C", "D"); + + template.sendBodyAndHeader("direct:start", "D", "seqno", 4); + template.sendBodyAndHeader("direct:start", "C", "seqno", 3); + template.sendBodyAndHeader("direct:start", "B", "seqno", 2); + template.sendBody("direct:start", "A"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + // START SNIPPET: e1 + from("direct:start") + .resequence(header("seqno")).stream().timeout(1000) + // ignore invalid exchanges (they are discarded) + .ignoreInvalidExchanges() + .to("mock:result"); + // END SNIPPET: e1 + } + }; + } +} Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamNotIgnoreInvalidExchangesTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamNotIgnoreInvalidExchangesTest.java?rev=1170215&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamNotIgnoreInvalidExchangesTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamNotIgnoreInvalidExchangesTest.java Tue Sep 13 15:42:48 2011 @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.CamelExecutionException; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; + +/** + * + */ +public class ResequenceStreamNotIgnoreInvalidExchangesTest extends ContextTestSupport { + + public void testBadFirstMessage() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("B", "C", "D"); + + try { + template.sendBody("direct:start", "A"); + fail("Should fail"); + } catch (CamelExecutionException e) { + // expected + } + template.sendBodyAndHeader("direct:start", "D", "seqno", 4); + template.sendBodyAndHeader("direct:start", "C", "seqno", 3); + template.sendBodyAndHeader("direct:start", "B", "seqno", 2); + + assertMockEndpointsSatisfied(); + } + + public void testBadSecondMessage() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("B", "C", "D"); + + template.sendBodyAndHeader("direct:start", "D", "seqno", 4); + try { + template.sendBody("direct:start", "A"); + fail("Should fail"); + } catch (CamelExecutionException e) { + // expected + } + template.sendBodyAndHeader("direct:start", "C", "seqno", 3); + template.sendBodyAndHeader("direct:start", "B", "seqno", 2); + + assertMockEndpointsSatisfied(); + } + + public void testBadThirdMessage() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("B", "C", "D"); + + template.sendBodyAndHeader("direct:start", "D", "seqno", 4); + template.sendBodyAndHeader("direct:start", "C", "seqno", 3); + try { + template.sendBody("direct:start", "A"); + fail("Should fail"); + } catch (CamelExecutionException e) { + // expected + } + template.sendBodyAndHeader("direct:start", "B", "seqno", 2); + + assertMockEndpointsSatisfied(); + } + + public void testBadForthMessage() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("B", "C", "D"); + + template.sendBodyAndHeader("direct:start", "D", "seqno", 4); + template.sendBodyAndHeader("direct:start", "C", "seqno", 3); + template.sendBodyAndHeader("direct:start", "B", "seqno", 2); + try { + template.sendBody("direct:start", "A"); + fail("Should fail"); + } catch (CamelExecutionException e) { + // expected + } + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .resequence(header("seqno")).stream().timeout(1000) + .to("mock:result"); + } + }; + } +} Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/IntegerComparator.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/IntegerComparator.java?rev=1170215&r1=1170214&r2=1170215&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/IntegerComparator.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/IntegerComparator.java Tue Sep 13 15:42:48 2011 @@ -18,16 +18,24 @@ package org.apache.camel.processor.reseq public class IntegerComparator implements SequenceElementComparator { + @Override public boolean predecessor(Integer o1, Integer o2) { return o1.intValue() == (o2.intValue() - 1); } + @Override public boolean successor(Integer o1, Integer o2) { return o2.intValue() == (o1.intValue() - 1); } + @Override public int compare(Integer o1, Integer o2) { return o1.compareTo(o2); } + @Override + public boolean isValid(Integer o1) { + return o1 != null; + } + } Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/TestComparator.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/TestComparator.java?rev=1170215&r1=1170214&r2=1170215&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/TestComparator.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/TestComparator.java Tue Sep 13 15:42:48 2011 @@ -18,16 +18,24 @@ package org.apache.camel.processor.reseq public class TestComparator implements SequenceElementComparator { + @Override public boolean predecessor(TestObject o1, TestObject o2) { return (o2.getValue() - 1) == o1.getValue(); } + @Override public boolean successor(TestObject o1, TestObject o2) { return (o1.getValue() - 1) == o2.getValue(); } + @Override public int compare(TestObject o1, TestObject o2) { return Integer.valueOf(o1.getValue()).compareTo(o2.getValue()); } + @Override + public boolean isValid(TestObject o1) { + return o1 != null; + } + }