camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ge...@apache.org
Subject svn commit: r697732 - in /activemq/camel/trunk/camel-core/src: main/java/org/apache/camel/model/ main/java/org/apache/camel/model/config/ main/java/org/apache/camel/processor/ main/java/org/apache/camel/processor/resequencer/ test/java/org/apache/camel...
Date Mon, 22 Sep 2008 09:22:47 GMT
Author: gertv
Date: Mon Sep 22 02:22:47 2008
New Revision: 697732

URL: http://svn.apache.org/viewvc?rev=697732&view=rev
Log:
CAMEL-126: provide a synchronous dispatch for the stream based resequencer

Added:
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/IntegerComparator.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/ResequencerEngineSync.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/ResequencerEngineTest.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/ResequencerRunner.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/SequenceBuffer.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/SequenceTest.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/TestComparator.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/TestComparatorTest.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/TestObject.java
Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/Element.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ElementComparator.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/Sequence.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/SequenceElementComparator.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/SequenceSender.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/Timeout.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/TimeoutHandler.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/model/XmlParseTest.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java?rev=697732&r1=697731&r2=697732&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java Mon Sep 22 02:22:47 2008
@@ -162,7 +162,7 @@
         // TODO: find out how to have these two within an <xsd:choice>
         stream(streamConfig);
     }
-
+    
     @Override
     public Processor createProcessor(RouteContext routeContext) throws Exception {
         if (batchConfig != null) {
@@ -173,26 +173,17 @@
         }
     }
 
-    @Override
-    public void addRoutes(RouteContext routeContext, Collection<Route> routes) throws Exception {
-        if (batchConfig != null) {
-            routes.add(createBatchResequencerRoute(routeContext));
-        } else {
-            // StreamResequencer created via createProcessor method
-            super.addRoutes(routeContext, routes);
-        }
-    }
-
-    private Route<? extends Exchange> createBatchResequencerRoute(RouteContext routeContext) throws Exception {
-        final Resequencer resequencer = createBatchResequencer(routeContext, batchConfig);
-        return new Route(routeContext.getEndpoint(), resequencer) {
-            @Override
-            public String toString() {
-                return "BatchResequencerRoute[" + getEndpoint() + " -> " + resequencer.getProcessor() + "]";
-            }
-        };
-    }
-
+    /**
+     * Creates a batch {@link Resequencer} instance applying the given
+     * <code>config</code>.
+     * 
+     * @param routeContext
+     *            route context.
+     * @param config
+     *            batch resequencer configuration.
+     * @return the configured batch resequencer.
+     * @throws Exception 
+     */
     protected Resequencer createBatchResequencer(RouteContext routeContext,
             BatchResequencerConfig config) throws Exception {
         Processor processor = routeContext.createProcessor(this);
@@ -203,17 +194,49 @@
         return resequencer;
     }
 
-    protected StreamResequencer createStreamResequencer(RouteContext routeContext,
+    /**
+     * Creates a {@link StreamResequencer} instance applying the given
+     * <code>config</code>.
+     * 
+     * @param routeContext
+     *            route context.
+     * @param config
+     *            stream resequencer configuration.
+     * @return the configured stream resequencer.
+     * @throws Exception
+     */
+    protected StreamResequencer createStreamResequencer(RouteContext routeContext, 
             StreamResequencerConfig config) throws Exception {
         config.getComparator().setExpressions(resolveExpressionList(routeContext));
         Processor processor = routeContext.createProcessor(this);
-        StreamResequencer resequencer = new StreamResequencer(processor,
-                config.getComparator(), config.getCapacity());
+        StreamResequencer resequencer = new StreamResequencer(routeContext.getEndpoint(),
+                processor, config.getComparator());
         resequencer.setTimeout(config.getTimeout());
+        resequencer.setCapacity(config.getCapacity());
         return resequencer;
-
+        
     }
-
+    
+    private Route<? extends Exchange> createBatchResequencerRoute(RouteContext routeContext) throws Exception {
+        final Resequencer resequencer = createBatchResequencer(routeContext, batchConfig);
+        return new Route(routeContext.getEndpoint(), resequencer) {
+            @Override
+            public String toString() {
+                return "BatchResequencerRoute[" + getEndpoint() + " -> " + resequencer.getProcessor() + "]";
+            }
+        };
+    }
+    
+    private Route<? extends Exchange> createStreamResequencerRoute(RouteContext routeContext) throws Exception {
+        final StreamResequencer resequencer = createStreamResequencer(routeContext, streamConfig);
+        return new Route(routeContext.getEndpoint(), resequencer) {
+            @Override
+            public String toString() {
+                return "StreamResequencerRoute[" + getEndpoint() + " -> " + resequencer.getProcessor() + "]";
+            }
+        };
+    }
+    
     private List<Expression> resolveExpressionList(RouteContext routeContext) {
         if (expressionList == null) {
             expressionList = new ArrayList<Expression>();

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java?rev=697732&r1=697731&r2=697732&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java Mon Sep 22 02:22:47 2008
@@ -69,12 +69,12 @@
 
     /**
      * Creates a new {@link StreamResequencerConfig} instance using default
-     * values for <code>capacity</code> (100) and <code>timeout</code>
+     * values for <code>capacity</code> (1000) and <code>timeout</code>
      * (1000L). Elements of the sequence are compared using the
      * {@link DefaultExchangeComparator}.
      */
     public StreamResequencerConfig() {
-        this(100, 1000L);
+        this(1000, 1000L);
     }
 
     /**
@@ -106,7 +106,7 @@
 
     /**
      * Returns a new {@link StreamResequencerConfig} instance using default
-     * values for <code>capacity</code> (100) and <code>timeout</code>
+     * values for <code>capacity</code> (1000) and <code>timeout</code>
      * (1000L). Elements of the sequence are compared using the
      * {@link DefaultExchangeComparator}.
      * 

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java?rev=697732&r1=697731&r2=697732&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java Mon Sep 22 02:22:47 2008
@@ -16,14 +16,17 @@
  */
 package org.apache.camel.processor;
 
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
+import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.PollingConsumer;
 import org.apache.camel.Processor;
+import org.apache.camel.impl.LoggingExceptionHandler;
+import org.apache.camel.impl.ServiceSupport;
 import org.apache.camel.processor.resequencer.ResequencerEngine;
 import org.apache.camel.processor.resequencer.SequenceElementComparator;
 import org.apache.camel.processor.resequencer.SequenceSender;
+import org.apache.camel.spi.ExceptionHandler;
+import org.apache.camel.util.ServiceHelper;
 
 /**
  * A resequencer that re-orders a (continuous) stream of {@link Exchange}s. The
@@ -38,67 +41,172 @@
  * the sequence number 4. The message sequence 2,3,5 has a gap because the
  * sucessor of 3 is missing. The resequencer therefore has to retain message 5
  * until message 4 arrives (or a timeout occurs).
+ * <p>
+ * Instances of this class poll for {@link Exchange}s from a given
+ * <code>endpoint</code>. Resequencing work and the delivery of messages to
+ * the next <code>processor</code> is done within the single polling thread.
  * 
  * @author Martin Krasser
  * 
  * @version $Revision$
+ * 
+ * @see ResequencerEngine
  */
-public class StreamResequencer extends DelegateProcessor implements Processor {
+public class StreamResequencer extends ServiceSupport implements SequenceSender<Exchange>, Runnable, Processor {
 
-    private ResequencerEngine<Exchange> reseq;
-    private BlockingQueue<Exchange> queue;
-    private SequenceSender sender;
+    private ExceptionHandler exceptionHandler;
+    private ResequencerEngine<Exchange> engine;
+    private PollingConsumer<? extends Exchange> consumer;
+    private Endpoint<? extends Exchange> endpoint;
+    private Processor processor;
+    private Thread worker;
+    private int capacity;
     
     /**
      * Creates a new {@link StreamResequencer} instance.
      * 
+     * @param endpoint
+     *            endpoint to poll exchanges from.
      * @param processor
-     *            the next processor that processes the re-ordered exchanges.
+     *            next processor that processes re-ordered exchanges.
      * @param comparator
-     *            a {@link SequenceElementComparator} for comparing sequence
-     *            number contained in {@link Exchange}s.
-     * @param capacity
-     *            the capacity of the inbound queue.
-     */
-    public StreamResequencer(Processor processor, SequenceElementComparator<Exchange> comparator, int capacity) {
-        super(processor);
-        queue = new LinkedBlockingQueue<Exchange>();
-        reseq = new ResequencerEngine<Exchange>(comparator, capacity);
-        reseq.setOutQueue(queue);
+     *            a sequence element comparator for exchanges.
+     */
+    public StreamResequencer(Endpoint<? extends Exchange> endpoint, Processor processor, SequenceElementComparator<Exchange> comparator) {
+        this.exceptionHandler = new LoggingExceptionHandler(getClass());
+        this.engine = new ResequencerEngine<Exchange>(comparator);
+        this.engine.setSequenceSender(this);
+        this.endpoint = endpoint;
+        this.processor = processor;
     }
-    
-    @Override
-    protected void doStart() throws Exception {
-        super.doStart();
-        sender = new SequenceSender(getProcessor());
-        sender.setQueue(queue);
-        sender.start();
 
+    /**
+     * Returns this resequencer's exception handler.
+     * 
+     * @return this resequencer's exception handler.
+     */
+    public ExceptionHandler getExceptionHandler() {
+        return exceptionHandler;
     }
 
-    @Override
-    protected void doStop() throws Exception {
-        reseq.stop();
-        sender.cancel();
-        super.doStop();
+    /**
+     * Returns the next processor.
+     * 
+     * @return the next processor.
+     */
+    public Processor getProcessor() {
+        return processor;
     }
 
-    @Override
-    public void process(Exchange exchange) throws Exception {
-        reseq.put(exchange);
+    /**
+     * Returns this resequencer's capacity. The capacity is the maximum number
+     * of exchanges that can be managed by this resequencer at a given point in
+     * time. If the capacity if reached, polling from the endpoint will be
+     * skipped for <code>timeout</code> milliseconds giving exchanges the
+     * possibility to time out and to be delivered after the waiting period.
+     * 
+     * @return this resequencer's capacity.
+     */
+    public int getCapacity() {
+        return capacity;
     }
 
+    /**
+     * Returns this resequencer's timeout. This sets the resequencer engine's
+     * timeout via {@link ResequencerEngine#setTimeout(long)}. This value is
+     * also used to define the polling timeout from the endpoint.
+     * 
+     * @return this resequencer's timeout.
+     * (Processor) 
+     * @see ResequencerEngine#setTimeout(long)
+     */
     public long getTimeout() {
-        return reseq.getTimeout();
+        return engine.getTimeout();
+    }
+
+    public void setCapacity(int capacity) {
+        this.capacity = capacity;
     }
 
     public void setTimeout(long timeout) {
-        reseq.setTimeout(timeout);
+        engine.setTimeout(timeout);
     }
 
     @Override
     public String toString() {
-        return "StreamResequencer[to: " + getProcessor() + "]";
+        return "StreamResequencer[to: " + processor + "]";
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        consumer = endpoint.createPollingConsumer();
+        ServiceHelper.startServices(processor, consumer);
+        worker = new Thread(this, this + " Polling Thread");
+        engine.start();
+        worker.start();
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        ServiceHelper.startServices(consumer, processor);
+        engine.stop();
+    }
+
+    /**
+     * Sends the <code>exchange</code> to the next <code>processor</code>.
+     * 
+     * @param o
+     *            exchange to send.
+     */
+    public void sendElement(Exchange o) throws Exception {
+        processor.process(o);
+    }
+
+    /**
+     * Loops over {@link #processExchange()}.
+     */
+    public void run() {
+        while (!isStopped() && !isStopping()) {
+            try {
+                processExchange();
+            } catch (Exception e) {
+                exceptionHandler.handleException(e);
+            }
+        }
+    }
+
+    /**
+     * Processes an exchange received from the this resequencer's
+     * <code>endpoint</code>. Received exchanges are processed via
+     * {@link ResequencerEngine#insert(Object)}.
+     * {@link ResequencerEngine#deliver()} is then called in any case regardless
+     * whether a message was received or receiving timed out.
+     * 
+     * @throws Exception
+     *             if exchange delivery fails.
+     */
+    protected void processExchange() throws Exception {
+        if (engine.size() >= capacity) {
+            Thread.sleep(getTimeout());
+        } else {
+            Exchange exchange = consumer.receive(getTimeout());
+            if (exchange != null) {
+                engine.insert(exchange);
+            }
+        }
+        engine.deliver();
+    }
+
+    public void process(Exchange exchange) throws Exception {
+        if (engine.size() >= capacity) {
+            Thread.sleep(getTimeout());
+        } else {
+            if (exchange != null) {
+                engine.insert(exchange);
+            }
+        }
+        engine.deliver();
+        
     }
 
-}
+}
\ No newline at end of file

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/Element.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/Element.java?rev=697732&r1=697731&r2=697732&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/Element.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/Element.java Mon Sep 22 02:22:47 2008
@@ -22,8 +22,6 @@
  * for being released by the {@link ResequencerEngine}.
  * 
  * @author Martin Krasser
- * 
- * @version $Revision
  */
 class Element<E> implements TimeoutHandler {
 
@@ -69,14 +67,14 @@
     
     /**
      * Schedules the given timeout task. Before this methods calls the
-     * {@link Timeout#schedule()} method it adds this element as timeout
+     * {@link Timeout#schedule()} method it sets this element as timeout
      * listener.
      * 
      * @param t a timeout task.
      */
     public synchronized void schedule(Timeout t) {
         this.timeout = t;
-        this.timeout.addTimeoutHandlerFirst(this);
+        this.timeout.setTimeoutHandler(this);
         this.timeout.schedule();
     }
     

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ElementComparator.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ElementComparator.java?rev=697732&r1=697731&r2=697732&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ElementComparator.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ElementComparator.java Mon Sep 22 02:22:47 2008
@@ -23,7 +23,7 @@
  * 
  * @author Martin Krasser
  * 
- * @version $Revision
+ * @version $Revision$
  */
 class ElementComparator<E> implements SequenceElementComparator<Element<E>> {
 

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java?rev=697732&r1=697731&r2=697732&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java Mon Sep 22 02:22:47 2008
@@ -14,84 +14,81 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.processor.resequencer;
-
-import java.util.Queue;
-import java.util.Timer;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
+  package org.apache.camel.processor.resequencer;
+  
+  import java.util.Timer;
+  
 /**
  * Resequences elements based on a given {@link SequenceElementComparator}.
- * This resequencer is designed for resequencing element streams. Resequenced
- * elements are added to an output {@link Queue}. The resequencer is configured
- * via the <code>timeout</code> and <code>capacity</code> properties.
- * 
- * <ul>
- * <li><code>timeout</code>. Defines the timeout (in milliseconds) for a
+ * This resequencer is designed for resequencing element streams. Stream-based
+ * resequencing has the advantage that the number of elements to be resequenced
+ * need not be known in advance. Resequenced elements are delivered via a
+ * {@link SequenceSender}.
+ * <p>
+ * The resequencer's behaviour for a given comparator is controlled by the
+ * <code>timeout</code> property. This is the timeout (in milliseconds) for a
  * given element managed by this resequencer. An out-of-sequence element can
  * only be marked as <i>ready-for-delivery</i> if it either times out or if it
  * has an immediate predecessor (in that case it is in-sequence). If an
  * immediate predecessor of a waiting element arrives the timeout task for the
  * waiting element will be cancelled (which marks it as <i>ready-for-delivery</i>).
  * <p>
- * If the maximum out-of-sequence time between elements within a stream is
- * known, the <code>timeout</code> value should be set to this value. In this
- * case it is guaranteed that all elements of a stream will be delivered in
- * sequence to the output queue. However, large <code>timeout</code> values
- * might require a very high resequencer <code>capacity</code> which might be
- * in conflict with available memory resources. The lower the
- * <code>timeout</code> value is compared to the out-of-sequence time between
- * elements within a stream the higher the probability is for out-of-sequence
- * elements delivered by this resequencer.</li>
- * <li><code>capacity</code>. The capacity of this resequencer.</li>
- * </ul>
- * 
- * Whenever a timeout for a certain element occurs or an element has been added
- * to this resequencer a delivery attempt is started. If a (sub)sequence of
- * elements is <i>ready-for-delivery</i> then they are added to output queue.
+ * If the maximum out-of-sequence time difference between elements within a
+ * stream is known, the <code>timeout</code> value should be set to this
+ * value. In this case it is guaranteed that all elements of a stream will be
+ * delivered in sequence via the {@link SequenceSender}. The lower the
+ * <code>timeout</code> value is compared to the out-of-sequence time
+ * difference between elements within a stream the higher the probability is for
+ * out-of-sequence elements delivered by this resequencer. Delivery of elements
+ * must be explicitly triggered by applications using the {@link #deliver()} or
+ * {@link #deliverNext()} methods. Only elements that are <i>ready-for-delivery</i>
+ * are delivered by these methods. The longer an application waits to trigger a
+ * delivery the more elements may become <i>ready-for-delivery</i>.
  * <p>
  * The resequencer remembers the last-delivered element. If an element arrives
- * which is the immediate successor of the last-delivered element it will be
- * delivered immediately and the last-delivered element is adjusted accordingly.
- * If the last-delivered element is <code>null</code> i.e. the resequencer was
- * newly created the first arriving element will wait <code>timeout</code>
- * milliseconds for being delivered to the output queue.
+ * which is the immediate successor of the last-delivered element it is
+ * <i>ready-for-delivery</i> immediately. After delivery the last-delivered
+ * element is adjusted accordingly. If the last-delivered element is
+ * <code>null</code> i.e. the resequencer was newly created the first arriving
+ * element needs <code>timeout</code> milliseconds in any case for becoming
+ * <i>ready-for-delivery</i>.
+ * <p>
+ * <strong>Note:</strong> Instances of this class are not thread-safe.
+ * Resequencing should be done by calling {@link #insert(Object)} and
+ * {@link #deliver()} or {@link #deliverNext()} from a single thread.
  * 
  * @author Martin Krasser
  * 
- * @version $Revision
+ * @version $Revision$
  */
-public class ResequencerEngine<E> implements TimeoutHandler {
-
-    private static final transient Log LOG = LogFactory.getLog(ResequencerEngine.class);
-    
-    private long timeout;    
-    private int capacity;    
-    private Queue<E> outQueue;    
+public class ResequencerEngine<E> {
+  
+    /**
+     * The element that most recently hash been delivered or <code>null</code>
+     * if no element has been delivered yet.
+     */
     private Element<E> lastDelivered;
-
+  
+    /**
+     * Minimum amount of time to wait for out-of-sequence elements.
+     */
+    private long timeout;
+     
     /**
      * A sequence of elements for sorting purposes.
      */
     private Sequence<Element<E>> sequence;
-    
+
     /**
      * A timer for scheduling timeout notifications.
      */
     private Timer timer;
     
     /**
-     * Creates a new resequencer instance with a default timeout of 2000
-     * milliseconds. The capacity is set to {@link Integer#MAX_VALUE}.
-     * 
-     * @param comparator a sequence element comparator.
+     * A strategy for sending sequence elements. 
      */
-    public ResequencerEngine(SequenceElementComparator<E> comparator) {
-        this(comparator, Integer.MAX_VALUE);
-    }
-
+    private SequenceSender<E> sequenceSender;
+     
     /**
      * Creates a new resequencer instance with a default timeout of 2000
      * milliseconds.
@@ -99,39 +96,32 @@
      * @param comparator a sequence element comparator.
      * @param capacity the capacity of this resequencer.
      */
-    public ResequencerEngine(SequenceElementComparator<E> comparator, int capacity) {
-        this.timer = new Timer("Resequencer Timer");
+    public ResequencerEngine(SequenceElementComparator<E> comparator) {
         this.sequence = createSequence(comparator);
-        this.capacity = capacity;
         this.timeout = 2000L;
         this.lastDelivered = null;
     }
     
+    public void start() {
+        timer = new Timer("Stream Resequencer Timer");
+    }
+    
     /**
      * Stops this resequencer (i.e. this resequencer's {@link Timer} instance).
      */
     public void stop() {
-        this.timer.cancel();
+        timer.cancel();
     }
     
     /**
-     * Returns the output queue.
-     * 
-     * @return the output queue.
-     */
-    public Queue<E> getOutQueue() {
-        return outQueue;
-    }
-
-    /**
-     * Sets the output queue.
+     * Returns the number of elements currently maintained by this resequencer.
      * 
-     * @param outQueue output queue.
+     * @return the number of elements currently maintained by this resequencer.
      */
-    public void setOutQueue(Queue<E> outQueue) {
-        this.outQueue = outQueue;
+    public int size() {
+        return sequence.size();
     }
-
+    
     /**
      * Returns this resequencer's timeout value.
      * 
@@ -150,50 +140,24 @@
         this.timeout = timeout;
     }
 
-    /** 
-     * Handles a timeout notification by starting a delivery attempt.
+    /**
+     * Returns the sequence sender.
      * 
-     * @param timout timeout task that caused the notification.
+     * @return the sequence sender.
      */
-    public synchronized void timeout(Timeout timout) {
-        try {
-            while (deliver()) {
-                // work done in deliver()
-            }
-        } catch (RuntimeException e) {
-            LOG.error("error during delivery", e);
-        }
+    public SequenceSender<E> getSequenceSender() {
+        return sequenceSender;
     }
 
     /**
-     * Adds an element to this resequencer throwing an exception if the maximum
-     * capacity is reached.
-     * 
-     * @param o element to be resequenced.
-     * @throws IllegalStateException if the element cannot be added at this time
-     *         due to capacity restrictions.
-     */
-    public synchronized void add(E o) {
-        if (sequence.size() >= capacity) {
-            throw new IllegalStateException("maximum capacity is reached");
-        }
-        insert(o);
-    }
-    
-    /**
-     * Adds an element to this resequencer waiting, if necessary, until capacity
-     * becomes available.
+     * Sets the sequence sender.
      * 
-     * @param o element to be resequenced.
-     * @throws InterruptedException if interrupted while waiting.
+     * @param sequenceSender a sequence element sender.
      */
-    public synchronized void put(E o) throws InterruptedException {
-        if (sequence.size() >= capacity) {
-            wait();
-        }
-        insert(o);
+    public void setSequenceSender(SequenceSender<E> sequenceSender) {
+        this.sequenceSender = sequenceSender;
     }
-    
+
     /**
      * Returns the last delivered element.
      * 
@@ -217,14 +181,13 @@
     }
     
     /**
-     * Inserts the given element into this resequencing queue (sequence). If the
-     * element is not ready for immediate delivery and has no immediate
-     * presecessor then it is scheduled for timing out. After being timed out it
-     * is ready for delivery.
+     * Inserts the given element into this resequencer. If the element is not
+     * ready for immediate delivery and has no immediate presecessor then it is
+     * scheduled for timing out. After being timed out it is ready for delivery.
      * 
      * @param o an element.
      */
-    private void insert(E o) {
+    public void insert(E o) {
         // wrap object into internal element
         Element<E> element = new Element<E>(o);
         // add element to sequence in proper order
@@ -244,25 +207,34 @@
         } else if (sequence.predecessor(element) != null) {
             // nothing to schedule
         } else {
-            Timeout t = defineTimeout();
-            element.schedule(t);
-        }
-        
-        // start delivery
-        while (deliver()) {
-            // work done in deliver()
+            element.schedule(defineTimeout());
         }
     }
     
     /**
+     * Delivers all elements which are currently ready to deliver.
+     * 
+     * @throws exception thrown by {@link SequenceSender#sendElement(Object)}.
+     * 
+     * @see #deliverNext();
+     */
+    public void deliver() throws Exception {
+        while(deliverNext());
+    }
+    
+    /**
      * Attempts to deliver a single element from the head of the resequencer
      * queue (sequence). Only elements which have not been scheduled for timing
-     * out or which already timed out can be delivered.
+     * out or which already timed out can be delivered. Elements are deliveref via 
+     * {@link SequenceSender#sendElement(Object)}.
      * 
      * @return <code>true</code> if the element has been delivered
      *         <code>false</code> otherwise.
+     * 
+     * @throws exception thrown by {@link SequenceSender#sendElement(Object)}.
+     *         
      */
-    private boolean deliver() {
+    public boolean deliverNext() throws Exception {   
         if (sequence.size() == 0) {
             return false;
         }
@@ -280,11 +252,8 @@
         // set the delivered element to last delivered element
         lastDelivered = element;
         
-        // notify a waiting thread that capacity is available
-        notify();
-        
-        // add element to output queue
-        outQueue.add(element.getObject());
+        // deliver the sequence element
+        sequenceSender.sendElement(element.getObject());
 
         // element has been delivered
         return true;
@@ -314,9 +283,7 @@
      * @return a new timeout task.
      */
     private Timeout defineTimeout() {
-        Timeout result = new Timeout(timer, timeout);
-        result.addTimeoutHandler(this);
-        return result;
+        return new Timeout(timer, timeout);
     }
     
     private static <E> Sequence<Element<E>> createSequence(SequenceElementComparator<E> comparator) {

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/Sequence.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/Sequence.java?rev=697732&r1=697731&r2=697732&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/Sequence.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/Sequence.java Mon Sep 22 02:22:47 2008
@@ -26,7 +26,7 @@
  * 
  * @author Martin Krasser
  * 
- * @version $Revision
+ * @version $Revision$
  */
 public class Sequence<E> extends TreeSet<E> {
 

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/SequenceElementComparator.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/SequenceElementComparator.java?rev=697732&r1=697731&r2=697732&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/SequenceElementComparator.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/SequenceElementComparator.java Mon Sep 22 02:22:47 2008
@@ -23,7 +23,7 @@
  * 
  * @author Martin Krasser
  * 
- * @version $Revision
+ * @version $Revision$
  */
 public interface SequenceElementComparator<E> extends Comparator<E> {
 

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/SequenceSender.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/SequenceSender.java?rev=697732&r1=697731&r2=697732&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/SequenceSender.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/SequenceSender.java Mon Sep 22 02:22:47 2008
@@ -16,86 +16,23 @@
  */
 package org.apache.camel.processor.resequencer;
 
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.util.concurrent.BlockingQueue;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
 /**
- * A thread that takes re-ordered {@link Exchange}s from a blocking queue and
- * send them to the linked processor.  
- * 
- * @author Martin Krasser
- * 
- * @version $Revision$
- */
-public class SequenceSender extends Thread {
-
-    private static final transient Log LOG = LogFactory.getLog(SequenceSender.class);
-    private static final Exchange STOP = createStopSignal();
-    
-    private BlockingQueue<Exchange> queue;
-    private Processor processor;
+  * An interface used by the {@link ResequencerEngine#deliver()} and
+  * {@link ResequencerEngine#deliverNext()} methods to send out re-ordered
+  * elements.
+  * 
+  * @author Martin Krasser
+  * 
+  * @version $Revision$
+  */
+public interface SequenceSender<E> {
     
     /**
-     * Creates a new {@link SequenceSender} thread.
+     * Sends the given element.
      * 
-     * @param processor the processor to send re-ordered {@link Exchange}s.
+     * @param o a re-ordered element.
+     * @throws Exception if delivery fails.
      */
-    public SequenceSender(Processor processor) {
-        this.processor = processor;
-    }
-    
-    /**
-     * Sets the {@link BlockingQueue} to take messages from.
-     * 
-     * @param queue the {@link BlockingQueue} to take messages from.
-     */
-    public void setQueue(BlockingQueue<Exchange> queue) {
-        this.queue = queue;
-    }
-
-    public void run() {
-        while (true) {
-            try {
-                Exchange exchange = queue.take();
-                if (exchange == STOP) {
-                    LOG.info("Exit processing loop after cancellation");
-                    return;
-                }
-                processor.process(exchange);
-            } catch (InterruptedException e) {
-                LOG.info("Exit processing loop after interrupt");
-                return;
-            } catch (Exception e) {
-                LOG.warn("Exception during exchange processing: " + e.getMessage());
-            }
-        }
-    }
-    
-    /**
-     * Cancels this thread.
-     */
-    public void cancel() throws InterruptedException {
-        queue.put(STOP);
-    }
-    
-    private static Exchange createStopSignal() {
-        return (Exchange)Proxy.newProxyInstance(SequenceSender.class.getClassLoader(), 
-                new Class[] {Exchange.class}, createStopHandler());
-    }
-    
-    private static InvocationHandler createStopHandler() {
-        return new InvocationHandler() {
-            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
-                throw new RuntimeException("Illegal method invocation on stop signal");
-            }
-        };
-    }
+    public void sendElement(E o) throws Exception;
     
-}
+}
\ No newline at end of file

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/Timeout.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/Timeout.java?rev=697732&r1=697731&r2=697732&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/Timeout.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/Timeout.java Mon Sep 22 02:22:47 2008
@@ -16,8 +16,6 @@
  */
 package org.apache.camel.processor.resequencer;
 
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Timer;
 import java.util.TimerTask;
 
@@ -29,64 +27,48 @@
  * 
  * @author Martin Krasser
  * 
- * @version $Revision
+ * @version $Revision$
  */
 public class Timeout extends TimerTask {
     
-    private List<TimeoutHandler> timeoutHandlers;
+    private TimeoutHandler timeoutHandler;
     
     private Timer timer;
     
     private long timeout;
     
     /**
-     * Creates a new timeout task using the given {@link Timer} instance a timeout value. The
-     * task is not scheduled immediately. It will be scheduled by calling this
-     * task's {@link #schedule()} method.
+     * Creates a new timeout task using the given {@link Timer} instance and
+     * timeout value. The task is not scheduled immediately. It will be
+     * scheduled by calling this task's {@link #schedule()} method.
      * 
      * @param timer
+     *            a timer
      * @param timeout
+     *            a timeout value.
      */
     public Timeout(Timer timer, long timeout) {
-        this.timeoutHandlers = new LinkedList<TimeoutHandler>();
         this.timeout = timeout;
         this.timer = timer;
     }
 
     /**
-     * Returns the list of timeout handlers that have been registered for
-     * notification.
+     * Returns the timeout handler that has been registered for notification.
      * 
-     * @return the list of timeout handlers
+     * @return the timeout handler.
      */
-    public List<TimeoutHandler> getTimeoutHandlers() {
-        return timeoutHandlers;
+    public TimeoutHandler getTimeoutHandlers() {
+        return timeoutHandler;
     }
     
     /**
-     * Appends a new timeout handler at the end of the timeout handler list.
+     * Sets a timeout handler for receiving timeout notifications.
      * 
-     * @param handler a timeout handler.
+     * @param timeoutHandler
+     *            a timeout handler.
      */
-    public void addTimeoutHandler(TimeoutHandler handler) {
-        timeoutHandlers.add(handler);
-    }
-    
-    /**
-     * inserts a new timeout handler at the beginning of the timeout handler
-     * list.
-     * 
-     * @param handler a timeout handler.
-     */
-    public void addTimeoutHandlerFirst(TimeoutHandler handler) {
-        timeoutHandlers.add(0, handler);
-    }
-    
-    /**
-     * Removes all timeout handlers from the timeout handler list. 
-     */
-    public void clearTimeoutHandlers() {
-        this.timeoutHandlers.clear();
+    public void setTimeoutHandler(TimeoutHandler timeoutHandler) {
+        this.timeoutHandler = timeoutHandler;
     }
     
     /**
@@ -97,13 +79,11 @@
     }
 
     /**
-     * Notifies all timeout handlers about the scheduled timeout.
+     * Notifies the timeout handler about the scheduled timeout.
      */
     @Override
     public void run() {
-        for (TimeoutHandler observer : timeoutHandlers) {
-            observer.timeout(this);
-        }
+        timeoutHandler.timeout(this);
     }
 
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/TimeoutHandler.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/TimeoutHandler.java?rev=697732&r1=697731&r2=697732&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/TimeoutHandler.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/TimeoutHandler.java Mon Sep 22 02:22:47 2008
@@ -21,7 +21,7 @@
  * 
  * @author Martin Krasser
  * 
- * @version $Revision
+ * @version $Revision$
  */
 public interface TimeoutHandler {
 

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/model/XmlParseTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/model/XmlParseTest.java?rev=697732&r1=697731&r2=697732&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/model/XmlParseTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/model/XmlParseTest.java Mon Sep 22 02:22:47 2008
@@ -228,7 +228,7 @@
         ResequencerType resequencer = assertResequencer(route);
         assertNotNull(resequencer.getStreamConfig());
         assertNull(resequencer.getBatchConfig());
-        assertEquals(100, resequencer.getStreamConfig().getCapacity());
+        assertEquals(1000, resequencer.getStreamConfig().getCapacity());
         assertEquals(2000L, resequencer.getStreamConfig().getTimeout());
     }
 

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java?rev=697732&r1=697731&r2=697732&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java Mon Sep 22 02:22:47 2008
@@ -71,7 +71,7 @@
         assertEquals("Number of routes created: " + list, 1, list.size());
 
         Route route = list.get(0);
-        assertTrue(route.toString().startsWith("BatchResequencerRoute"));
+        assertIsInstanceOf(EventDrivenConsumerRoute.class, route);
     }
 
     public void testBatchResequencerTypeWithoutJmx() throws Exception {

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java?rev=697732&r1=697731&r2=697732&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java Mon Sep 22 02:22:47 2008
@@ -81,6 +81,7 @@
     }
 
     public void testStreamResequencerTypeWithoutJmx() throws Exception {
+        System.out.println("This will now fail");
         System.setProperty(JmxSystemPropertyKeys.DISABLED, "true");
         doTestStreamResequencerType();
     }

Added: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/IntegerComparator.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/IntegerComparator.java?rev=697732&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/IntegerComparator.java (added)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/IntegerComparator.java Mon Sep 22 02:22:47 2008
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.resequencer;
+
+public class IntegerComparator implements SequenceElementComparator<Integer> {
+
+    public boolean predecessor(Integer o1, Integer o2) {
+        return o1.intValue() == (o2.intValue() - 1);
+    }
+
+    public boolean successor(Integer o1, Integer o2) {
+        return o2.intValue() == (o1.intValue() - 1);
+    }
+
+    public int compare(Integer o1, Integer o2) {
+        return o1.compareTo(o2);
+    }
+
+}

Added: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/ResequencerEngineSync.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/ResequencerEngineSync.java?rev=697732&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/ResequencerEngineSync.java (added)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/ResequencerEngineSync.java Mon Sep 22 02:22:47 2008
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.resequencer;
+
+/**
+ * Synchronization facade for {@link ResequencerEngine} for testing purposes
+ * only. This facade is used for both exclusion purposes and for visibility of
+ * changes performed by different threads in unit tests. This facade is <i>not</i>
+ * needed in {@link ResequencerEngine} applications because it is expected that
+ * resequencing is performed by a single thread.
+ * 
+ * @author Martin Krasser
+ */
+public class ResequencerEngineSync<E> {
+
+    private ResequencerEngine<E> resequencer;
+    
+    public ResequencerEngineSync(ResequencerEngine<E> resequencer) {
+        this.resequencer = resequencer;
+    }
+    
+    public synchronized void stop() {
+        resequencer.stop();
+    }
+    
+    public synchronized int size() {
+        return resequencer.size();
+    }
+    
+    public synchronized long getTimeout() {
+        return resequencer.getTimeout();
+    }
+
+    public synchronized void setTimeout(long timeout) {
+        resequencer.setTimeout(timeout);
+    }
+
+    public synchronized SequenceSender<E> getSequenceSender() {
+        return resequencer.getSequenceSender();
+    }
+
+    public synchronized void setSequenceSender(SequenceSender<E> sequenceSender) {
+        resequencer.setSequenceSender(sequenceSender);
+    }
+
+    synchronized E getLastDelivered() {
+        return resequencer.getLastDelivered();
+    }
+    
+    synchronized void setLastDelivered(E o) {
+        resequencer.setLastDelivered(o);
+    }
+    
+    public synchronized void insert(E o) {
+        resequencer.insert(o);
+    }
+    
+    public synchronized void deliver() throws Exception {
+        resequencer.deliver();
+    }
+    
+    public synchronized boolean deliverNext() throws Exception {
+        return resequencer.deliverNext();
+    }
+}

Added: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/ResequencerEngineTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/ResequencerEngineTest.java?rev=697732&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/ResequencerEngineTest.java (added)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/ResequencerEngineTest.java Mon Sep 22 02:22:47 2008
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.resequencer;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+public class ResequencerEngineTest extends TestCase {
+
+    private static final boolean IGNORE_LOAD_TESTS = true;
+    
+    private ResequencerEngineSync<Integer> resequencer;
+    
+    private ResequencerRunner<Integer> runner;
+    
+    private SequenceBuffer<Integer> buffer;
+    
+    public void setUp() throws Exception {
+    }
+
+    public void tearDown() throws Exception {
+        if (runner != null) {
+            runner.cancel();
+        }
+        if (resequencer != null) {
+            resequencer.stop();
+        }
+    }
+
+    public void testTimeout1() throws Exception {
+        initResequencer(500, 10);
+        resequencer.insert(4);
+        assertNull(buffer.poll(250));
+        assertEquals((Integer)4, buffer.take());
+        assertEquals((Integer)4, resequencer.getLastDelivered());
+    }
+    
+    public void testTimeout2() throws Exception {
+        initResequencer(500, 10);
+        resequencer.setLastDelivered(2);
+        resequencer.insert(4);
+        assertNull(buffer.poll(250));
+        assertEquals((Integer)4, buffer.take());
+        assertEquals((Integer)4, resequencer.getLastDelivered());
+    }
+    
+    public void testTimeout3() throws Exception {
+        initResequencer(500, 10);
+        resequencer.setLastDelivered(3);
+        resequencer.insert(4);
+        assertEquals((Integer)4, buffer.poll(250));
+        assertEquals((Integer)4, resequencer.getLastDelivered());
+    }
+    
+    public void testTimout4() throws Exception {
+        initResequencer(500, 10);
+        resequencer.setLastDelivered(2);
+        resequencer.insert(4);
+        resequencer.insert(3);
+        assertEquals((Integer)3, buffer.poll(125));
+        assertEquals((Integer)4, buffer.poll(125));
+        assertEquals((Integer)4, resequencer.getLastDelivered());
+    }
+    
+    public void testRandom() throws Exception {
+        if (IGNORE_LOAD_TESTS) {
+            return;
+        }
+        int input = 1000;
+        initResequencer(1000, 1000);
+        List<Integer> list = new LinkedList<Integer>();
+        for (int i = 0; i < input; i++) {
+            list.add(i);
+        }
+        Random random = new Random(System.currentTimeMillis());
+        System.out.println("Input sequence:");
+        long millis = System.currentTimeMillis();
+        for (int i = input; i > 0; i--) {
+            int r = random.nextInt(i);
+            int next = list.remove(r);
+            System.out.print(next + " ");
+            resequencer.insert(next); 
+        }
+        System.out.println("\nOutput sequence:");
+        for (int i = 0; i < input; i++) {
+            System.out.print(buffer.take() + " ");
+        }
+        millis = System.currentTimeMillis() - millis;
+        System.out.println("\nDuration = " + millis + " ms");
+    }
+    
+    public void testReverse1() throws Exception {
+        if (IGNORE_LOAD_TESTS) {
+            return;
+        }
+        testReverse(10);
+    }
+    
+    public void testReverse2() throws Exception {
+        if (IGNORE_LOAD_TESTS) {
+            return;
+        }
+        testReverse(100);
+    }
+    
+    private void testReverse(int capacity) throws Exception {
+        initResequencer(1, capacity);
+        for (int i = 99; i >= 0; i--) {
+            resequencer.insert(i);
+        }
+        System.out.println("\nOutput sequence:");
+        for (int i = 0; i < 100; i++) {
+            System.out.print(buffer.take() + " ");
+        }
+    }
+    
+    private void initResequencer(long timeout, int capacity) {
+        ResequencerEngine<Integer> engine;
+        buffer = new SequenceBuffer<Integer>();
+        engine = new ResequencerEngine<Integer>(new IntegerComparator());
+        engine.setSequenceSender(buffer);
+        engine.setTimeout(timeout);
+        engine.start();
+        resequencer = new ResequencerEngineSync<Integer>(engine);
+        runner = new ResequencerRunner<Integer>(resequencer, 50);
+        runner.start();
+    }
+    
+}

Added: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/ResequencerRunner.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/ResequencerRunner.java?rev=697732&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/ResequencerRunner.java (added)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/ResequencerRunner.java Mon Sep 22 02:22:47 2008
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.resequencer;
+
+public class ResequencerRunner<E> extends Thread {
+
+    private ResequencerEngineSync<E> resequencer;
+    
+    private long interval;
+    
+    private boolean cancelRequested;
+    
+    public ResequencerRunner(ResequencerEngineSync<E> resequencer, long interval) {
+        this.resequencer = resequencer;
+        this.interval = interval;
+        this.cancelRequested = false;
+    }
+    
+    @Override
+    public void run() {
+        while(!cancelRequested()) {
+            try {
+                Thread.sleep(interval);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+            try {
+                resequencer.deliver();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+        // TODO Auto-generated method stub
+        super.run();
+    }
+
+    public synchronized void cancel() {
+        this.cancelRequested = true;
+    }
+    
+    private synchronized boolean cancelRequested() {
+        return cancelRequested;
+    }
+    
+}

Added: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/SequenceBuffer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/SequenceBuffer.java?rev=697732&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/SequenceBuffer.java (added)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/SequenceBuffer.java Mon Sep 22 02:22:47 2008
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.resequencer;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class SequenceBuffer<E> implements SequenceSender<E> {
+
+    private BlockingQueue<E> queue;
+    
+    public SequenceBuffer() {
+        this.queue = new LinkedBlockingQueue<E>();
+    }
+    
+    public int size() {
+        return queue.size();
+    }
+    
+    public E take() throws InterruptedException {
+        return queue.take();
+    }
+    
+    public E poll(long timeout) throws InterruptedException {
+        return queue.poll(timeout, TimeUnit.MILLISECONDS);
+    }
+    
+    public void sendElement(E o) throws Exception {
+        queue.put(o);
+    }
+
+}

Added: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/SequenceTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/SequenceTest.java?rev=697732&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/SequenceTest.java (added)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/SequenceTest.java Mon Sep 22 02:22:47 2008
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.resequencer;
+
+import junit.framework.TestCase;
+
+public class SequenceTest extends TestCase {
+
+    private TestObject e1;
+    private TestObject e2;
+    private TestObject e3;
+    
+    private Sequence<TestObject> set;
+
+    public void setUp() throws Exception {
+        e1 = new TestObject(3);
+        e2 = new TestObject(4);
+        e3 = new TestObject(7);
+        set = new Sequence<TestObject>(new TestComparator());
+        set.add(e3);
+        set.add(e1);
+        set.add(e2);
+    }
+
+    public void tearDown() throws Exception {
+    }
+
+    public void testPredecessor() {
+        assertEquals(e1, set.predecessor(e2));
+        assertEquals(null, set.predecessor(e1));
+        assertEquals(null, set.predecessor(e3));
+    }
+
+    public void testSuccessor() {
+        assertEquals(e2, set.successor(e1));
+        assertEquals(null, set.successor(e2));
+        assertEquals(null, set.successor(e3));
+    }
+
+}

Added: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/TestComparator.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/TestComparator.java?rev=697732&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/TestComparator.java (added)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/TestComparator.java Mon Sep 22 02:22:47 2008
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.resequencer;
+
+public class TestComparator implements SequenceElementComparator<TestObject> {
+
+    public boolean predecessor(TestObject o1, TestObject o2) {
+        return (o2.getValue() - 1) == o1.getValue();
+    }
+
+    public boolean successor(TestObject o1, TestObject o2) {
+        return (o1.getValue() - 1) == o2.getValue();
+    }
+
+    public int compare(TestObject o1, TestObject o2) {
+        return Integer.valueOf(o1.getValue()).compareTo(o2.getValue());
+    }
+
+}

Added: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/TestComparatorTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/TestComparatorTest.java?rev=697732&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/TestComparatorTest.java (added)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/TestComparatorTest.java Mon Sep 22 02:22:47 2008
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.resequencer;
+
+import junit.framework.TestCase;
+
+public class TestComparatorTest extends TestCase {
+
+    private TestComparator c;
+    
+    private TestObject e1;
+    private TestObject e2;
+    private TestObject e3;
+    
+    public void setUp() {
+        c = new TestComparator();
+        e1 = new TestObject(3);
+        e2 = new TestObject(4);
+        e3 = new TestObject(7);
+    }
+    
+    public void tearDown() throws Exception {
+    }
+
+    public void testPredecessor() {
+        assertTrue(c.predecessor(e1, e2));
+        assertFalse(c.predecessor(e2, e1));
+        assertFalse(c.predecessor(e1, e3));
+        assertFalse(c.predecessor(e3, e1));
+        assertFalse(c.predecessor(e3, e3));
+    }
+
+    public void testSuccessor() {
+        assertTrue(c.successor(e2, e1));
+        assertFalse(c.successor(e1, e2));
+        assertFalse(c.successor(e3, e1));
+        assertFalse(c.successor(e1, e3));
+        assertFalse(c.successor(e3, e3));
+    }
+
+    public void testCompare() {
+        assertTrue(c.compare(e1, e2) < 0);
+        assertTrue(c.compare(e2, e1) > 0);
+        assertTrue(c.compare(e1, e3) < 0);
+        assertTrue(c.compare(e3, e1) > 0);
+        assertTrue(c.compare(e3, e3) == 0);
+    }
+    
+}

Added: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/TestObject.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/TestObject.java?rev=697732&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/TestObject.java (added)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/resequencer/TestObject.java Mon Sep 22 02:22:47 2008
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.resequencer;
+
+public class TestObject {
+
+    private int value;
+    
+    public TestObject(int value) {
+        this.value = value;
+    }
+    
+    public int getValue() {
+        return value;
+    }
+
+    @Override
+    public String toString() {
+        return Integer.toString(value);
+    }
+    
+}



Mime
View raw message