camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r570379 [1/2] - in /activemq/camel/trunk: camel-core/src/main/java/org/apache/camel/model/ camel-core/src/main/java/org/apache/camel/model/config/ camel-core/src/main/java/org/apache/camel/processor/ camel-core/src/main/java/org/apache/came...
Date Tue, 28 Aug 2007 11:04:57 GMT
Author: jstrachan
Date: Tue Aug 28 04:04:55 2007
New Revision: 570379

URL: http://svn.apache.org/viewvc?rev=570379&view=rev
Log:
applied the excellent patch from Martin Krasser for CAMEL-125 to add a stream based resequencer implementation which focusses on gaps in the message stream rather than just reordering a batch of messages. Great job!

Added:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/BatchResequencerConfig.java   (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java   (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/package-info.java   (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java   (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/DefaultExchangeComparator.java   (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/Element.java   (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ElementComparator.java   (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ExpressionResultComparator.java   (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java   (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/Sequence.java   (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/SequenceElementComparator.java   (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/SequenceSender.java   (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/Timeout.java   (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/TimeoutHandler.java   (with props)
    activemq/camel/trunk/camel-core/src/main/resources/org/apache/camel/model/config/
    activemq/camel/trunk/camel-core/src/main/resources/org/apache/camel/model/config/jaxb.index
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java   (with props)
    activemq/camel/trunk/camel-core/src/test/resources/org/apache/camel/model/resequencerBatch.xml   (with props)
    activemq/camel/trunk/camel-core/src/test/resources/org/apache/camel/model/resequencerStream.xml   (with props)
    activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringStreamResequencerTest.java   (with props)
    activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/streamResequencer.xml   (with props)
Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/model/XmlParseTest.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/model/XmlTestSupport.java
    activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/handler/CamelNamespaceHandler.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java?rev=570379&r1=570378&r2=570379&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java Tue Aug 28 04:04:55 2007
@@ -25,14 +25,16 @@
 import javax.xml.bind.annotation.XmlRootElement;
 import javax.xml.bind.annotation.XmlTransient;
 
-import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
 import org.apache.camel.Processor;
 import org.apache.camel.Route;
 import org.apache.camel.impl.RouteContext;
+import org.apache.camel.model.config.BatchResequencerConfig;
+import org.apache.camel.model.config.StreamResequencerConfig;
 import org.apache.camel.model.language.ExpressionType;
 import org.apache.camel.processor.Resequencer;
+import org.apache.camel.processor.StreamResequencer;
 
 /**
  * @version $Revision: 1.1 $
@@ -45,16 +47,66 @@
     private List<ExpressionType> expressions = new ArrayList<ExpressionType>();
     @XmlElementRef
     private List<ProcessorType> outputs = new ArrayList<ProcessorType>();
+    // Binding annotation at setter 
+    private BatchResequencerConfig batchConfig;
+    // Binding annotation at setter 
+    private StreamResequencerConfig streamConfig;
     @XmlTransient
     private List<Expression> expressionList;
 
     public ResequencerType() {
+        this(null);
     }
 
     public ResequencerType(List<Expression> expressions) {
         this.expressionList = expressions;
+        this.batch();
     }
 
+    /**
+     * Configures the stream-based resequencing algorithm using the default
+     * configuration.
+     * 
+     * @return <code>this</code> instance.
+     */
+    public ResequencerType stream() {
+        return stream(StreamResequencerConfig.getDefault());
+    }
+    
+    /**
+     * Configures the batch-based resequencing algorithm using the default
+     * configuration.
+     * 
+     * @return <code>this</code> instance.
+     */
+    public ResequencerType batch() {
+        return batch(BatchResequencerConfig.getDefault());
+    }
+    
+    /**
+     * Configures the stream-based resequencing algorithm using the given
+     * {@link StreamResequencerConfig}.
+     * 
+     * @return <code>this</code> instance.
+     */
+    public ResequencerType stream(StreamResequencerConfig config) {
+        this.streamConfig = config;
+        this.batchConfig = null;
+        return this;
+    }
+    
+    /**
+     * Configures the batch-based resequencing algorithm using the given
+     * {@link BatchResequencerConfig}.
+     * 
+     * @return <code>this</code> instance.
+     */
+    public ResequencerType batch(BatchResequencerConfig config) {
+        this.batchConfig = config;
+        this.streamConfig = null;
+        return this;
+    }
+    
     @Override
     public String toString() {
         return "Resequencer[ " + getExpressions() + " -> " + getOutputs() + "]";
@@ -85,21 +137,82 @@
         this.outputs = outputs;
     }
 
+    public BatchResequencerConfig getBatchConfig() {
+        return batchConfig;
+    }
+
+    public BatchResequencerConfig getBatchConfig(BatchResequencerConfig defaultConfig) {
+        return batchConfig;
+    }
+
+    public StreamResequencerConfig getStreamConfig() {
+        return streamConfig;
+    }
+    
+    //
+    // TODO: find out how to have these two within an <xsd:choice>
+    //
+    
+    @XmlElement(name="batch-config", required=false)
+    public void setBatchConfig(BatchResequencerConfig batchConfig) {
+        batch(batchConfig);
+    }
+
+    @XmlElement(name="stream-config", required=false)
+    public void setStreamConfig(StreamResequencerConfig streamConfig) {
+        stream(streamConfig);
+    }
+
+    //
+    // END_TODO
+    //
+    
+    @Override
+    public Processor createProcessor(RouteContext routeContext) throws Exception {
+        return createStreamResequencer(routeContext, streamConfig);
+    }
+
+    @Override
     public void addRoutes(RouteContext routeContext, Collection<Route> routes) throws Exception {
-        Endpoint from = routeContext.getEndpoint();
-        final Processor processor = routeContext.createProcessor(this);
-        final Resequencer resequencer = new Resequencer(from, processor, resolveExpressionList(routeContext));
+        if (batchConfig != null) {
+            routes.add(createBatchResequencerRoute(routeContext));
+        } else {
+            // StreamResequencer created via createProcessor method
+            super.addRoutes(routeContext, routes);
+        }
+    }
 
-        Route route = new Route<Exchange>(from, resequencer) {
+    private Route<Exchange> createBatchResequencerRoute(RouteContext routeContext) throws Exception {
+        final Resequencer resequencer = createBatchResequencer(routeContext, batchConfig);
+        return new Route<Exchange>(routeContext.getEndpoint(), resequencer) {
             @Override
             public String toString() {
-                return "ResequencerRoute[" + getEndpoint() + " -> " + processor + "]";
+                return "BatchResequencerRoute[" + getEndpoint() + " -> " + resequencer.getProcessor() + "]";
             }
         };
-
-        routes.add(route);
     }
-
+    
+    protected Resequencer createBatchResequencer(RouteContext routeContext, 
+            BatchResequencerConfig config) throws Exception {
+        Processor processor = routeContext.createProcessor(this);
+        Resequencer resequencer = new Resequencer(routeContext.getEndpoint(), 
+                processor, resolveExpressionList(routeContext));
+        resequencer.setBatchSize(config.getBatchSize());
+        resequencer.setBatchTimeout(config.getBatchTimeout());
+        return resequencer;
+    }
+    
+    protected StreamResequencer createStreamResequencer(RouteContext routeContext, 
+            StreamResequencerConfig config) throws Exception {
+        config.getComparator().setExpressions(resolveExpressionList(routeContext));
+        Processor processor = routeContext.createProcessor(this);
+        StreamResequencer resequencer = new StreamResequencer(processor, 
+                config.getComparator(), config.getCapacity());
+        resequencer.setTimeout(config.getTimeout());
+        return resequencer;
+        
+    }
+    
     private List<Expression> resolveExpressionList(RouteContext routeContext) {
         if (expressionList == null) {
             expressionList = new ArrayList<Expression>();
@@ -112,4 +225,4 @@
         }
         return expressionList;
     }
-}
\ No newline at end of file
+}

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/BatchResequencerConfig.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/BatchResequencerConfig.java?rev=570379&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/BatchResequencerConfig.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/BatchResequencerConfig.java Tue Aug 28 04:04:55 2007
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.model.config;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.camel.processor.Resequencer;
+
+/**
+ * Defines the configuration parameters for the batch-processing
+ * {@link Resequencer}. Usage example:
+ * 
+ * <pre>
+ * from(&quot;direct:start&quot;).resequencer(body()).batch(
+ *         BatchResequencerConfig.getDefault()).to(&quot;mock:result&quot;)
+ * </pre>
+ * is equivalent to
+ * 
+ * <pre>
+ * from(&quot;direct:start&quot;).resequencer(body()).batch().to(&quot;mock:result&quot;)
+ * </pre>
+ * 
+ * or
+ * 
+ * <pre>
+ * from(&quot;direct:start&quot;).resequencer(body()).to(&quot;mock:result&quot;)
+ * </pre>
+ * 
+ * Custom values for <code>batchSize</code> and <code>batchTimeout</code>
+ * can be set like in this example:
+ * 
+ * <pre>
+ * from(&quot;direct:start&quot;).resequencer(body()).batch(
+ *         new BatchResequencerConfig(300, 400L)).to(&quot;mock:result&quot;)
+ * </pre>
+ * 
+ * @author Martin Krasser
+ * 
+ * @version $Revision$
+ */
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.FIELD)
+public class BatchResequencerConfig {
+
+    @XmlAttribute
+    private Integer batchSize; // optional XML attribute requires wrapper object 
+
+    @XmlAttribute
+    private Long batchTimeout; // optional XML attribute requires wrapper object
+
+    /**
+     * Creates a new {@link BatchResequencerConfig} instance using default
+     * values for <code>batchSize</code> (100) and <code>batchTimeout</code>
+     * (1000L).
+     */
+    public BatchResequencerConfig() {
+        this(100, 1000L);
+    }
+    
+    /**
+     * Creates a new {@link BatchResequencerConfig} instance using the given
+     * values for <code>batchSize</code> and <code>batchTimeout</code>.
+     * 
+     * @param batchSize
+     *            size of the batch to be re-ordered.
+     * @param batchTimeout
+     *            timeout for collecting elements to be re-ordered.
+     */
+    public BatchResequencerConfig(int batchSize, long batchTimeout) {
+        this.batchSize = batchSize;
+        this.batchTimeout = batchTimeout;
+    }
+    
+    /**
+     * Returns a new {@link BatchResequencerConfig} instance using default
+     * values for <code>batchSize</code> (100) and <code>batchTimeout</code>
+     * (1000L).
+     * 
+     * @return a default {@link BatchResequencerConfig}.
+     */
+    public static BatchResequencerConfig getDefault() {
+        return new BatchResequencerConfig();
+    }
+
+    public int getBatchSize() {
+        return batchSize;
+    }
+
+    public void setBatchSize(int batchSize) {
+        this.batchSize = batchSize;
+    }
+
+    public long getBatchTimeout() {
+        return batchTimeout;
+    }
+
+    public void setBatchTimeout(long batchTimeout) {
+        this.batchTimeout = batchTimeout;
+    }
+    
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/BatchResequencerConfig.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java?rev=570379&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java Tue Aug 28 04:04:55 2007
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.model.config;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.XmlTransient;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.processor.StreamResequencer;
+import org.apache.camel.processor.resequencer.DefaultExchangeComparator;
+import org.apache.camel.processor.resequencer.ExpressionResultComparator;
+
+/**
+ * Defines the configuration parameters for the {@link StreamResequencer}.
+ * Usage example:
+ * 
+ * <pre>
+ * from(&quot;direct:start&quot;).resequencer(header(&quot;seqnum&quot;)).stream(
+ *         StreamResequencerConfig.getDefault()).to(&quot;mock:result&quot;)
+ * </pre>
+ * 
+ * is equivalent to
+ * 
+ * <pre>
+ * from(&quot;direct:start&quot;).resequencer(header(&quot;seqnum&quot;)).stream().to(&quot;mock:result&quot;)
+ * </pre>
+ * 
+ * Custom values for <code>capacity</code> and <code>timeout</code> can be
+ * set like in this example:
+ * 
+ * <pre>
+ * from(&quot;direct:start&quot;).resequencer(header(&quot;seqnum&quot;)).stream(
+ *         new StreamResequencerConfig(300, 400L)).to(&quot;mock:result&quot;)
+ * </pre>
+ * 
+ * @author Martin Krasser
+ * 
+ * @version $Revision$
+ */
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.FIELD)
+public class StreamResequencerConfig {
+
+    @XmlAttribute
+    private Integer capacity; // optional XML attribute requires wrapper object
+
+    @XmlAttribute
+    private Long timeout; // optional XML attribute requires wrapper object
+    
+    @XmlTransient
+    private ExpressionResultComparator<Exchange> comparator;
+
+    /**
+     * Creates a new {@link StreamResequencerConfig} instance using default
+     * values for <code>capacity</code> (100) and <code>timeout</code>
+     * (1000L). Elements of the sequence are compared using the
+     * {@link DefaultExchangeComparator}.
+     */
+    public StreamResequencerConfig() {
+        this(100, 1000L);
+    }
+
+    /**
+     * Creates a new {@link StreamResequencerConfig} instance using the given
+     * values for <code>capacity</code> and <code>timeout</code>. Elements
+     * of the sequence are compared using the {@link DefaultExchangeComparator}.
+     * 
+     * @param capacity
+     *            capacity of the resequencer's inbound queue.
+     * @param timeout
+     *            minimum time to wait for missing elements (messages).
+     */
+    public StreamResequencerConfig(int capacity, long timeout) {
+        this(capacity, timeout, new DefaultExchangeComparator());
+    }
+
+    /**
+     * Creates a new {@link StreamResequencerConfig} instance using the given
+     * values for <code>capacity</code> and <code>timeout</code>. Elements
+     * of the sequence are compared with the given
+     * {@link ExpressionResultComparator}.
+     * 
+     * @param capacity
+     *            capacity of the resequencer's inbound queue.
+     * @param timeout
+     *            minimum time to wait for missing elements (messages).
+     */
+    public StreamResequencerConfig(int capacity, long timeout, ExpressionResultComparator<Exchange> comparator) {
+        this.capacity = capacity;
+        this.timeout = timeout;
+        this.comparator = comparator;
+    }
+
+    /**
+     * Returns a new {@link StreamResequencerConfig} instance using default
+     * values for <code>capacity</code> (100) and <code>timeout</code>
+     * (1000L). Elements of the sequence are compared using the
+     * {@link DefaultExchangeComparator}.
+     * 
+     * @return a default {@link StreamResequencerConfig}.
+     */
+    public static StreamResequencerConfig getDefault() {
+        return new StreamResequencerConfig();
+    }
+    
+    public int getCapacity() {
+        return capacity;
+    }
+
+    public void setCapacity(int capacity) {
+        this.capacity = capacity;
+    }
+
+    public long getTimeout() {
+        return timeout;
+    }
+
+    public void setTimeout(long timeout) {
+        this.timeout = timeout;
+    }
+
+    public ExpressionResultComparator<Exchange> getComparator() {
+        return comparator;
+    }
+
+    public void setComparator(ExpressionResultComparator<Exchange> comparator) {
+        this.comparator = comparator;
+    }
+    
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/package-info.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/package-info.java?rev=570379&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/package-info.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/package-info.java Tue Aug 28 04:04:55 2007
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@javax.xml.bind.annotation.XmlSchema(namespace = "http://activemq.apache.org/camel/schema/spring", elementFormDefault = javax.xml.bind.annotation.XmlNsForm.QUALIFIED) 
+package org.apache.camel.model.config;

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/package-info.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java?rev=570379&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java Tue Aug 28 04:04:55 2007
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.processor.resequencer.ResequencerEngine;
+import org.apache.camel.processor.resequencer.SequenceElementComparator;
+import org.apache.camel.processor.resequencer.SequenceSender;
+
+/**
+ * A resequencer that re-orders a (continuous) stream of {@link Exchange}s. The
+ * algorithm implemented by {@link ResequencerEngine} is based on the detection
+ * of gaps in a message stream rather than on a fixed batch size. Gap detection
+ * in combination with timeouts removes the constraint of having to know the
+ * number of messages of a sequence (i.e. the batch size) in advance.
+ * <p>
+ * Messages must contain a unique sequence number for which a predecessor and a
+ * successor is known. For example a message with the sequence number 3 has a
+ * predecessor message with the sequence number 2 and a successor message with
+ * the sequence number 4. The message sequence 2,3,5 has a gap because the
+ * successor of 3 is missing. The resequencer therefore has to retain message 5
+ * until message 4 arrives (or a timeout occurs).
+ * 
+ * @author Martin Krasser
+ * 
+ * @version $Revision$
+ */
+public class StreamResequencer extends DelegateProcessor implements Processor {
+
+    private ResequencerEngine<Exchange> reseq;
+    private BlockingQueue<Exchange> queue;
+    private SequenceSender sender;
+    
+    /**
+     * Creates a new {@link StreamResequencer} instance.
+     * 
+     * @param processor
+     *            the next processor that processes the re-ordered exchanges.
+     * @param comparator
+     *            a {@link SequenceElementComparator} for comparing sequence
+     *            number contained in {@link Exchange}s.
+     * @param capacity
+     *            the capacity of the inbound queue.
+     */
+    public StreamResequencer(Processor processor, SequenceElementComparator<Exchange> comparator, int capacity) {
+        super(processor);
+        queue = new LinkedBlockingQueue<Exchange>();
+        reseq = new ResequencerEngine<Exchange>(comparator, capacity);
+        reseq.setOutQueue(queue);
+    }
+    
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        sender = new SequenceSender(getProcessor());
+        sender.setQueue(queue);
+        sender.start();
+
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        reseq.stop();
+        sender.cancel();
+        super.doStop();
+    }
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        reseq.put(exchange);
+    }
+
+    public long getTimeout() {
+        return reseq.getTimeout();
+    }
+
+    public void setTimeout(long timeout) {
+        reseq.setTimeout(timeout);
+    }
+
+    @Override
+    public String toString() {
+        return "StreamResequencer[to: " + getProcessor() + "]";
+    }
+
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/DefaultExchangeComparator.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/DefaultExchangeComparator.java?rev=570379&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/DefaultExchangeComparator.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/DefaultExchangeComparator.java Tue Aug 28 04:04:55 2007
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.resequencer;
+
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Expression;
+
+/**
+ * Compares elements of an {@link Exchange} sequence by comparing
+ * <code>long</code> values returned by this comparator's
+ * <code>expression</code>. The expression is set during route definition
+ * e.g.
+ * 
+ * <pre>
+ *    ...resequencer(header(&quot;seqnum&quot;)).stream()...
+ * </pre>
+ * 
+ * @author Martin Krasser
+ * 
+ * @version $Revision$
+ */
+public class DefaultExchangeComparator implements ExpressionResultComparator<Exchange> {
+
+    private Expression<Exchange> expression;
+
+    public Expression<Exchange> getExpression() {
+        return expression;
+    }
+
+    public void setExpression(Expression<Exchange> expression) {
+        this.expression = expression;
+    }
+
+    @SuppressWarnings("unchecked")
+    public void setExpressions(List<Expression> expressions) {
+        if (expressions.isEmpty()) {
+            throw new IllegalArgumentException(
+                    "Expression required to resolve sequence number");
+        } else if (expressions.size() > 1) {
+            throw new IllegalArgumentException(
+                    "More than one expression currently not supported");
+        }
+        expression = expressions.get(0);
+    }
+
+    public boolean predecessor(Exchange o1, Exchange o2) {
+        long n1 = getSequenceNumber(o1);
+        long n2 = getSequenceNumber(o2);
+        return n1 == (n2 - 1L);
+    }
+
+    public boolean successor(Exchange o1, Exchange o2) {
+        long n1 = getSequenceNumber(o1);
+        long n2 = getSequenceNumber(o2);
+        return n2 == (n1 - 1L);
+    }
+
+    public int compare(Exchange o1, Exchange o2) {
+        Long n1 = getSequenceNumber(o1);
+        Long n2 = getSequenceNumber(o2);
+        return n1.compareTo(n2);
+    }
+
+    private long getSequenceNumber(Exchange exchange) {
+        return (Long)expression.evaluate(exchange);
+    }
+    
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/DefaultExchangeComparator.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/Element.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/Element.java?rev=570379&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/Element.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/Element.java Tue Aug 28 04:04:55 2007
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.resequencer;
+
+/**
+ * A container for objects to be resequenced. This container can be scheduled
+ * for timing out. Non-scheduled objects or already timed-out objects are ready
+ * for being released by the {@link ResequencerEngine}.
+ * 
+ * @author Martin Krasser
+ * 
+ * @version $Revision$
+ */
+class Element<E> implements TimeoutHandler {
+
+    /**
+     * The contained object.
+     */
+    private E object;
+
+    /**
+     * Not <code>null</code> if this element is currently beeing scheduled for
+     * timing out.
+     */
+    private Timeout timeout;
+    
+    /**
+     * Creates a new container instance.
+     * 
+     * @param object contained object.
+     */
+    public Element(E object) {
+        this.object = object;
+    }
+    
+    /**
+     * Returns the contained object.
+     * 
+     * @return the contained object.
+     */
+    public E getObject() {
+        return object;
+    }
+
+    /**
+     * Returns <code>true</code> if this element is currently scheduled for
+     * timing out.
+     * 
+     * @return <code>true</code> if scheduled or <code>false</code> if not
+     *         scheduled or already timed-out.
+     */
+    public synchronized boolean scheduled() {
+        return timeout != null;
+    }
+    
+    /**
+     * Schedules the given timeout task. Before this methods calls the
+     * {@link Timeout#schedule()} method it adds this element as timeout
+     * listener.
+     * 
+     * @param t a timeout task.
+     */
+    public synchronized void schedule(Timeout t) {
+        this.timeout = t;
+        this.timeout.addTimeoutHandlerFirst(this);
+        this.timeout.schedule();
+    }
+    
+    /**
+     * Cancels the scheduled timeout for this element. If this element is not
+     * scheduled or has already timed-out this method has no effect.
+     */
+    public synchronized void cancel() {
+        if (timeout != null) {
+            timeout.cancel();
+        }
+        timeout(null);
+    }
+
+    /**
+     * Marks this element as timed-out.
+     * 
+     * @param t timeout task that caused the notification.
+     */
+    public synchronized void timeout(Timeout t) {
+        this.timeout = null;
+    }
+    
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/Element.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ElementComparator.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ElementComparator.java?rev=570379&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ElementComparator.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ElementComparator.java Tue Aug 28 04:04:55 2007
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.resequencer;
+
+/**
+ * A strategy for comparing {@link Element} instances. This strategy uses
+ * another {@link SequenceElementComparator} instance for comparing elements
+ * contained by {@link Element} instances.
+ * 
+ * @author Martin Krasser
+ * 
+ * @version $Revision$
+ */
+class ElementComparator<E> implements SequenceElementComparator<Element<E>> {
+
+    /**
+     * A sequence element comparator this comparator delegates to.
+     */
+    private SequenceElementComparator<E> comparator;
+    
+    /**
+     * Creates a new element comparator instance.
+     * 
+     * @param comparator a sequence element comparator this comparator delegates
+     *        to.
+     */
+    public ElementComparator(SequenceElementComparator<E> comparator) {
+        this.comparator = comparator;
+    }
+    
+    /**
+     * @see SequenceElementComparator#predecessor(java.lang.Object, java.lang.Object)
+     */
+    public boolean predecessor(Element<E> o1, Element<E> o2) {
+        return comparator.predecessor(o1.getObject(), o2.getObject());
+    }
+
+    /**
+     * @see SequenceElementComparator#successor(java.lang.Object, java.lang.Object)
+     */
+    public boolean successor(Element<E> o1, Element<E> o2) {
+        return comparator.successor(o1.getObject(), o2.getObject());
+    }
+
+    /**
+     * @see java.util.Comparator#compare(java.lang.Object, java.lang.Object)
+     */
+    public int compare(Element<E> o1, Element<E> o2) {
+        return comparator.compare(o1.getObject(), o2.getObject());
+    }
+
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ElementComparator.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ExpressionResultComparator.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ExpressionResultComparator.java?rev=570379&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ExpressionResultComparator.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ExpressionResultComparator.java Tue Aug 28 04:04:55 2007
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.resequencer;
+
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Expression;
+
+/**
+ * A {@link SequenceElementComparator} that compares {@link Exchange}s based on
+ * the result of an expression evaluation.
+ * 
+ * @author Martin Krasser
+ * 
+ * @version $Revision$
+ */
+public interface ExpressionResultComparator<E extends Exchange> extends SequenceElementComparator<E> {
+
+    /**
+     * Sets the list of expressions used for comparing {@link Exchange}s.
+     * How many expressions are accepted is up to the implementation.
+     * 
+     * @param expressions a list of {@link Expression} objects.
+     */
+    void setExpressions(List<Expression> expressions);
+
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ExpressionResultComparator.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java?rev=570379&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java Tue Aug 28 04:04:55 2007
@@ -0,0 +1,329 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.resequencer;
+
+import java.util.Queue;
+import java.util.Timer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Resequences elements based on a given {@link SequenceElementComparator}.
+ * This resequencer is designed for resequencing element streams. Resequenced
+ * elements are added to an output {@link Queue}. The resequencer is configured
+ * via the <code>timeout</code> and <code>capacity</code> properties.
+ * 
+ * <ul>
+ * <li><code>timeout</code>. Defines the timeout (in milliseconds) for a
+ * given element managed by this resequencer. An out-of-sequence element can
+ * only be marked as <i>ready-for-delivery</i> if it either times out or if it
+ * has an immediate predecessor (in that case it is in-sequence). If an
+ * immediate predecessor of a waiting element arrives the timeout task for the
+ * waiting element will be cancelled (which marks it as <i>ready-for-delivery</i>).
+ * <p>
+ * If the maximum out-of-sequence time between elements within a stream is
+ * known, the <code>timeout</code> value should be set to this value. In this
+ * case it is guaranteed that all elements of a stream will be delivered in
+ * sequence to the output queue. However, large <code>timeout</code> values
+ * might require a very high resequencer <code>capacity</code> which might be
+ * in conflict with available memory resources. The lower the
+ * <code>timeout</code> value is compared to the out-of-sequence time between
+ * elements within a stream the higher the probability is for out-of-sequence
+ * elements delivered by this resequencer.</li>
+ * <li><code>capacity</code>. The capacity of this resequencer.</li>
+ * </ul>
+ * 
+ * Whenever a timeout for a certain element occurs or an element has been added
+ * to this resequencer a delivery attempt is started. If a (sub)sequence of
+ * elements is <i>ready-for-delivery</i> then they are added to output queue.
+ * <p>
+ * The resequencer remembers the last-delivered element. If an element arrives
+ * which is the immediate successor of the last-delivered element it will be
+ * delivered immediately and the last-delivered element is adjusted accordingly.
+ * If the last-delivered element is <code>null</code> i.e. the resequencer was
+ * newly created the first arriving element will wait <code>timeout</code>
+ * milliseconds for being delivered to the output queue.
+ * 
+ * @author Martin Krasser
+ * 
+ * @version $Revision$
+ */
+public class ResequencerEngine<E> implements TimeoutHandler {
+
+    private static final Log LOG = LogFactory.getLog(ResequencerEngine.class);
+    
+    private long timeout;
+    
+    private int capacity;
+    
+    private Queue<E> outQueue;
+    
+    private Element<E> lastDelivered;
+
+    /**
+     * A sequence of elements for sorting purposes.
+     */
+    private Sequence<Element<E>> sequence;
+    
+    /**
+     * A timer for scheduling timeout notifications.
+     */
+    private Timer timer;
+    
+    /**
+     * Creates a new resequencer instance with a default timeout of 2000
+     * milliseconds. The capacity is set to {@link Integer#MAX_VALUE}.
+     * 
+     * @param comparator a sequence element comparator.
+     */
+    public ResequencerEngine(SequenceElementComparator<E> comparator) {
+        this(comparator, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Creates a new resequencer instance with a default timeout of 2000
+     * milliseconds.
+     * 
+     * @param comparator a sequence element comparator.
+     * @param capacity the capacity of this resequencer.
+     */
+    public ResequencerEngine(SequenceElementComparator<E> comparator, int capacity) {
+        this.timer = new Timer("Resequencer Timer");
+        this.sequence = createSequence(comparator);
+        this.capacity = capacity;
+        this.timeout = 2000L;
+        this.lastDelivered = null;
+    }
+    
+    /**
+     * Stops this resequencer (i.e. this resequencer's {@link Timer} instance).
+     */
+    public void stop() {
+        this.timer.cancel();
+    }
+    
+    /**
+     * Returns the output queue.
+     * 
+     * @return the output queue.
+     */
+    public Queue<E> getOutQueue() {
+        return outQueue;
+    }
+
+    /**
+     * Sets the output queue.
+     * 
+     * @param outQueue output queue.
+     */
+    public void setOutQueue(Queue<E> outQueue) {
+        this.outQueue = outQueue;
+    }
+
+    /**
+     * Returns this resequencer's timeout value.
+     * 
+     * @return the timeout in milliseconds.
+     */
+    public long getTimeout() {
+        return timeout;
+    }
+
+    /**
+     * Sets this sequencer's timeout value.
+     * 
+     * @param timeout the timeout in milliseconds.
+     */
+    public void setTimeout(long timeout) {
+        this.timeout = timeout;
+    }
+
+    /** 
+     * Handles a timeout notification by starting a delivery attempt.
+     * 
+     * @param timout timeout task that caused the notification.
+     */
+    public synchronized void timeout(Timeout timout) {
+        try {
+            while (deliver()) {
+                // work done in deliver()
+            }
+        } catch (RuntimeException e) {
+            LOG.error("error during delivery", e);
+        }
+    }
+
+    /**
+     * Adds an element to this resequencer throwing an exception if the maximum
+     * capacity is reached.
+     * 
+     * @param o element to be resequenced.
+     * @throws IllegalStateException if the element cannot be added at this time
+     *         due to capacity restrictions.
+     */
+    public synchronized void add(E o) {
+        if (sequence.size() >= capacity) {
+            throw new IllegalStateException("maximum capacity is reached");
+        }
+        insert(o);
+    }
+    
+    /**
+     * Adds an element to this resequencer waiting, if necessary, until capacity
+     * becomes available.
+     * 
+     * @param o element to be resequenced.
+     * @throws InterruptedException if interrupted while waiting.
+     */
+    public synchronized void put(E o) throws InterruptedException {
+        if (sequence.size() >= capacity) {
+            wait();
+        }
+        insert(o);
+    }
+    
+    /**
+     * Returns the last delivered element.
+     * 
+     * @return the last delivered element or <code>null</code> if no delivery
+     *         has been made yet.
+     */
+    E getLastDelivered() {
+        if (lastDelivered == null) {
+            return null;
+        }
+        return lastDelivered.getObject();
+    }
+    
+    /**
+     * Sets the last delivered element. This is for testing purposes only.
+     * 
+     * @param o an element.
+     */
+    void setLastDelivered(E o) {
+        lastDelivered = new Element<E>(o);
+    }
+    
+    /**
+     * Inserts the given element into this resequencing queue (sequence). If the
+     * element is not ready for immediate delivery and has no immediate
+     * presecessor then it is scheduled for timing out. After being timed out it
+     * is ready for delivery.
+     * 
+     * @param o an element.
+     */
+    private void insert(E o) {
+        // wrap object into internal element
+        Element<E> element = new Element<E>(o);
+        // add element to sequence in proper order
+        sequence.add(element);
+
+        Element<E> successor = sequence.successor(element);
+        
+        // check if there is an immediate successor and cancel
+        // timer task (no need to wait any more for timeout)
+        if (successor != null) {
+            successor.cancel();
+        }
+        
+        // start delivery if current element is successor of last delivered element
+        if (successorOfLastDelivered(element)) {
+            // nothing to schedule
+        } else if (sequence.predecessor(element) != null) {
+            // nothing to schedule
+        } else {
+            Timeout t = defineTimeout();
+            element.schedule(t);
+        }
+        
+        // start delivery
+        while (deliver()) {
+            // work done in deliver()
+        }
+    }
+    
+    /**
+     * Attempts to deliver a single element from the head of the resequencer
+     * queue (sequence). Only elements which have not been scheduled for timing
+     * out or which already timed out can be delivered.
+     * 
+     * @return <code>true</code> if the element has been delivered
+     *         <code>false</code> otherwise.
+     */
+    private boolean deliver() {
+        if (sequence.size() == 0) {
+            return false;
+        }
+        // inspect element with lowest sequence value
+        Element<E> element = sequence.first();
+        
+        // if element is scheduled do not deliver and return
+        if (element.scheduled()) {
+            return false;
+        }
+        
+        // remove deliverable element from sequence
+        sequence.remove(element);
+
+        // set the delivered element to last delivered element
+        lastDelivered = element;
+        
+        // notify a waiting thread that capacity is available
+        notify();
+        
+        // add element to output queue
+        outQueue.add(element.getObject());
+
+        // element has been delivered
+        return true;
+    }
+    
+    /**
+     * Returns <code>true</code> if the given element is the immediate
+     * successor of the last delivered element.
+     * 
+     * @param element an element.
+     * @return <code>true</code> if the given element is the immediate
+     *         successor of the last delivered element.
+     */
+    private boolean successorOfLastDelivered(Element<E> element) {
+        if (lastDelivered == null) {
+            return false;
+        }
+        if (sequence.comparator().successor(element, lastDelivered)) {
+            return true;
+        }
+        return false;
+    }
+    
+    /**
+     * Creates a timeout task based on the timeout setting of this resequencer.
+     * 
+     * @return a new timeout task.
+     */
+    private Timeout defineTimeout() {
+        Timeout result = new Timeout(timer, timeout);
+        result.addTimeoutHandler(this);
+        return result;
+    }
+    
+    private static <E> Sequence<Element<E>> createSequence(SequenceElementComparator<E> comparator) {
+        return new Sequence<Element<E>>(new ElementComparator<E>(comparator));
+    }
+    
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/Sequence.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/Sequence.java?rev=570379&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/Sequence.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/Sequence.java Tue Aug 28 04:04:55 2007
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.resequencer;
+
+import java.util.TreeSet;
+
+/**
+ * A sorted set of elements with additional methods for obtaining immediate
+ * successors and immediate predecessors of a given element in the sequence.
+ * Successors and predecessors are calculated by using a
+ * {@link SequenceElementComparator}.
+ * 
+ * @author Martin Krasser
+ * 
+ * @version $Revision$
+ */
+public class Sequence<E> extends TreeSet<E> {
+
+    private static final long serialVersionUID = 5647393631147741711L;
+
+    /**
+     * Strategy used for ordering elements and for deciding whether two
+     * elements are immediate neighbours.
+     */
+    private SequenceElementComparator<E> comparator;
+
+    /**
+     * Creates a new {@link Sequence} instance.
+     * 
+     * @param comparator a strategy for comparing elements of this sequence.
+     */
+    public Sequence(SequenceElementComparator<E> comparator) {
+        super(comparator);
+        this.comparator = comparator;
+    }
+
+    /**
+     * Returns the immediate predecessor of the given element in this sequence
+     * or <code>null</code> if no predecessor exists.
+     * 
+     * @param e an element which is compared to elements of this sequence.
+     * @return an element of this sequence or <code>null</code>.
+     */
+    public E predecessor(E e) {
+        E elem = lower(e);
+        if (elem == null) {
+            return null;
+        }
+        // the next lower element only counts if it is an immediate predecessor
+        if (comparator.predecessor(elem, e)) {
+            return elem;
+        }
+        return null;
+    }
+
+    /**
+     * Returns the immediate successor of the given element in this sequence
+     * or <code>null</code> if no successor exists.
+     * 
+     * @param e an element which is compared to elements of this sequence.
+     * @return an element of this sequence or <code>null</code>.
+     */
+    public E successor(E e) {
+        E elem = higher(e);
+        if (elem == null) {
+            return null;
+        }
+        // the next higher element only counts if it is an immediate successor
+        if (comparator.successor(elem, e)) {
+            return elem;
+        }
+        return null;
+    }
+
+    /**
+     * Returns this sequence's comparator.
+     * 
+     * @return this sequence's comparator.
+     */
+    public SequenceElementComparator<E> comparator() {
+        return comparator;
+    }
+
+    /**
+     * Returns the next higher element in the sequence to the given element. If
+     * the given element doesn't exist or if it is the last element in the
+     * sequence <code>null</code> is returned. <strong>Please note that this
+     * method is provided for compatibility with Java 5 SE. On a Java 6 SE
+     * platform the same method implemented by the {@link TreeSet}
+     * class should be used for better performance.</strong>
+     * 
+     * @param e an element which is compared to elements of this sequence.
+     * @return an element of this sequence or <code>null</code>.
+     */
+    public E higher(E e) {
+        boolean found = false;
+        for (E current : this) {
+            // the element following the match is the next higher one
+            if (found) {
+                return current;
+            }
+            if (comparator.compare(e, current) == 0) {
+                found = true;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Returns the next lower element in the sequence to the given element. If
+     * the given element doesn't exist or if it is the first element in the
+     * sequence <code>null</code> is returned. <strong>Please note that this
+     * method is provided for compatibility with Java 5 SE. On a Java 6 SE
+     * platform the same method implemented by the {@link TreeSet}
+     * class should be used for better performance.</strong>
+     * 
+     * @param e an element which is compared to elements of this sequence.
+     * @return an element of this sequence or <code>null</code>.
+     */
+    public E lower(E e) {
+        E last = null;
+        for (E current : this) {
+            if (comparator.compare(e, current) == 0) {
+                return last;
+            }
+            last = current;
+        }
+        // The given element is not part of this sequence: return null as
+        // documented (and as higher() does) instead of the greatest element.
+        return null;
+    }
+
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/Sequence.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/SequenceElementComparator.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/SequenceElementComparator.java?rev=570379&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/SequenceElementComparator.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/SequenceElementComparator.java Tue Aug 28 04:04:55 2007
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.resequencer;
+
+import java.util.Comparator;
+
+/**
+ * A strategy for comparing elements of a sequence.
+ * 
+ * @author Martin Krasser
+ * 
+ * @version $Revision$
+ */
+public interface SequenceElementComparator<E> extends Comparator<E> {
+
+    /**
+     * Returns <code>true</code> if <code>o1</code> is an immediate predecessor
+     * of <code>o2</code>.
+     * 
+     * @param o1 a sequence element.
+     * @param o2 a sequence element.
+     * @return <code>true</code> if <code>o1</code> is an immediate
+     *         predecessor of <code>o2</code>, <code>false</code> otherwise.
+     */
+    boolean predecessor(E o1, E o2);
+    
+    /**
+     * Returns <code>true</code> if <code>o1</code> is an immediate successor
+     * of <code>o2</code>.
+     * 
+     * @param o1 a sequence element.
+     * @param o2 a sequence element.
+     * @return <code>true</code> if <code>o1</code> is an immediate
+     *         successor of <code>o2</code>, <code>false</code> otherwise.
+     */
+    boolean successor(E o1, E o2);
+    
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/SequenceElementComparator.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/SequenceSender.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/SequenceSender.java?rev=570379&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/SequenceSender.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/SequenceSender.java Tue Aug 28 04:04:55 2007
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.resequencer;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A thread that takes re-ordered {@link Exchange}s from a blocking queue and
+ * send them to the linked processor.  
+ * 
+ * @author Martin Krasser
+ * 
+ * @version $Revision$
+ */
+public class SequenceSender extends Thread {
+
+    private static final Log LOG = LogFactory.getLog(SequenceSender.class);
+
+    /**
+     * Marker object that makes the processing loop exit when it is taken
+     * from the queue. It is recognized by identity only.
+     */
+    private static final Exchange STOP = createStopSignal();
+
+    private BlockingQueue<Exchange> queue;
+    private Processor processor;
+
+    /**
+     * Creates a new {@link SequenceSender} thread.
+     * 
+     * @param processor
+     *            the processor to send re-ordered {@link Exchange}s.
+     */
+    public SequenceSender(Processor processor) {
+        this.processor = processor;
+    }
+
+    /**
+     * Sets the {@link BlockingQueue} to take messages from.
+     * 
+     * @param queue
+     *            the {@link BlockingQueue} to take messages from.
+     */
+    public void setQueue(BlockingQueue<Exchange> queue) {
+        this.queue = queue;
+    }
+
+    /**
+     * Takes exchanges from the queue and passes them to the processor until
+     * either the stop signal is taken from the queue or this thread is
+     * interrupted. Exceptions thrown while processing an exchange are logged
+     * and the loop continues with the next exchange.
+     */
+    public void run() {
+        while (true) {
+            try {
+                Exchange exchange = queue.take();
+                if (exchange == STOP) {
+                    LOG.info("exit processing loop after cancellation");
+                    return;
+                }
+                processor.process(exchange);
+            } catch (InterruptedException e) {
+                LOG.info("exit processing loop after interrupt");
+                // restore the interrupt status cleared by the exception
+                Thread.currentThread().interrupt();
+                return;
+            } catch (Exception e) {
+                // pass the exception to the log so that the cause is not lost
+                LOG.warn("exception during exchange processing", e);
+            }
+        }
+    }
+
+    /**
+     * Cancels this thread by putting the stop signal into the queue.
+     * Exchanges queued before the stop signal are still processed.
+     * 
+     * @throws InterruptedException
+     *             if interrupted while waiting for space in the queue.
+     */
+    public void cancel() throws InterruptedException {
+        queue.put(STOP);
+    }
+
+    /**
+     * Creates the stop signal as a dynamic proxy implementing
+     * {@link Exchange}.
+     */
+    private static Exchange createStopSignal() {
+        return (Exchange)Proxy.newProxyInstance(SequenceSender.class.getClassLoader(), 
+                new Class[] {Exchange.class}, createStopHandler());
+    }
+
+    /**
+     * Creates an invocation handler that rejects every method call, since
+     * the stop signal must never be used as a regular exchange.
+     */
+    private static InvocationHandler createStopHandler() {
+        return new InvocationHandler() {
+            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+                throw new RuntimeException("illegal method invocation on stop signal");
+            }
+        };
+    }
+
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/SequenceSender.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/Timeout.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/Timeout.java?rev=570379&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/Timeout.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/Timeout.java Tue Aug 28 04:04:55 2007
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.resequencer;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
+/**
+ * A timer task that notifies its registered handlers when a scheduled timeout fires.
+ * 
+ * @see Timer
+ * @see TimerTask
+ * 
+ * @author Martin Krasser
+ * 
+ * @version $Revision$
+ */
+public class Timeout extends TimerTask {
+    
+    private final List<TimeoutHandler> timeoutHandlers;
+    
+    private final Timer timer;
+    
+    private final long timeout;
+    
+    /**
+     * Creates a new timeout task using the given {@link Timer} instance and a timeout value. The
+     * task is not scheduled immediately. It will be scheduled by calling this
+     * task's {@link #schedule()} method.
+     * 
+     * @param timer the timer this task is handed to by {@link #schedule()}.
+     * @param timeout the delay, in milliseconds, passed to the timer when scheduling.
+     */
+    public Timeout(Timer timer, long timeout) {
+        this.timeoutHandlers = new LinkedList<TimeoutHandler>();
+        this.timeout = timeout;
+        this.timer = timer;
+    }
+
+    /**
+     * Returns the timeout handlers registered for notification. This is the
+     * internal list itself, not a copy, so changes to it affect this task.
+     * 
+     * @return the list of timeout handlers
+     */
+    public List<TimeoutHandler> getTimeoutHandlers() {
+        return timeoutHandlers;
+    }
+    
+    /**
+     * Appends a new timeout handler at the end of the timeout handler list.
+     * 
+     * @param handler a timeout handler.
+     */
+    public void addTimeoutHandler(TimeoutHandler handler) {
+        timeoutHandlers.add(handler);
+    }
+    
+    /**
+     * Inserts a new timeout handler at the beginning of the timeout handler
+     * list, so that it is notified before the handlers already registered.
+     * 
+     * @param handler a timeout handler.
+     */
+    public void addTimeoutHandlerFirst(TimeoutHandler handler) {
+        timeoutHandlers.add(0, handler);
+    }
+    
+    /**
+     * Removes all timeout handlers from the timeout handler list. 
+     */
+    public void clearTimeoutHandlers() {
+        timeoutHandlers.clear();
+    }
+    
+    /**
+     * Schedules this timeout task on its timer for a single run after the configured timeout.
+     */
+    public void schedule() {
+        timer.schedule(this, timeout);
+    }
+
+    /**
+     * Notifies all timeout handlers, in list order, about the scheduled timeout.
+     */
+    @Override
+    public void run() {
+        for (TimeoutHandler handler : timeoutHandlers) {
+            handler.timeout(this);
+        }
+    }
+
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/Timeout.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/TimeoutHandler.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/TimeoutHandler.java?rev=570379&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/TimeoutHandler.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/TimeoutHandler.java Tue Aug 28 04:04:55 2007
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.resequencer;
+
+/**
+ * Implemented by classes that handle the timeout notifications sent by a {@link Timeout} task.
+ * 
+ * @author Martin Krasser
+ * 
+ * @version $Revision$
+ */
+public interface TimeoutHandler {
+
+    /**
+     * Handles a timeout notification; {@link Timeout#run()} calls this with itself as argument.
+     * 
+     * @param timeout the timer task that caused this timeout notification.
+     */
+    void timeout(Timeout timeout);
+    
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/TimeoutHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/main/resources/org/apache/camel/model/config/jaxb.index
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/resources/org/apache/camel/model/config/jaxb.index?rev=570379&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/resources/org/apache/camel/model/config/jaxb.index (added)
+++ activemq/camel/trunk/camel-core/src/main/resources/org/apache/camel/model/config/jaxb.index Tue Aug 28 04:04:55 2007
@@ -0,0 +1,18 @@
+## ------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ------------------------------------------------------------------------
+StreamResequencerConfig
+BatchResequencerConfig

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/model/XmlParseTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/model/XmlParseTest.java?rev=570379&r1=570378&r2=570379&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/model/XmlParseTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/model/XmlParseTest.java Tue Aug 28 04:04:55 2007
@@ -108,6 +108,24 @@
         assertChildTo("to", splitter, "seda:b");
     }
 
+    public void testParseBatchResequencerXml() throws Exception {
+        // fixture declares <batch-config batchTimeout="2000" batchSize="500"/> and no stream-config
+        ResequencerType parsed = assertResequencer(assertOneRoute("resequencerBatch.xml"));
+        assertNull(parsed.getStreamConfig());
+        assertNotNull(parsed.getBatchConfig());
+        assertEquals(500, parsed.getBatchConfig().getBatchSize());
+        assertEquals(2000L, parsed.getBatchConfig().getBatchTimeout());
+    }
+    
+    public void testParseStreamResequencerXml() throws Exception {
+        // fixture sets only timeout="2000"; capacity 100 is presumably the config default - confirm
+        ResequencerType parsed = assertResequencer(assertOneRoute("resequencerStream.xml"));
+        assertNotNull(parsed.getStreamConfig());
+        assertNull(parsed.getBatchConfig());
+        assertEquals(100, parsed.getStreamConfig().getCapacity());
+        assertEquals(2000L, parsed.getStreamConfig().getTimeout());
+    }
+    
     // Implementation methods
     // -------------------------------------------------------------------------
 
@@ -169,6 +187,11 @@
     protected SplitterType assertSplitter(ProcessorType route) {
         ProcessorType processor = assertOneElement(route.getOutputs());
         return assertIsInstanceOf(SplitterType.class, processor);
+    }
+
+    protected ResequencerType assertResequencer(ProcessorType route) {
+        // same check as assertSplitter, except that the route's output must be a ResequencerType
+        return assertIsInstanceOf(ResequencerType.class, assertOneElement(route.getOutputs()));
     }
 
     protected void assertExpression(ExpressionType expression, String language, String languageExpression) {

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/model/XmlTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/model/XmlTestSupport.java?rev=570379&r1=570378&r2=570379&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/model/XmlTestSupport.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/model/XmlTestSupport.java Tue Aug 28 04:04:55 2007
@@ -51,6 +51,6 @@
     @Override
     protected void setUp() throws Exception {
         super.setUp();
-        jaxbContext = JAXBContext.newInstance("org.apache.camel.model:org.apache.camel.model.language");
+        jaxbContext = JAXBContext.newInstance("org.apache.camel.model:org.apache.camel.model.language:org.apache.camel.model.config");
     }
 }

Added: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java?rev=570379&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java (added)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java Tue Aug 28 04:04:55 2007
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+public class StreamResequencerTest extends ContextTestSupport { // covers resequencer(...).stream()
+
+    protected MockEndpoint resultEndpoint; // the "mock:result" endpoint, resolved in setUp()
+
+    public void sendBodyAndHeader(String endpointUri, final Object body, 
+            final String headerName, final Object headerValue) {
+        template.send(endpointUri, new Processor() {
+            public void process(Exchange exchange) {
+                Message in = exchange.getIn();
+                in.setBody(body);
+                in.setHeader(headerName, headerValue);
+                in.setHeader("testCase", getName()); // NOTE(review): presumably the JUnit test name
+            }
+        });        
+    }
+    
+    public void testSendMessagesInWrongOrderButReceiveThemInCorrectOrder() throws Exception {
+        resultEndpoint.expectedBodiesReceived("msg1", "msg2", "msg3", "msg4"); // ascending seqnum
+        sendBodyAndHeader("direct:start", "msg4", "seqnum", 4L); // sent with seqnum 4, 1, 3, 2
+        sendBodyAndHeader("direct:start", "msg1", "seqnum", 1L);
+        sendBodyAndHeader("direct:start", "msg3", "seqnum", 3L);
+        sendBodyAndHeader("direct:start", "msg2", "seqnum", 2L);
+        resultEndpoint.assertIsSatisfied();
+        List<Exchange> list = resultEndpoint.getReceivedExchanges();
+        for (Exchange exchange : list) { // debug aid only; nothing is asserted here
+            log.debug("Received: " + exchange);
+        }
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        resultEndpoint = (MockEndpoint) resolveMandatoryEndpoint("mock:result");
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                // START SNIPPET: example
+                from("direct:start").resequencer(header("seqnum")).stream().to("mock:result");
+                // END SNIPPET: example
+            }
+        };
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/test/resources/org/apache/camel/model/resequencerBatch.xml
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/resources/org/apache/camel/model/resequencerBatch.xml?rev=570379&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/resources/org/apache/camel/model/resequencerBatch.xml (added)
+++ activemq/camel/trunk/camel-core/src/test/resources/org/apache/camel/model/resequencerBatch.xml Tue Aug 28 04:04:55 2007
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<routes id="camel" xmlns="http://activemq.apache.org/camel/schema/spring">
+    <route>
+    	<from uri="direct:start" />
+    	<resequencer>
+    		<simple>body</simple>
+    		<to uri="mock:result" />
+    	    <!-- 
+    	        batch-config can be omitted for default resequencer settings
+    	    -->
+    	    <batch-config batchTimeout="2000" batchSize="500"/>
+    	</resequencer>
+    </route>
+</routes>

Propchange: activemq/camel/trunk/camel-core/src/test/resources/org/apache/camel/model/resequencerBatch.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/test/resources/org/apache/camel/model/resequencerStream.xml
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/resources/org/apache/camel/model/resequencerStream.xml?rev=570379&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/resources/org/apache/camel/model/resequencerStream.xml (added)
+++ activemq/camel/trunk/camel-core/src/test/resources/org/apache/camel/model/resequencerStream.xml Tue Aug 28 04:04:55 2007
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<routes id="camel" xmlns="http://activemq.apache.org/camel/schema/spring">
+    <route>
+    	<from uri="direct:start" />
+    	<resequencer>
+    		<simple>in.header.seqnum</simple>
+    		<to uri="mock:result" />
+    	    <stream-config timeout="2000"/> <!-- Use default capacity -->
+    	</resequencer>
+    </route>
+</routes>

Propchange: activemq/camel/trunk/camel-core/src/test/resources/org/apache/camel/model/resequencerStream.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/handler/CamelNamespaceHandler.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/handler/CamelNamespaceHandler.java?rev=570379&r1=570378&r2=570379&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/handler/CamelNamespaceHandler.java (original)
+++ activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/handler/CamelNamespaceHandler.java Tue Aug 28 04:04:55 2007
@@ -47,7 +47,7 @@
 import static org.apache.camel.util.ObjectHelper.isNotNullAndNonEmpty;
 
 public class CamelNamespaceHandler extends NamespaceHandlerSupport {
-    public static final String JAXB_PACKAGES = "org.apache.camel.spring:org.apache.camel.model:org.apache.camel.model.language";
+    public static final String JAXB_PACKAGES = "org.apache.camel.spring:org.apache.camel.model:org.apache.camel.model.language:org.apache.camel.model.config";
 
     protected BeanDefinitionParser endpointParser = new BeanDefinitionParser(EndpointFactoryBean.class);
     protected BeanDefinitionParser proxyParser = new BeanDefinitionParser(CamelProxyFactoryBean.class);

Added: activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringStreamResequencerTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringStreamResequencerTest.java?rev=570379&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringStreamResequencerTest.java (added)
+++ activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringStreamResequencerTest.java Tue Aug 28 04:04:55 2007
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spring.processor;
+
+import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.processor.StreamResequencerTest;
+
+public class SpringStreamResequencerTest extends StreamResequencerTest { // test method is inherited
+    protected CamelContext createCamelContext() throws Exception { // context from the XML named below
+        return createSpringCamelContext(this, "org/apache/camel/spring/processor/streamResequencer.xml");
+    }
+
+}

Propchange: activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringStreamResequencerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message