camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject svn commit: r724629 - in /activemq/camel/branches/camel-1.x: ./ camel-core/src/main/java/org/apache/camel/model/ camel-core/src/main/java/org/apache/camel/processor/ camel-core/src/test/java/org/apache/camel/processor/ components/camel-jms/src/test/jav...
Date Tue, 09 Dec 2008 07:18:06 GMT
Author: wtam
Date: Mon Dec  8 23:18:06 2008
New Revision: 724629

URL: http://svn.apache.org/viewvc?rev=724629&view=rev
Log:
Merged revisions 724619 via svnmerge from 
https://svn.apache.org/repos/asf/activemq/camel/trunk

........
  r724619 | wtam | 2008-12-09 01:29:43 -0500 (Tue, 09 Dec 2008) | 1 line
  
  Applied patch from Martin Krasser with thanks! [CAMEL-1037] Messages in Resequencer between 2 JMS queues get stuck
........

Added:
    activemq/camel/branches/camel-1.x/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTest.java
      - copied unchanged from r724619, activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTest.java
    activemq/camel/branches/camel-1.x/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTestRouteBuilder.java
      - copied, changed from r724619, activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTestRouteBuilder.java
    activemq/camel/branches/camel-1.x/components/camel-jms/src/test/resources/org/apache/camel/component/jms/issues/JmsResequencerTest-context.xml
      - copied unchanged from r724619, activemq/camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/issues/JmsResequencerTest-context.xml
Modified:
    activemq/camel/branches/camel-1.x/   (props changed)
    activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java
    activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
    activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java
    activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java
    activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
    activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java
    activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
    activemq/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java

Propchange: activemq/camel/branches/camel-1.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Dec  8 23:18:06 2008
@@ -1 +1 @@
-/activemq/camel/trunk:709850,711200,711206,711219-711220,711523,711531,711756,711784,711859,711874,711962,711971,712064,712119,712148,712662,712692,712925,713013,713107,713136,713273,713290,713292,713295,713314,713475,713625,713932,713944,714032,717965,717989,718242,718273,718312-718515,719163-719184,719334,719339,719524,719662,719848,719851,719855,719864,719978-719979,720207,720435-720437,720806,721272,721331,721333-721334,721360,721669,721764,721813,721985,722005,722070,722110,722415,722438,722726,722845,722878,723264,723314,723325-723327,723409,723835,723966,724122
+/activemq/camel/trunk:709850,711200,711206,711219-711220,711523,711531,711756,711784,711859,711874,711962,711971,712064,712119,712148,712662,712692,712925,713013,713107,713136,713273,713290,713292,713295,713314,713475,713625,713932,713944,714032,717965,717989,718242,718273,718312-718515,719163-719184,719334,719339,719524,719662,719848,719851,719855,719864,719978-719979,720207,720435-720437,720806,721272,721331,721333-721334,721360,721669,721764,721813,721985,722005,722070,722110,722415,722438,722726,722845,722878,723264,723314,723325-723327,723409,723835,723966,724122,724619

Propchange: activemq/camel/branches/camel-1.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java
URL: http://svn.apache.org/viewvc/activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java?rev=724629&r1=724628&r2=724629&view=diff
==============================================================================
--- activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java
(original)
+++ activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java
Mon Dec  8 23:18:06 2008
@@ -16,7 +16,6 @@
  */
 package org.apache.camel.model;
 
-import java.util.Collection;
 
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
@@ -25,12 +24,9 @@
 import javax.xml.bind.annotation.XmlRootElement;
 import javax.xml.bind.annotation.XmlTransient;
 
-import org.apache.camel.Endpoint;
-import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
-import org.apache.camel.Route;
 import org.apache.camel.builder.ExpressionClause;
 import org.apache.camel.model.language.ExpressionType;
 import org.apache.camel.processor.Aggregator;
@@ -89,37 +85,12 @@
     public String getShortName() {
         return "aggregator";
     }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void addRoutes(RouteContext routeContext, Collection<Route> routes) throws
Exception {
-        final Aggregator aggregator = createAggregator(routeContext);
-        doAddRoute(routeContext, routes, aggregator);
-    }
-    
-    private void doAddRoute(RouteContext routeContext, Collection<Route> routes, final
Aggregator aggregator)
-        throws Exception {
-        Route route = new Route<Exchange>(aggregator.getEndpoint(), aggregator) {
-            @Override
-            public String toString() {
-                return "AggregatorRoute[" + getEndpoint() + " -> " + aggregator.getProcessor()
+ "]";
-            }
-        };
-
-        routes.add(route);
-    }
- 
     @Override
     public Processor createProcessor(RouteContext routeContext) throws Exception {
-        final Aggregator aggregator = createAggregator(routeContext);
-        
-        doAddRoute(routeContext, routeContext.getCamelContext().getRoutes(), aggregator);
-        routeContext.setIsRouteAdded(true);
-        return aggregator;
+        return createAggregator(routeContext);
     }
 
     protected Aggregator createAggregator(RouteContext routeContext) throws Exception {
-        Endpoint from = routeContext.getEndpoint();
         final Processor processor = routeContext.createProcessor(this);
 
         final Aggregator aggregator;
@@ -138,7 +109,7 @@
                 AggregationStrategy strategy = createAggregationStrategy(routeContext);
                 aggregationCollection.setAggregationStrategy(strategy);
             }
-            aggregator = new Aggregator(from, processor, aggregationCollection);
+            aggregator = new Aggregator(processor, aggregationCollection);
         } else {
             // create the aggregator using a default collection
             AggregationStrategy strategy = createAggregationStrategy(routeContext);
@@ -150,9 +121,9 @@
                 predicate = getCompletedPredicate().createPredicate(routeContext);
             }
             if (predicate != null) {
-                aggregator = new Aggregator(from, processor, aggregateExpression, strategy,
predicate);
+                aggregator = new Aggregator(processor, aggregateExpression, strategy, predicate);
             } else {
-                aggregator = new Aggregator(from, processor, aggregateExpression, strategy);
+                aggregator = new Aggregator(processor, aggregateExpression, strategy);
             }
         }
         

Modified: activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
URL: http://svn.apache.org/viewvc/activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java?rev=724629&r1=724628&r2=724629&view=diff
==============================================================================
--- activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
(original)
+++ activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
Mon Dec  8 23:18:06 2008
@@ -26,6 +26,7 @@
 import java.util.Set;
 import java.util.concurrent.ThreadPoolExecutor;
 
+
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAttribute;
@@ -62,7 +63,6 @@
 import org.apache.camel.spi.InterceptStrategy;
 import org.apache.camel.spi.Policy;
 import org.apache.camel.spi.RouteContext;
-import org.apache.camel.util.CollectionHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -755,9 +755,6 @@
      * together into a single invoice message.
      */
     public ExpressionClause<AggregatorType> aggregator() {
-        if (CollectionHelper.filterList(getOutputs(), ExceptionType.class).isEmpty()) {
-            LOG.warn("Aggregator must be the only output added to the route: " + this);
-        }
         AggregatorType answer = new AggregatorType();
         addOutput(answer);
         return ExpressionClause.createAndSetExpression(answer);
@@ -781,9 +778,6 @@
      * @param aggregationStrategy the strategy used for the aggregation
      */
     public ExpressionClause<AggregatorType> aggregator(AggregationStrategy aggregationStrategy)
{
-        if (CollectionHelper.filterList(getOutputs(), ExceptionType.class).isEmpty()) {
-            LOG.warn("Aggregator must be the only output added to the route: " + this);
-        }
         AggregatorType answer = new AggregatorType();
         answer.setAggregationStrategy(aggregationStrategy);
         addOutput(answer);
@@ -800,9 +794,6 @@
      * @param aggregationCollection the collection used to perform the aggregation
      */
     public AggregatorType aggregator(AggregationCollection aggregationCollection) {
-        if (CollectionHelper.filterList(getOutputs(), ExceptionType.class).isEmpty()) {
-            LOG.warn("Aggregator must be the only output added to the route: " + this);
-        }
         AggregatorType answer = new AggregatorType();
         answer.setAggregationCollection(aggregationCollection);
         addOutput(answer);
@@ -830,9 +821,6 @@
      *                              <code>header("JMSCorrelationID")</code>
      */
     public AggregatorType aggregator(Expression correlationExpression) {
-        if (CollectionHelper.filterList(getOutputs(), ExceptionType.class).isEmpty()) {
-            LOG.warn("Aggregator must be the only output added to the route: " + this);
-        }
         AggregatorType answer = new AggregatorType(correlationExpression);
         addOutput(answer);
         return answer;
@@ -859,9 +847,6 @@
      *                              <code>header("JMSCorrelationID")</code>
      */
     public AggregatorType aggregator(Expression correlationExpression, AggregationStrategy
aggregationStrategy) {
-        if (CollectionHelper.filterList(getOutputs(), ExceptionType.class).isEmpty()) {
-            LOG.warn("Aggregator must be the only output added to the route: " + this);
-        }
         AggregatorType answer = new AggregatorType(correlationExpression, aggregationStrategy);
         addOutput(answer);
         return answer;

Modified: activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java
URL: http://svn.apache.org/viewvc/activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java?rev=724629&r1=724628&r2=724629&view=diff
==============================================================================
--- activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java
(original)
+++ activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/model/ResequencerType.java
Mon Dec  8 23:18:06 2008
@@ -28,7 +28,6 @@
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
 import org.apache.camel.Processor;
-import org.apache.camel.Route;
 import org.apache.camel.model.config.BatchResequencerConfig;
 import org.apache.camel.model.config.StreamResequencerConfig;
 import org.apache.camel.model.language.ExpressionType;
@@ -223,8 +222,7 @@
     protected Resequencer createBatchResequencer(RouteContext routeContext,
             BatchResequencerConfig config) throws Exception {
         Processor processor = routeContext.createProcessor(this);
-        Resequencer resequencer = new Resequencer(routeContext.getEndpoint(),
-                processor, resolveExpressionList(routeContext));
+        Resequencer resequencer = new Resequencer(processor, resolveExpressionList(routeContext));
         resequencer.setBatchSize(config.getBatchSize());
         resequencer.setBatchTimeout(config.getBatchTimeout());
         return resequencer;
@@ -245,34 +243,13 @@
             StreamResequencerConfig config) throws Exception {
         config.getComparator().setExpressions(resolveExpressionList(routeContext));
         Processor processor = routeContext.createProcessor(this);
-        StreamResequencer resequencer = new StreamResequencer(routeContext.getEndpoint(),
-                processor, config.getComparator());
+        StreamResequencer resequencer = new StreamResequencer(processor, config.getComparator());
         resequencer.setTimeout(config.getTimeout());
         resequencer.setCapacity(config.getCapacity());
         return resequencer;
         
     }
     
-    private Route<? extends Exchange> createBatchResequencerRoute(RouteContext routeContext)
throws Exception {
-        final Resequencer resequencer = createBatchResequencer(routeContext, batchConfig);
-        return new Route(routeContext.getEndpoint(), resequencer) {
-            @Override
-            public String toString() {
-                return "BatchResequencerRoute[" + getEndpoint() + " -> " + resequencer.getProcessor()
+ "]";
-            }
-        };
-    }
-    
-    private Route<? extends Exchange> createStreamResequencerRoute(RouteContext routeContext)
throws Exception {
-        final StreamResequencer resequencer = createStreamResequencer(routeContext, streamConfig);
-        return new Route(routeContext.getEndpoint(), resequencer) {
-            @Override
-            public String toString() {
-                return "StreamResequencerRoute[" + getEndpoint() + " -> " + resequencer.getProcessor()
+ "]";
-            }
-        };
-    }
-    
     private List<Expression> resolveExpressionList(RouteContext routeContext) {
         if (expressionList == null) {
             expressionList = new ArrayList<Expression>();

Modified: activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java
URL: http://svn.apache.org/viewvc/activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java?rev=724629&r1=724628&r2=724629&view=diff
==============================================================================
--- activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java
(original)
+++ activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java
Mon Dec  8 23:18:06 2008
@@ -16,7 +16,6 @@
  */
 package org.apache.camel.processor;
 
-import org.apache.camel.Endpoint;
 import org.apache.camel.Expression;
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
@@ -45,19 +44,19 @@
 public class Aggregator extends BatchProcessor {
     private Predicate aggregationCompletedPredicate;
 
-    public Aggregator(Endpoint endpoint, Processor processor, Expression correlationExpression,
+    public Aggregator(Processor processor, Expression correlationExpression,
                       AggregationStrategy aggregationStrategy) {
-        this(endpoint, processor, new DefaultAggregationCollection(correlationExpression,
aggregationStrategy));
+        this(processor, new DefaultAggregationCollection(correlationExpression, aggregationStrategy));
     }
 
-    public Aggregator(Endpoint endpoint, Processor processor, Expression correlationExpression,
+    public Aggregator(Processor processor, Expression correlationExpression,
                       AggregationStrategy aggregationStrategy, Predicate aggregationCompletedPredicate)
{
-        this(endpoint, processor, new PredicateAggregationCollection(correlationExpression,
aggregationStrategy, aggregationCompletedPredicate));
+        this(processor, new PredicateAggregationCollection(correlationExpression, aggregationStrategy,
aggregationCompletedPredicate));
         this.aggregationCompletedPredicate = aggregationCompletedPredicate;
     }
 
-    public Aggregator(Endpoint endpoint, Processor processor, AggregationCollection collection)
{
-        super(endpoint, processor, collection);
+    public Aggregator(Processor processor, AggregationCollection collection) {
+        super(processor, collection);
     }
 
     @Override
@@ -68,7 +67,7 @@
     @Override
     protected boolean isBatchCompleted(int index) {
         if (aggregationCompletedPredicate != null) {
-            // TODO: (davsclaus) What is the point with this code? I think its wrong
+            // TODO: (davsclaus) CAMEL-1159 What is the point with this code? I think its
wrong
             if (getCollection().size() > 0) {
                 return true;
             }

Modified: activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java?rev=724629&r1=724628&r2=724629&view=diff
==============================================================================
--- activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
(original)
+++ activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
Mon Dec  8 23:18:06 2008
@@ -18,17 +18,14 @@
 
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.concurrent.LinkedBlockingQueue;
 
-import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
-import org.apache.camel.PollingConsumer;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.LoggingExceptionHandler;
 import org.apache.camel.impl.ServiceSupport;
 import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.util.ServiceHelper;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 
 /**
  * A base class for any kind of {@link Processor} which implements some kind of
@@ -36,24 +33,25 @@
  * 
  * @version $Revision$
  */
-public class BatchProcessor extends ServiceSupport implements Runnable, Processor {
+public class BatchProcessor extends ServiceSupport implements Processor {
+    
     public static final long DEFAULT_BATCH_TIMEOUT = 1000L;
     public static final int DEFAULT_BATCH_SIZE = 100;
 
-    private static final transient Log LOG = LogFactory.getLog(BatchProcessor.class);
-    private Endpoint endpoint;
-    private Processor processor;
-    private Collection<Exchange> collection;
     private long batchTimeout = DEFAULT_BATCH_TIMEOUT;
     private int batchSize = DEFAULT_BATCH_SIZE;
     private int outBatchSize;
-    private PollingConsumer consumer;
+
+    private Processor processor;
+    private Collection<Exchange> collection;
     private ExceptionHandler exceptionHandler;
 
-    public BatchProcessor(Endpoint endpoint, Processor processor, Collection<Exchange>
collection) {
-        this.endpoint = endpoint;
+    private BatchSender sender;
+    
+    public BatchProcessor(Processor processor, Collection<Exchange> collection) {
         this.processor = processor;
         this.collection = collection;
+        this.sender = new BatchSender();
     }
 
     @Override
@@ -61,18 +59,6 @@
         return "BatchProcessor[to: " + processor + "]";
     }
 
-    public void run() {
-        LOG.debug("Starting thread for " + this);
-        while (isRunAllowed()) {
-            try {
-                processBatch();
-            } catch (Exception e) {
-                getExceptionHandler().handleException(e);
-            }
-        }
-        collection.clear();
-    }
-
     // Properties
     // -------------------------------------------------------------------------
     public ExceptionHandler getExceptionHandler() {
@@ -123,62 +109,20 @@
         this.batchTimeout = batchTimeout;
     }
 
-    public Endpoint getEndpoint() {
-        return endpoint;
-    }
-
     public Processor getProcessor() {
         return processor;
     }
 
     /**
-     * A transactional method to process a batch of messages up to a timeout
-     * period or number of messages reached.
-     */
-    protected synchronized void processBatch() throws Exception {
-        long start = System.currentTimeMillis();
-        long end = start + batchTimeout;
-        for (int i = 0; !isBatchCompleted(i); i++) {
-            long timeout = end - System.currentTimeMillis();
-            if (timeout < 0L) {                
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("batch timeout expired at batch index: " + i);
-                }
-                break;
-            }
-            Exchange exchange = consumer.receive(timeout);
-            if (exchange == null) {
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("receive with timeout: " + timeout + " expired at batch index:
" + i);
-                }
-                break;
-            }
-            collection.add(exchange);
-        }
-
-        // we should NOT log the collection directly as it will invoke a toString() on collection
-        // and it will call collection.iterator() where end-users might do stuff that would
break
-        // calling the iterator a 2nd time as below
-
-        // lets send the batch
-        Iterator<Exchange> iter = collection.iterator();
-        while (iter.hasNext()) {
-            Exchange exchange = iter.next();
-            iter.remove();
-            processExchange(exchange);
-        }
-    }
-
-    /**
      * A strategy method to decide if the batch is completed the resulting exchanges should
be sent
      */
-    protected boolean isBatchCompleted(int index) {
+    protected boolean isBatchCompleted(int num) {
         // out batch size is optional and we should only check it if its enabled (= >0)
         if (outBatchSize > 0 && collection.size() >= outBatchSize) {
             return true;
         }
         // fallback to regular batch size check
-        return index >= batchSize;
+        return num >= batchSize;
     }
 
     /**
@@ -191,16 +135,13 @@
     }
 
     protected void doStart() throws Exception {
-        consumer = endpoint.createPollingConsumer();
-
-        ServiceHelper.startServices(processor, consumer);
-
-        Thread thread = new Thread(this, this + " Polling Thread");
-        thread.start();
+        ServiceHelper.startServices(processor);
+        sender.start();
     }
 
     protected void doStop() throws Exception {
-        ServiceHelper.stopServices(consumer, processor);
+        sender.cancel();
+        ServiceHelper.stopServices(processor);
         collection.clear();
     }
 
@@ -208,7 +149,71 @@
         return collection;
     }
 
+    /**
+     * Enqueues an exchange for later batch processing.
+     */
     public void process(Exchange exchange) throws Exception {
-        // empty since exchanges come from endpoint's polling consumer
+        sender.enqueueExchange(exchange);
+    }
+
+    /**
+     * Sender thread for queued-up exchanges.
+     */
+    private class BatchSender extends Thread {
+        
+        private volatile boolean cancelRequested;
+
+        private LinkedBlockingQueue<Exchange> queue;
+        
+        public BatchSender() {
+            super("Batch Sender");
+            this.queue = new LinkedBlockingQueue<Exchange>();
+        }
+
+        @Override
+        public void run() {
+            while (true) {
+                try {
+                    Thread.sleep(batchTimeout);
+                } catch (InterruptedException e) {
+                    if (cancelRequested) {
+                        return;
+                    }
+                }
+                try {
+                    sendExchanges();
+                } catch (Exception e) {
+                    getExceptionHandler().handleException(e);
+                }
+            }
+        }
+        
+        public void cancel() {
+            cancelRequested = true;
+            interrupt();
+        }
+        
+        public void sendBatch() {
+            interrupt();
+        }
+     
+        public void enqueueExchange(Exchange exchange) {
+            queue.add(exchange);
+            if (isBatchCompleted(queue.size())) {
+                sendBatch();
+            }
+        }
+        
+        private void sendExchanges() throws Exception {
+            queue.drainTo(collection, batchSize);
+            Iterator<Exchange> iter = collection.iterator();
+            while (iter.hasNext()) {
+                Exchange exchange = iter.next();
+                iter.remove();
+                processExchange(exchange);
+            }
+        }
+        
     }
+    
 }

Modified: activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java
URL: http://svn.apache.org/viewvc/activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java?rev=724629&r1=724628&r2=724629&view=diff
==============================================================================
--- activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java
(original)
+++ activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java
Mon Dec  8 23:18:06 2008
@@ -21,7 +21,6 @@
 import java.util.Set;
 import java.util.TreeSet;
 
-import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
 import org.apache.camel.Processor;
@@ -35,16 +34,16 @@
  * @version $Revision$
  */
 public class Resequencer extends BatchProcessor {
-    public Resequencer(Endpoint endpoint, Processor processor, Expression<Exchange>
expression) {
-        this(endpoint, processor, createSet(expression));
+    public Resequencer(Processor processor, Expression expression) {
+        this(processor, createSet(expression));
     }
 
-    public Resequencer(Endpoint endpoint, Processor processor, List<Expression> expressions)
{
-        this(endpoint, processor, createSet(expressions));
+    public Resequencer(Processor processor, List<Expression> expressions) {
+        this(processor, createSet(expressions));
     }
 
-    public Resequencer(Endpoint endpoint, Processor processor, Set<Exchange> collection)
{
-        super(endpoint, processor, collection);
+    public Resequencer(Processor processor, Set<Exchange> collection) {
+        super(processor, collection);
     }
 
     @Override

Modified: activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
URL: http://svn.apache.org/viewvc/activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java?rev=724629&r1=724628&r2=724629&view=diff
==============================================================================
--- activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
(original)
+++ activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
Mon Dec  8 23:18:06 2008
@@ -16,9 +16,7 @@
  */
 package org.apache.camel.processor;
 
-import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
-import org.apache.camel.PollingConsumer;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.LoggingExceptionHandler;
 import org.apache.camel.impl.ServiceSupport;
@@ -52,14 +50,14 @@
  * 
  * @see ResequencerEngine
  */
-public class StreamResequencer extends ServiceSupport implements SequenceSender<Exchange>,
Runnable, Processor {
+public class StreamResequencer extends ServiceSupport implements SequenceSender<Exchange>,
Processor {
 
+    private static final long DELIVERY_ATTEMPT_INTERVAL = 1000L;
+    
     private ExceptionHandler exceptionHandler;
     private ResequencerEngine<Exchange> engine;
-    private PollingConsumer<? extends Exchange> consumer;
-    private Endpoint<? extends Exchange> endpoint;
     private Processor processor;
-    private Thread worker;
+    private Delivery delivery;
     private int capacity;
     
     /**
@@ -72,11 +70,10 @@
      * @param comparator
      *            a sequence element comparator for exchanges.
      */
-    public StreamResequencer(Endpoint<? extends Exchange> endpoint, Processor processor,
SequenceElementComparator<Exchange> comparator) {
+    public StreamResequencer(Processor processor, SequenceElementComparator<Exchange>
comparator) {
         this.exceptionHandler = new LoggingExceptionHandler(getClass());
         this.engine = new ResequencerEngine<Exchange>(comparator);
         this.engine.setSequenceSender(this);
-        this.endpoint = endpoint;
         this.processor = processor;
     }
 
@@ -139,11 +136,10 @@
 
     @Override
     protected void doStart() throws Exception {
-        consumer = endpoint.createPollingConsumer();
-        ServiceHelper.startServices(processor, consumer);
-        worker = new Thread(this, this + " Polling Thread");
+        ServiceHelper.startServices(processor);
+        delivery = new Delivery();
         engine.start();
-        worker.start();
+        delivery.start();
     }
 
     @Override
@@ -151,7 +147,7 @@
         // let's stop everything in the reverse order
         // no need to stop the worker thread -- it will stop automatically when this service
is stopped
         engine.stop();
-        ServiceHelper.stopServices(consumer, processor);
+        ServiceHelper.stopServices(processor);
     }
 
     /**
@@ -164,43 +160,49 @@
         processor.process(o);
     }
 
-    /**
-     * Loops over {@link #processExchange()}.
-     */
-    public void run() {
-        while (!isStopped() && !isStopping()) {
-            try {
-                processExchange();
-            } catch (Exception e) {
-                exceptionHandler.handleException(e);
-            }
+    public void process(Exchange exchange) throws Exception {
+        while (engine.size() >= capacity) {
+            Thread.sleep(getTimeout());
         }
+        engine.insert(exchange);
+        delivery.request();
     }
 
-    /**
-     * Processes an exchange received from the this resequencer's
-     * <code>endpoint</code>. Received exchanges are processed via
-     * {@link ResequencerEngine#insert(Object)}.
-     * {@link ResequencerEngine#deliver()} is then called in any case regardless
-     * whether a message was received or receiving timed out.
-     * 
-     * @throws Exception
-     *             if exchange delivery fails.
-     */
-    protected void processExchange() throws Exception {
-        if (engine.size() >= capacity) {
-            Thread.sleep(getTimeout());
-        } else {
-            Exchange exchange = consumer.receive(getTimeout());
-            if (exchange != null) {
-                engine.insert(exchange);
+    private class Delivery extends Thread {
+
+        private volatile boolean cancelRequested;
+        
+        public Delivery() {
+            super("Delivery Thread");
+        }
+        
+        @Override
+        public void run() {
+            while (true) {
+                try {
+                    Thread.sleep(DELIVERY_ATTEMPT_INTERVAL);
+                } catch (InterruptedException e) {
+                    if (cancelRequested) {
+                        return;
+                    }
+                }
+                try {
+                    engine.deliver();
+                } catch (Exception e) {
+                    exceptionHandler.handleException(e);
+                }
             }
         }
-        engine.deliver();
-    }
 
-    public void process(Exchange exchange) throws Exception {
-        engine.insert(exchange);
+        public void cancel() {
+            cancelRequested = true;
+            interrupt();
+        }
+        
+        public void request() {
+            interrupt();
+        }
+        
     }
-
+    
 }
\ No newline at end of file

Modified: activemq/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java?rev=724629&r1=724628&r2=724629&view=diff
==============================================================================
--- activemq/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java
(original)
+++ activemq/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java
Mon Dec  8 23:18:06 2008
@@ -17,8 +17,6 @@
 package org.apache.camel.processor;
 
 import org.apache.camel.ContextTestSupport;
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 
@@ -84,19 +82,13 @@
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
-                // disabled due CAMEL-393
-                //from("seda:header").setHeader("visited", constant(true)).aggregator(header("cheese")).to("mock:result");
 
                 // START SNIPPET: ex
                 // in this route we aggregate all from direct:start based on the header id
cheese
                 from("direct:start").aggregator(header("cheese")).to("mock:result");
 
-                // because of a bug in Camel (CAMEL-393) we can not have other types between
from and aggregator
-                // so we must do it as here with two routes. In the fist line we set the
header visited to true
-                // and link it to the 2nd route by sending it to direct:temp...
-                from("seda:header").setHeader("visited", constant(true)).to("direct:temp");
-                // and here we consume from direct:temp to continue from above and aggregate
-                from("direct:temp").aggregator(header("cheese")).to("mock:result");
+                // CAMEL-393 now fixed
+                from("seda:header").setHeader("visited", constant(true)).aggregator(header("cheese")).to("mock:result");
 
                 // in this sample we aggregate using our own strategy with a completion predicate
                 // stating that the aggregated header is equal to 5.

Copied: activemq/camel/branches/camel-1.x/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTestRouteBuilder.java
(from r724619, activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTestRouteBuilder.java)
URL: http://svn.apache.org/viewvc/activemq/camel/branches/camel-1.x/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTestRouteBuilder.java?p2=activemq/camel/branches/camel-1.x/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTestRouteBuilder.java&p1=activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTestRouteBuilder.java&r1=724619&r2=724629&rev=724629&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTestRouteBuilder.java
(original)
+++ activemq/camel/branches/camel-1.x/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsResequencerTestRouteBuilder.java
Mon Dec  8 23:18:06 2008
@@ -25,8 +25,8 @@
 
     @Override
     public void configure() throws Exception {
-        from("activemq:queue:in1").resequence().body().batch().size(100).timeout(10000L).to("mock:result");
-        from("activemq:queue:in2").resequence().header("num").stream().timeout(2000L).to("mock:result");
+        from("activemq:queue:in1").resequencer().body().batch().size(100).timeout(10000L).to("mock:result");
+        from("activemq:queue:in2").resequencer().header("num").stream().timeout(2000L).to("mock:result");
     }
 
 }



Mime
View raw message