camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject svn commit: r724619 - in /activemq/camel/trunk: camel-core/src/main/java/org/apache/camel/model/ camel-core/src/main/java/org/apache/camel/processor/ camel-core/src/test/java/org/apache/camel/processor/ components/camel-jms/src/test/java/org/apache/cam...
Date Tue, 09 Dec 2008 06:29:43 GMT
Author: wtam
Date: Mon Dec  8 22:29:43 2008
New Revision: 724619

URL: http://svn.apache.org/viewvc?rev=724619&view=rev
Log:
Applied patch from Martin Krasser with thanks! [CAMEL-1037] Messages in Resequencer between
2 JMS queues get stuck

Added:
    activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTest.java
  (with props)
    activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTestRouteBuilder.java
  (with props)
    activemq/camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/issues/JmsResequencerTest-context.xml
  (with props)
Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java?rev=724619&r1=724618&r2=724619&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java
Mon Dec  8 22:29:43 2008
@@ -17,7 +17,6 @@
 package org.apache.camel.model;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 
 import javax.xml.bind.annotation.XmlAccessType;
@@ -28,12 +27,9 @@
 import javax.xml.bind.annotation.XmlRootElement;
 import javax.xml.bind.annotation.XmlTransient;
 
-import org.apache.camel.Endpoint;
-import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
-import org.apache.camel.Route;
 import org.apache.camel.builder.ExpressionClause;
 import org.apache.camel.model.language.ExpressionType;
 import org.apache.camel.processor.Aggregator;
@@ -107,34 +103,10 @@
     public String getShortName() {
         return "aggregator";
     }
-
     
-    @SuppressWarnings("unchecked")
-    @Override
-    public void addRoutes(RouteContext routeContext, Collection<Route> routes) throws Exception {
-        final Aggregator aggregator = createAggregator(routeContext);
-        doAddRoute(routeContext, routes, aggregator);
-    }
-    
-    private void doAddRoute(RouteContext routeContext, Collection<Route> routes, final Aggregator aggregator)
-        throws Exception {
-        Route route = new Route<Exchange>(aggregator.getEndpoint(), aggregator) {
-            @Override
-            public String toString() {
-                return "AggregatorRoute[" + getEndpoint() + " -> " + aggregator.getProcessor() + "]";
-            }
-        };
-
-        routes.add(route);
-    }
- 
     @Override
     public Processor createProcessor(RouteContext routeContext) throws Exception {
-        final Aggregator aggregator = createAggregator(routeContext);
-        
-        doAddRoute(routeContext, routeContext.getCamelContext().getRoutes(), aggregator);
-        routeContext.setIsRouteAdded(true);
-        return aggregator;
+        return createAggregator(routeContext);
     }
 
     public ExpressionClause<AggregatorType> createAndSetExpression() {
@@ -144,7 +116,6 @@
     }
     
     protected Aggregator createAggregator(RouteContext routeContext) throws Exception {
-        Endpoint from = routeContext.getEndpoint();
         final Processor processor = routeContext.createProcessor(this);
 
         final Aggregator aggregator;
@@ -163,7 +134,7 @@
                 AggregationStrategy strategy = createAggregationStrategy(routeContext);
                 aggregationCollection.setAggregationStrategy(strategy);
             }
-            aggregator = new Aggregator(from, processor, aggregationCollection);
+            aggregator = new Aggregator(processor, aggregationCollection);
         } else {
             // create the aggregator using a default collection
             AggregationStrategy strategy = createAggregationStrategy(routeContext);
@@ -180,9 +151,9 @@
                 predicate = getCompletedPredicate().createPredicate(routeContext);
             }
             if (predicate != null) {
-                aggregator = new Aggregator(from, processor, aggregateExpression, strategy, predicate);
+                aggregator = new Aggregator(processor, aggregateExpression, strategy, predicate);
             } else {
-                aggregator = new Aggregator(from, processor, aggregateExpression, strategy);
+                aggregator = new Aggregator(processor, aggregateExpression, strategy);
             }
         }
         

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java?rev=724619&r1=724618&r2=724619&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
Mon Dec  8 22:29:43 2008
@@ -25,6 +25,7 @@
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Executor;
+
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAttribute;
@@ -59,7 +60,6 @@
 import org.apache.camel.spi.InterceptStrategy;
 import org.apache.camel.spi.Policy;
 import org.apache.camel.spi.RouteContext;
-import org.apache.camel.util.CollectionHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -780,9 +780,6 @@
      * @return the expression clause to be used as builder to configure the correlation expression
      */
     public ExpressionClause<AggregatorType> aggregate() {
-        if (CollectionHelper.filterList(getOutputs(), ExceptionType.class).isEmpty()) {
-            LOG.warn("Aggregator must be the only output added to the route: " + this);
-        }
         AggregatorType answer = new AggregatorType();
         addOutput(answer);
         return answer.createAndSetExpression();
@@ -796,9 +793,6 @@
      * @return the expression clause to be used as builder to configure the correlation expression
      */
     public ExpressionClause<AggregatorType> aggregate(AggregationStrategy aggregationStrategy) {
-        if (CollectionHelper.filterList(getOutputs(), ExceptionType.class).isEmpty()) {
-            LOG.warn("Aggregator must be the only output added to the route: " + this);
-        }
         AggregatorType answer = new AggregatorType();
         answer.setAggregationStrategy(aggregationStrategy);
         addOutput(answer);
@@ -813,9 +807,6 @@
      * @return the builder
      */
     public AggregatorType aggregate(AggregationCollection aggregationCollection) {
-        if (CollectionHelper.filterList(getOutputs(), ExceptionType.class).isEmpty()) {
-            LOG.warn("Aggregator must be the only output added to the route: " + this);
-        }
         AggregatorType answer = new AggregatorType();
         answer.setAggregationCollection(aggregationCollection);
         addOutput(answer);
@@ -833,9 +824,6 @@
      * @return the builder
      */
     public AggregatorType aggregate(Expression correlationExpression) {
-        if (CollectionHelper.filterList(getOutputs(), ExceptionType.class).isEmpty()) {
-            LOG.warn("Aggregator must be the only output added to the route: " + this);
-        }
         AggregatorType answer = new AggregatorType(correlationExpression);
         addOutput(answer);
         return answer;
@@ -853,9 +841,6 @@
      * @return the builder
      */
     public AggregatorType aggregate(Expression correlationExpression, AggregationStrategy aggregationStrategy) {
-        if (CollectionHelper.filterList(getOutputs(), ExceptionType.class).isEmpty()) {
-            LOG.warn("Aggregator must be the only output added to the route: " + this);
-        }
         AggregatorType answer = new AggregatorType(correlationExpression, aggregationStrategy);
         addOutput(answer);
         return answer;

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java?rev=724619&r1=724618&r2=724619&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java
Mon Dec  8 22:29:43 2008
@@ -24,10 +24,8 @@
 import javax.xml.bind.annotation.XmlRootElement;
 import javax.xml.bind.annotation.XmlTransient;
 
-import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
 import org.apache.camel.Processor;
-import org.apache.camel.Route;
 import org.apache.camel.model.config.BatchResequencerConfig;
 import org.apache.camel.model.config.StreamResequencerConfig;
 import org.apache.camel.model.language.ExpressionType;
@@ -251,8 +249,7 @@
     protected Resequencer createBatchResequencer(RouteContext routeContext,
             BatchResequencerConfig config) throws Exception {
         Processor processor = routeContext.createProcessor(this);
-        Resequencer resequencer = new Resequencer(routeContext.getEndpoint(),
-                processor, resolveExpressionList(routeContext));
+        Resequencer resequencer = new Resequencer(processor, resolveExpressionList(routeContext));
         resequencer.setBatchSize(config.getBatchSize());
         resequencer.setBatchTimeout(config.getBatchTimeout());
         return resequencer;
@@ -273,36 +270,13 @@
             StreamResequencerConfig config) throws Exception {
         config.getComparator().setExpressions(resolveExpressionList(routeContext));
         Processor processor = routeContext.createProcessor(this);
-        StreamResequencer resequencer = new StreamResequencer(routeContext.getEndpoint(),
-                processor, config.getComparator());
+        StreamResequencer resequencer = new StreamResequencer(processor, config.getComparator());
         resequencer.setTimeout(config.getTimeout());
         resequencer.setCapacity(config.getCapacity());
         return resequencer;
         
     }
 
-    private Route<? extends Exchange> createBatchResequencerRoute(RouteContext routeContext) throws Exception {
-        // TODO: Not used should it be removed?
-        final Resequencer resequencer = createBatchResequencer(routeContext, batchConfig);
-        return new Route(routeContext.getEndpoint(), resequencer) {
-            @Override
-            public String toString() {
-                return "BatchResequencerRoute[" + getEndpoint() + " -> " + resequencer.getProcessor() + "]";
-            }
-        };
-    }
-    
-    private Route<? extends Exchange> createStreamResequencerRoute(RouteContext routeContext) throws Exception {
-        // TODO: Not used should it be removed?
-        final StreamResequencer resequencer = createStreamResequencer(routeContext, streamConfig);
-        return new Route(routeContext.getEndpoint(), resequencer) {
-            @Override
-            public String toString() {
-                return "StreamResequencerRoute[" + getEndpoint() + " -> " + resequencer.getProcessor() + "]";
-            }
-        };
-    }
-    
     private List<Expression> resolveExpressionList(RouteContext routeContext) {
         if (expressionList == null) {
             expressionList = new ArrayList<Expression>();

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java?rev=724619&r1=724618&r2=724619&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java
Mon Dec  8 22:29:43 2008
@@ -16,7 +16,6 @@
  */
 package org.apache.camel.processor;
 
-import org.apache.camel.Endpoint;
 import org.apache.camel.Expression;
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
@@ -45,19 +44,19 @@
 public class Aggregator extends BatchProcessor {
     private Predicate aggregationCompletedPredicate;
 
-    public Aggregator(Endpoint endpoint, Processor processor, Expression correlationExpression,
+    public Aggregator(Processor processor, Expression correlationExpression,
                       AggregationStrategy aggregationStrategy) {
-        this(endpoint, processor, new DefaultAggregationCollection(correlationExpression, aggregationStrategy));
+        this(processor, new DefaultAggregationCollection(correlationExpression, aggregationStrategy));
     }
 
-    public Aggregator(Endpoint endpoint, Processor processor, Expression correlationExpression,
+    public Aggregator(Processor processor, Expression correlationExpression,
                       AggregationStrategy aggregationStrategy, Predicate aggregationCompletedPredicate) {
-        this(endpoint, processor, new PredicateAggregationCollection(correlationExpression, aggregationStrategy, aggregationCompletedPredicate));
+        this(processor, new PredicateAggregationCollection(correlationExpression, aggregationStrategy, aggregationCompletedPredicate));
         this.aggregationCompletedPredicate = aggregationCompletedPredicate;
     }
 
-    public Aggregator(Endpoint endpoint, Processor processor, AggregationCollection collection) {
-        super(endpoint, processor, collection);
+    public Aggregator(Processor processor, AggregationCollection collection) {
+        super(processor, collection);
     }
 
     @Override
@@ -68,7 +67,7 @@
     @Override
     protected boolean isBatchCompleted(int index) {
         if (aggregationCompletedPredicate != null) {
-            // TODO: (davsclaus) What is the point with this code? I think its wrong
+            // TODO: (davsclaus) CAMEL-1159 What is the point with this code? I think its wrong
             if (getCollection().size() > 0) {
                 return true;
             }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java?rev=724619&r1=724618&r2=724619&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
Mon Dec  8 22:29:43 2008
@@ -18,17 +18,14 @@
 
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.concurrent.LinkedBlockingQueue;
 
-import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
-import org.apache.camel.PollingConsumer;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.LoggingExceptionHandler;
 import org.apache.camel.impl.ServiceSupport;
 import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.util.ServiceHelper;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 
 /**
  * A base class for any kind of {@link Processor} which implements some kind of
@@ -36,24 +33,25 @@
  * 
  * @version $Revision$
  */
-public class BatchProcessor extends ServiceSupport implements Runnable, Processor {
+public class BatchProcessor extends ServiceSupport implements Processor {
+    
     public static final long DEFAULT_BATCH_TIMEOUT = 1000L;
     public static final int DEFAULT_BATCH_SIZE = 100;
 
-    private static final transient Log LOG = LogFactory.getLog(BatchProcessor.class);
-    private Endpoint endpoint;
-    private Processor processor;
-    private Collection<Exchange> collection;
     private long batchTimeout = DEFAULT_BATCH_TIMEOUT;
     private int batchSize = DEFAULT_BATCH_SIZE;
     private int outBatchSize;
-    private PollingConsumer consumer;
+
+    private Processor processor;
+    private Collection<Exchange> collection;
     private ExceptionHandler exceptionHandler;
 
-    public BatchProcessor(Endpoint endpoint, Processor processor, Collection<Exchange> collection) {
-        this.endpoint = endpoint;
+    private BatchSender sender;
+    
+    public BatchProcessor(Processor processor, Collection<Exchange> collection) {
         this.processor = processor;
         this.collection = collection;
+        this.sender = new BatchSender();
     }
 
     @Override
@@ -61,18 +59,6 @@
         return "BatchProcessor[to: " + processor + "]";
     }
 
-    public void run() {
-        LOG.debug("Starting thread for " + this);
-        while (isRunAllowed()) {
-            try {
-                processBatch();
-            } catch (Exception e) {
-                getExceptionHandler().handleException(e);
-            }
-        }
-        collection.clear();
-    }
-
     // Properties
     // -------------------------------------------------------------------------
     public ExceptionHandler getExceptionHandler() {
@@ -123,62 +109,20 @@
         this.batchTimeout = batchTimeout;
     }
 
-    public Endpoint getEndpoint() {
-        return endpoint;
-    }
-
     public Processor getProcessor() {
         return processor;
     }
 
     /**
-     * A transactional method to process a batch of messages up to a timeout
-     * period or number of messages reached.
-     */
-    protected synchronized void processBatch() throws Exception {
-        long start = System.currentTimeMillis();
-        long end = start + batchTimeout;
-        for (int i = 0; !isBatchCompleted(i); i++) {
-            long timeout = end - System.currentTimeMillis();
-            if (timeout < 0L) {                
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("batch timeout expired at batch index: " + i);
-                }
-                break;
-            }
-            Exchange exchange = consumer.receive(timeout);
-            if (exchange == null) {
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("receive with timeout: " + timeout + " expired at batch index: " + i);
-                }
-                break;
-            }
-            collection.add(exchange);
-        }
-
-        // we should NOT log the collection directly as it will invoke a toString() on collection
-        // and it will call collection.iterator() where end-users might do stuff that would break
-        // calling the iterator a 2nd time as below
-
-        // lets send the batch
-        Iterator<Exchange> iter = collection.iterator();
-        while (iter.hasNext()) {
-            Exchange exchange = iter.next();
-            iter.remove();
-            processExchange(exchange);
-        }
-    }
-
-    /**
      * A strategy method to decide if the batch is completed the resulting exchanges should be sent
      */
-    protected boolean isBatchCompleted(int index) {
+    protected boolean isBatchCompleted(int num) {
         // out batch size is optional and we should only check it if its enabled (= >0)
         if (outBatchSize > 0 && collection.size() >= outBatchSize) {
             return true;
         }
         // fallback to regular batch size check
-        return index >= batchSize;
+        return num >= batchSize;
     }
 
     /**
@@ -191,16 +135,13 @@
     }
 
     protected void doStart() throws Exception {
-        consumer = endpoint.createPollingConsumer();
-
-        ServiceHelper.startServices(processor, consumer);
-
-        Thread thread = new Thread(this, this + " Polling Thread");
-        thread.start();
+        ServiceHelper.startServices(processor);
+        sender.start();
     }
 
     protected void doStop() throws Exception {
-        ServiceHelper.stopServices(consumer, processor);
+        sender.cancel();
+        ServiceHelper.stopServices(processor);
         collection.clear();
     }
 
@@ -208,7 +149,71 @@
         return collection;
     }
 
+    /**
+     * Enqueues an exchange for later batch processing.
+     */
     public void process(Exchange exchange) throws Exception {
-        // empty since exchanges come from endpoint's polling consumer
+        sender.enqueueExchange(exchange);
+    }
+
+    /**
+     * Sender thread for queued-up exchanges.
+     */
+    private class BatchSender extends Thread {
+        
+        private volatile boolean cancelRequested;
+
+        private LinkedBlockingQueue<Exchange> queue;
+        
+        public BatchSender() {
+            super("Batch Sender");
+            this.queue = new LinkedBlockingQueue<Exchange>();
+        }
+
+        @Override
+        public void run() {
+            while (true) {
+                try {
+                    Thread.sleep(batchTimeout);
+                } catch (InterruptedException e) {
+                    if (cancelRequested) {
+                        return;
+                    }
+                }
+                try {
+                    sendExchanges();
+                } catch (Exception e) {
+                    getExceptionHandler().handleException(e);
+                }
+            }
+        }
+        
+        public void cancel() {
+            cancelRequested = true;
+            interrupt();
+        }
+        
+        public void sendBatch() {
+            interrupt();
+        }
+     
+        public void enqueueExchange(Exchange exchange) {
+            queue.add(exchange);
+            if (isBatchCompleted(queue.size())) {
+                sendBatch();
+            }
+        }
+        
+        private void sendExchanges() throws Exception {
+            queue.drainTo(collection, batchSize);
+            Iterator<Exchange> iter = collection.iterator();
+            while (iter.hasNext()) {
+                Exchange exchange = iter.next();
+                iter.remove();
+                processExchange(exchange);
+            }
+        }
+        
     }
+    
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java?rev=724619&r1=724618&r2=724619&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java
Mon Dec  8 22:29:43 2008
@@ -21,7 +21,6 @@
 import java.util.Set;
 import java.util.TreeSet;
 
-import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
 import org.apache.camel.Processor;
@@ -35,16 +34,16 @@
  * @version $Revision$
  */
 public class Resequencer extends BatchProcessor {
-    public Resequencer(Endpoint endpoint, Processor processor, Expression expression) {
-        this(endpoint, processor, createSet(expression));
+    public Resequencer(Processor processor, Expression expression) {
+        this(processor, createSet(expression));
     }
 
-    public Resequencer(Endpoint endpoint, Processor processor, List<Expression> expressions) {
-        this(endpoint, processor, createSet(expressions));
+    public Resequencer(Processor processor, List<Expression> expressions) {
+        this(processor, createSet(expressions));
     }
 
-    public Resequencer(Endpoint endpoint, Processor processor, Set<Exchange> collection) {
-        super(endpoint, processor, collection);
+    public Resequencer(Processor processor, Set<Exchange> collection) {
+        super(processor, collection);
     }
 
     @Override

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java?rev=724619&r1=724618&r2=724619&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
Mon Dec  8 22:29:43 2008
@@ -16,9 +16,7 @@
  */
 package org.apache.camel.processor;
 
-import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
-import org.apache.camel.PollingConsumer;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.LoggingExceptionHandler;
 import org.apache.camel.impl.ServiceSupport;
@@ -52,14 +50,14 @@
  * 
  * @see ResequencerEngine
  */
-public class StreamResequencer extends ServiceSupport implements SequenceSender<Exchange>, Runnable, Processor {
+public class StreamResequencer extends ServiceSupport implements SequenceSender<Exchange>, Processor {
 
+    private static final long DELIVERY_ATTEMPT_INTERVAL = 1000L;
+    
     private ExceptionHandler exceptionHandler;
     private ResequencerEngine<Exchange> engine;
-    private PollingConsumer consumer;
-    private Endpoint endpoint;
     private Processor processor;
-    private Thread worker;
+    private Delivery delivery;
     private int capacity;
     
     /**
@@ -72,11 +70,10 @@
      * @param comparator
      *            a sequence element comparator for exchanges.
      */
-    public StreamResequencer(Endpoint endpoint, Processor processor, SequenceElementComparator<Exchange> comparator) {
+    public StreamResequencer(Processor processor, SequenceElementComparator<Exchange> comparator) {
         this.exceptionHandler = new LoggingExceptionHandler(getClass());
         this.engine = new ResequencerEngine<Exchange>(comparator);
         this.engine.setSequenceSender(this);
-        this.endpoint = endpoint;
         this.processor = processor;
     }
 
@@ -139,11 +136,10 @@
 
     @Override
     protected void doStart() throws Exception {
-        consumer = endpoint.createPollingConsumer();
-        ServiceHelper.startServices(processor, consumer);
-        worker = new Thread(this, this + " Polling Thread");
+        ServiceHelper.startServices(processor);
+        delivery = new Delivery();
         engine.start();
-        worker.start();
+        delivery.start();
     }
 
     @Override
@@ -151,7 +147,7 @@
         // let's stop everything in the reverse order
         // no need to stop the worker thread -- it will stop automatically when this service is stopped
         engine.stop();
-        ServiceHelper.stopServices(consumer, processor);
+        ServiceHelper.stopServices(processor);
     }
 
     /**
@@ -164,43 +160,49 @@
         processor.process(o);
     }
 
-    /**
-     * Loops over {@link #processExchange()}.
-     */
-    public void run() {
-        while (!isStopped() && !isStopping()) {
-            try {
-                processExchange();
-            } catch (Exception e) {
-                exceptionHandler.handleException(e);
-            }
+    public void process(Exchange exchange) throws Exception {
+        while (engine.size() >= capacity) {
+            Thread.sleep(getTimeout());
         }
+        engine.insert(exchange);
+        delivery.request();
     }
 
-    /**
-     * Processes an exchange received from the this resequencer's
-     * <code>endpoint</code>. Received exchanges are processed via
-     * {@link ResequencerEngine#insert(Object)}.
-     * {@link ResequencerEngine#deliver()} is then called in any case regardless
-     * whether a message was received or receiving timed out.
-     * 
-     * @throws Exception
-     *             if exchange delivery fails.
-     */
-    protected void processExchange() throws Exception {
-        if (engine.size() >= capacity) {
-            Thread.sleep(getTimeout());
-        } else {
-            Exchange exchange = consumer.receive(getTimeout());
-            if (exchange != null) {
-                engine.insert(exchange);
+    private class Delivery extends Thread {
+
+        private volatile boolean cancelRequested;
+        
+        public Delivery() {
+            super("Delivery Thread");
+        }
+        
+        @Override
+        public void run() {
+            while (true) {
+                try {
+                    Thread.sleep(DELIVERY_ATTEMPT_INTERVAL);
+                } catch (InterruptedException e) {
+                    if (cancelRequested) {
+                        return;
+                    }
+                }
+                try {
+                    engine.deliver();
+                } catch (Exception e) {
+                    exceptionHandler.handleException(e);
+                }
             }
         }
-        engine.deliver();
-    }
 
-    public void process(Exchange exchange) throws Exception {
-        engine.insert(exchange);
+        public void cancel() {
+            cancelRequested = true;
+            interrupt();
+        }
+        
+        public void request() {
+            interrupt();
+        }
+        
     }
-
+    
 }
\ No newline at end of file

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java?rev=724619&r1=724618&r2=724619&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java
(original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java
Mon Dec  8 22:29:43 2008
@@ -17,8 +17,6 @@
 package org.apache.camel.processor;
 
 import org.apache.camel.ContextTestSupport;
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 
@@ -84,19 +82,13 @@
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
-                // disabled due CAMEL-393
-                //from("seda:header").setHeader("visited", constant(true)).aggregator(header("cheese")).to("mock:result");
 
                 // START SNIPPET: ex
                 // in this route we aggregate all from direct:state based on the header id cheese
                 from("direct:start").aggregate(header("cheese")).to("mock:result");
 
-                // because of a bug in Camel (CAMEL-393) we can not have other types between from and aggregator
-                // so we must do it as here with two routes. In the fist line we set the header visited to true
-                // and link it to the 2nd route by sending it to direct:temp...
-                from("seda:header").setHeader("visited", constant(true)).to("direct:temp");
-                // and here we consume from direct:temp to continue from above and aggregate
-                from("direct:temp").aggregate(header("cheese")).to("mock:result");
+                // CAMEL-393 now fixed
+                from("seda:header").setHeader("visited", constant(true)).aggregate(header("cheese")).to("mock:result");
 
                 // in this sample we aggregate using our own startegy with a completion predicate
                 // stating that the aggregated header is equal to 5.

Added: activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTest.java?rev=724619&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTest.java (added)
+++ activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTest.java Mon Dec  8 22:29:43 2008
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.issues;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit38.AbstractJUnit38SpringContextTests;
+
+/**
+ * Unit test for issues CAMEL-1034 and CAMEL-1037
+ */
+@ContextConfiguration
+public class JmsResequencerTest extends AbstractJUnit38SpringContextTests {
+
+    @Autowired
+    protected CamelContext context;
+
+    @EndpointInject(uri = "mock:result")
+    protected MockEndpoint result;
+
+    protected ProducerTemplate template;
+    
+    public void setUp() {
+        template = context.createProducerTemplate();
+    }
+    
+    public void tearDown() {
+        result.reset();
+    }
+    
+    public void testBatchResequencer() throws Exception {
+        testResequencer("activemq:queue:in1");
+    }
+    
+    public void testStreamResequencer() throws Exception {
+        testResequencer("activemq:queue:in2");
+    }
+    
+    private void testResequencer(String endpoint) throws Exception {
+        result.expectedMessageCount(100);
+        for (int i = 0; i < 100; i++) {
+            result.message(i).body().isEqualTo(Integer.valueOf(i + 1));
+        }
+        for (int i = 100; i > 0; i--) {
+            template.sendBodyAndHeader(endpoint, Integer.valueOf(i), "num", Long.valueOf(i));
+        }
+        result.assertIsSatisfied();
+    }
+
+}

Propchange: activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTestRouteBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTestRouteBuilder.java?rev=724619&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTestRouteBuilder.java (added)
+++ activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTestRouteBuilder.java Mon Dec  8 22:29:43 2008
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.issues;
+
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * RouteBuilder for {@link JmsResequencerTest}.
+ */
+public class JmsResequencerTestRouteBuilder extends RouteBuilder {
+
+    @Override
+    public void configure() throws Exception {
+        from("activemq:queue:in1").resequence().body().batch().size(100).timeout(10000L).to("mock:result");
+        from("activemq:queue:in2").resequence().header("num").stream().timeout(2000L).to("mock:result");
+    }
+
+}

Propchange: activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTestRouteBuilder.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTestRouteBuilder.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/issues/JmsResequencerTest-context.xml
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/issues/JmsResequencerTest-context.xml?rev=724619&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/issues/JmsResequencerTest-context.xml (added)
+++ activemq/camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/issues/JmsResequencerTest-context.xml Mon Dec  8 22:29:43 2008
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="
+http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
+http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd">
+
+    <camelContext id="camel" xmlns="http://activemq.apache.org/camel/schema/spring">
+        <jmxAgent id="agent" disabled="true" />
+    </camelContext>
+
+    <bean id="routeBuilder"
+        class="org.apache.camel.component.jms.issues.JmsResequencerTestRouteBuilder">
+    </bean>
+
+    <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
+        <property name="brokerURL" value="vm://localhost?broker.persistent=false&amp;broker.useJmx=false"/>
+    </bean>
+
+    <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
+        <property name="connectionFactory" ref="jmsConnectionFactory"/>
+        <property name="transacted" value="false"/>
+        <property name="concurrentConsumers" value="10"/>
+    </bean>
+
+    <bean id="activemq" class="org.apache.camel.component.jms.JmsComponent">
+        <property name="configuration" ref="jmsConfig"/>
+    </bean>
+
+</beans>

Propchange: activemq/camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/issues/JmsResequencerTest-context.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/issues/JmsResequencerTest-context.xml
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Propchange: activemq/camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/issues/JmsResequencerTest-context.xml
------------------------------------------------------------------------------
    svn:mime-type = text/xml



Mime
View raw message