camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r553312 - in /activemq/camel/trunk: camel-core/src/main/java/org/apache/camel/ camel-core/src/main/java/org/apache/camel/builder/ camel-core/src/main/java/org/apache/camel/component/mock/ camel-core/src/main/java/org/apache/camel/component/...
Date Wed, 04 Jul 2007 19:42:49 GMT
Author: jstrachan
Date: Wed Jul  4 12:42:47 2007
New Revision: 553312

URL: http://svn.apache.org/viewvc?view=rev&rev=553312
Log:
added a Throttler implementation and test case for CAMEL-65. Also did some refactoring of the processors to increase the reuse and tidied up some test cases. Also some initial coding for CAMEL-66. Finally tided up the code a little

Added:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/AlreadyStoppedException.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ThrottlerBuilder.java
      - copied, changed from r552950, activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DelayerBuilder.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Delayer.java   (contents, props changed)
      - copied, changed from r552950, activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayerProcessor.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
      - copied, changed from r552950, activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DelayerTest.java
Removed:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayerProcessor.java
Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DelayerBuilder.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/InterceptorBuilder.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueComponent.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpointConsumer.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/CompositeProcessor.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateProcessor.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ErrorHandler.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/FilterProcessor.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/UseLatestAggregationStrategy.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryMessageIdRepository.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/NoMessageIdException.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/package.html
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancer.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancerSupport.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/StickyLoadBalancer.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/ContextTestSupport.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/MyInterceptorProcessor.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DelayerTest.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FilterTest.java
    activemq/camel/trunk/components/camel-jms/pom.xml
    activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/TransactedJmsRouteTest.java
    activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/SpringTransactionPolicy.java

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/AlreadyStoppedException.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/AlreadyStoppedException.java?view=auto&rev=553312
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/AlreadyStoppedException.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/AlreadyStoppedException.java Wed Jul  4 12:42:47 2007
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+/**
+ * @version $Revision: $
+ */
+public class AlreadyStoppedException extends CamelException {
+
+    public AlreadyStoppedException() {
+        super("Already stopped");
+    }
+}

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DelayerBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DelayerBuilder.java?view=diff&rev=553312&r1=553311&r2=553312
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DelayerBuilder.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DelayerBuilder.java Wed Jul  4 12:42:47 2007
@@ -20,7 +20,7 @@
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
 import org.apache.camel.Processor;
-import org.apache.camel.processor.DelayerProcessor;
+import org.apache.camel.processor.Delayer;
 
 /**
  * 
@@ -41,6 +41,6 @@
     @Override
     public Processor createProcessor() throws Exception {
         final Processor processor = super.createProcessor();
-        return new DelayerProcessor(processor, processAtExpression, delay);
+        return new Delayer(processor, processAtExpression, delay);
     }
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java?view=diff&rev=553312&r1=553311&r2=553312
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java Wed Jul  4 12:42:47 2007
@@ -331,6 +331,24 @@
         return delayer(null, delay);
     }
 
+
+    /**
+     * A builder for the <a href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern
+     * where an expression is used to calculate the time which the message will be dispatched on
+     *
+     * @param processAtExpression an expression to calculate the time at which the messages should be processed
+     * @param delay the delay in milliseconds which is added to the processAtExpression to determine the time the
+     * message should be processed
+     * @return the builder
+     */
+    @Fluent
+    public ThrottlerBuilder throttler(long maximumRequestCount) {
+        ThrottlerBuilder answer = new ThrottlerBuilder(this, maximumRequestCount);
+        setRouteBuilder(answer);
+        return answer;
+    }
+
+
     /**
      * Installs the given error handler builder
      *

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/InterceptorBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/InterceptorBuilder.java?view=diff&rev=553312&r1=553311&r2=553312
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/InterceptorBuilder.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/InterceptorBuilder.java Wed Jul  4 12:42:47 2007
@@ -20,7 +20,6 @@
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.camel.Exchange;
 import org.apache.camel.processor.DelegateProcessor;
 import org.apache.camel.Processor;
 import org.apache.camel.RuntimeCamelException;
@@ -63,14 +62,14 @@
             	first = p;
             }
             if( last != null ) {
-            	last.setNext(p);
+            	last.setProcessor(p);
             }
             last = p;
         }
         
         Processor p = target.createProcessor();
         if( last != null ) {
-        	last.setNext(p);
+        	last.setProcessor(p);
         }
         return first == null ? p : first;
     }

Copied: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ThrottlerBuilder.java (from r552950, activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DelayerBuilder.java)
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ThrottlerBuilder.java?view=diff&rev=553312&p1=activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DelayerBuilder.java&r1=552950&p2=activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ThrottlerBuilder.java&r2=553312
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DelayerBuilder.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ThrottlerBuilder.java Wed Jul  4 12:42:47 2007
@@ -17,30 +17,33 @@
  */
 package org.apache.camel.builder;
 
-import org.apache.camel.Exchange;
-import org.apache.camel.Expression;
 import org.apache.camel.Processor;
-import org.apache.camel.processor.DelayerProcessor;
+import org.apache.camel.processor.Throttler;
 
 /**
  * 
  * @version $Revision: 1.1 $
  */
-public class DelayerBuilder extends FromBuilder {
-    private final Expression<Exchange> processAtExpression;
-    private long delay;
-    private long batchTimeout = 0L;
-    private int batchSize = 1;
+public class ThrottlerBuilder extends FromBuilder {
+    private long maximumRequestsPerPeriod;
+    private long timePeriodMillis = 1000;
 
-    public DelayerBuilder(FromBuilder builder, Expression<Exchange> processAtExpression, long delay) {
-        super(builder);
-        this.delay = delay;
-        this.processAtExpression = processAtExpression;
+    public ThrottlerBuilder(FromBuilder parent, long maximumRequestsPerPeriod) {
+        super(parent);
+        this.maximumRequestsPerPeriod = maximumRequestsPerPeriod;
+    }
+
+    /**
+     * Sets the time period during which the maximum request count is valid for
+     */
+    public ThrottlerBuilder timePeriodMillis(long timePeriodMillis) {
+        this.timePeriodMillis = timePeriodMillis;
+        return this;
     }
 
     @Override
     public Processor createProcessor() throws Exception {
         final Processor processor = super.createProcessor();
-        return new DelayerProcessor(processor, processAtExpression, delay);
+        return new Throttler(processor, maximumRequestsPerPeriod, timePeriodMillis);
     }
-}
+}
\ No newline at end of file

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java?view=diff&rev=553312&r1=553311&r2=553312
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java Wed Jul  4 12:42:47 2007
@@ -20,12 +20,14 @@
 import org.apache.camel.Component;
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
+import org.apache.camel.Expression;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.impl.DefaultExchange;
 import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.ExpressionComparator;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -128,21 +130,26 @@
      * @param timeoutForEmptyEndpoints the timeout in milliseconds that we should wait for the test to be true
      */
     public void assertIsSatisfied(long timeoutForEmptyEndpoints) throws InterruptedException {
-        if (latch != null) {
-            // now lets wait for the results
-            latch.await(defaulResultWaitMillis, TimeUnit.MILLISECONDS);
-        }
-        else if (expectedCount == 0) {
-            // lets wait a little bit just in case
-            if (timeoutForEmptyEndpoints > 0) {
-                log.debug("Sleeping for: " + timeoutForEmptyEndpoints + " millis to check there really are no messages received");
-                Thread.sleep(timeoutForEmptyEndpoints);
-            }
-        }
-
         if (expectedCount >= 0) {
-            int receivedCounter = getReceivedCounter();
-            assertEquals("Received message count", expectedCount, receivedCounter);
+            if (expectedCount != getReceivedCounter()) {
+                if (expectedCount == 0) {
+                    // lets wait a little bit just in case
+                    if (timeoutForEmptyEndpoints > 0) {
+                        log.debug("Sleeping for: " + timeoutForEmptyEndpoints + " millis to check there really are no messages received");
+                        Thread.sleep(timeoutForEmptyEndpoints);
+                    }
+                }
+                else {
+                    if (latch == null) {
+                        fail("Should have a latch!");
+                    }
+
+                    // now lets wait for the results
+                    log.debug("Waiting on the latch for: " + defaulResultWaitMillis + " millis");
+                    latch.await(defaulResultWaitMillis, TimeUnit.MILLISECONDS);
+                }
+            }
+            assertEquals("Received message count", expectedCount, getReceivedCounter());
         }
 
         if (expectedMinimumCount >= 0) {
@@ -227,6 +234,45 @@
     }
 
     /**
+     * Adds an expectation that messages received should have ascending values of the given expression
+     * such as a user generated counter value
+     *
+     * @param expression
+     */
+    public void expectsAscending(final Expression<Exchange> expression) {
+        expects(new Runnable() {
+            public void run() {
+                assertMessagesAscending(expression);
+            }
+        });
+    }
+
+    /**
+     * Asserts that the messages have ascending values of the given expression
+     */
+    public void assertMessagesAscending(Expression<Exchange> expression) {
+        ExpressionComparator comparator = new ExpressionComparator(expression);
+        List<Exchange> list = getReceivedExchanges();
+        for (int i = 1; i < expectedBodyValues.size(); i++) {
+            int j = i - 1;
+            Exchange e1 = list.get(j);
+            Exchange e2 = list.get(i);
+            int result = comparator.compare(e1, e2);
+            if (result == 0) {
+                Object value = expression.evaluate(e1);
+                fail("Messages not ascending. Messages" + j + " and " + i + " are equal with value: " + value
+                        + " for expression: " + expression + " when they were expected to be ascending. Exchanges: " + e1 + " and " + e2);
+            }
+            else if (result > 0) {
+                Object value = expression.evaluate(e1);
+                fail("Messages not ascending. Message " + j + " has value: " + expression.evaluate(e1)
+                        + " and message" + i + " has value: " + expression.evaluate(e2)
+                        + " for expression: " + expression + " when they were expected to be ascending. Exchanges: " + e1 + " and " + e2);
+            }
+        }
+    }
+
+    /**
      * Adds the expection which will be invoked when enough messages are received
      */
     public void expects(Runnable runnable) {
@@ -307,6 +353,18 @@
      */
     public void setSleepForEmptyTest(long sleepForEmptyTest) {
         this.sleepForEmptyTest = sleepForEmptyTest;
+    }
+
+    public long getDefaulResultWaitMillis() {
+        return defaulResultWaitMillis;
+    }
+
+    /**
+     * Sets the maximum amount of time the {@link #assertIsSatisfied()}
+     * will wait on a latch until it is satisfied
+     */
+    public void setDefaulResultWaitMillis(long defaulResultWaitMillis) {
+        this.defaulResultWaitMillis = defaulResultWaitMillis;
     }
 
     // Implementation methods

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueComponent.java?view=diff&rev=553312&r1=553311&r2=553312
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueComponent.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueComponent.java Wed Jul  4 12:42:47 2007
@@ -16,13 +16,10 @@
  */
 package org.apache.camel.component.queue;
 
-import org.apache.camel.CamelContext;
-import org.apache.camel.Component;
-import org.apache.camel.Exchange;
 import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultComponent;
 
-import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -37,7 +34,7 @@
 public class QueueComponent<E extends Exchange> extends DefaultComponent<E> {
 	
 	public BlockingQueue<E> createQueue() {
-		return new LinkedBlockingQueue<E>();
+		return new LinkedBlockingQueue<E>(1000);
 	}
 
     @Override

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpointConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpointConsumer.java?view=diff&rev=553312&r1=553311&r2=553312
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpointConsumer.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpointConsumer.java Wed Jul  4 12:42:47 2007
@@ -17,10 +17,13 @@
  */
 package org.apache.camel.component.queue;
 
+import org.apache.camel.AlreadyStoppedException;
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.ServiceSupport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 import java.util.concurrent.TimeUnit;
 
@@ -28,6 +31,9 @@
  * @version $Revision$
  */
 public class QueueEndpointConsumer<E extends Exchange> extends ServiceSupport implements Consumer<E>, Runnable {
+    private static final Log log = LogFactory.getLog(QueueEndpointConsumer.class);
+    private static int counter;
+
     private QueueEndpoint<E> endpoint;
     private Processor processor;
     private Thread thread;
@@ -55,20 +61,28 @@
                 try {
                     processor.process(exchange);
                 }
+                catch (AlreadyStoppedException e) {
+                    log.debug("Ignoring failed message due to shutdown: " + e, e);
+                    break;
+                }
                 catch (Throwable e) {
-                    e.printStackTrace();
+                    log.error(e);
                 }
             }
         }
     }
 
     protected void doStart() throws Exception {
-        thread = new Thread(this, endpoint.getEndpointUri());
+        thread = new Thread(this, endpoint.getEndpointUri() + " thread:" + nextCounter());
         thread.setDaemon(true);
         thread.start();
     }
 
     protected void doStop() throws Exception {
         thread.join();
+    }
+
+    protected static synchronized int nextCounter() {
+        return ++counter;
     }
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java?view=diff&rev=553312&r1=553311&r2=553312
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java Wed Jul  4 12:42:47 2007
@@ -17,11 +17,11 @@
  */
 package org.apache.camel.processor;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
-import org.apache.camel.Exchange;
-import org.apache.camel.util.ServiceHelper;
 import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.util.ServiceHelper;
 
 import java.util.ArrayList;
 import java.util.List;

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/CompositeProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/CompositeProcessor.java?view=diff&rev=553312&r1=553311&r2=553312
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/CompositeProcessor.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/CompositeProcessor.java Wed Jul  4 12:42:47 2007
@@ -17,8 +17,8 @@
  */
 package org.apache.camel.processor;
 
-import org.apache.camel.Processor;
 import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
 import org.apache.camel.impl.ServiceSupport;
 import org.apache.camel.util.ServiceHelper;
 

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?view=diff&rev=553312&r1=553311&r2=553312
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java Wed Jul  4 12:42:47 2007
@@ -18,8 +18,8 @@
 package org.apache.camel.processor;
 
 import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
 import org.apache.camel.Message;
+import org.apache.camel.Processor;
 import org.apache.camel.impl.ServiceSupport;
 import org.apache.camel.util.ServiceHelper;
 import org.apache.commons.logging.Log;
@@ -141,7 +141,7 @@
             next = counter + 1;
         }
         in.setHeader(REDELIVERY_COUNTER, next);
-            in.setHeader(REDELIVERED, true);
+        in.setHeader(REDELIVERED, true);
         return next;
     }
 

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java?view=auto&rev=553312
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java Wed Jul  4 12:42:47 2007
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.AlreadyStoppedException;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A useful base class for any processor which provides some kind of throttling
+ * or delayed processing
+ *
+ * @version $Revision: $
+ */
+public abstract class DelayProcessorSupport extends DelegateProcessor {
+    private static final transient Log log = LogFactory.getLog(Delayer.class);
+    private CountDownLatch stoppedLatch = new CountDownLatch(1);
+    private boolean fastStop = true;
+
+    public DelayProcessorSupport(Processor processor) {
+        super(processor);
+    }
+
+    public void process(Exchange exchange) throws Exception {
+        delay(exchange);
+        super.process(exchange);
+    }
+
+    public boolean isFastStop() {
+        return fastStop;
+    }
+
+    /**
+     * Enables & disables a fast stop; basically to avoid waiting a possibly
+     * long time for delays to complete before the context shuts down; instead
+     * the current processing method throws {@link org.apache.camel.AlreadyStoppedException}
+     * to terminate processing.
+     */
+    public void setFastStop(boolean fastStop) {
+        this.fastStop = fastStop;
+    }
+
+    protected void doStop() throws Exception {
+        stoppedLatch.countDown();
+        super.doStop();
+    }
+
+    protected abstract void delay(Exchange exchange) throws Exception;
+
+    /**
+     * Wait until the given system time before continuing
+     *
+     * @param time     the system time to wait for
+     * @param exchange the exchange being processed
+     */
+    protected void waitUntil(long time, Exchange exchange) throws Exception {
+        while (true) {
+            long delay = time - currentSystemTime();
+            if (delay < 0) {
+                return;
+            }
+            else {
+                if (isFastStop() && (isStopped() || isStopping())) {
+                    throw new AlreadyStoppedException();
+                }
+                try {
+                    sleep(delay);
+                }
+                catch (InterruptedException e) {
+                    handleSleepInteruptedException(e);
+                }
+            }
+        }
+    }
+
+    protected void sleep(long delay) throws InterruptedException {
+        if (log.isDebugEnabled()) {
+            log.debug("Sleeping for: " + delay + " millis");
+        }
+        if (isFastStop()) {
+            stoppedLatch.await(delay, TimeUnit.MILLISECONDS);
+        }
+        else {
+            Thread.sleep(delay);
+        }
+    }
+
+    /**
+     * Called when a sleep is interupted; allows derived classes to handle this case differently
+     */
+    protected void handleSleepInteruptedException(InterruptedException e) {
+        log.debug("Sleep interupted: " + e, e);
+    }
+
+    protected long currentSystemTime() {
+        return System.currentTimeMillis();
+    }
+}

Copied: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Delayer.java (from r552950, activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayerProcessor.java)
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Delayer.java?view=diff&rev=553312&p1=activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayerProcessor.java&r1=552950&p2=activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Delayer.java&r2=553312
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayerProcessor.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Delayer.java Wed Jul  4 12:42:47 2007
@@ -20,50 +20,31 @@
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
 import org.apache.camel.Processor;
-import org.apache.camel.impl.ServiceSupport;
 import org.apache.camel.util.ExpressionHelper;
-import org.apache.camel.util.ServiceHelper;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 
 /**
  * A <a href="http://activemq.apache.org/camel/delayer.html">Delayer</a> which delays
  * processing the exchange until the correct amount of time has elapsed
  * using an expression to determine the delivery time.
- *
+ * <p/>
  * For example if you wish to delay JMS messages by 25 seconds from their publish time you could create
  * an instance of this class with the expression <code>header("JMSTimestamp")</code> and a delay value of 25000L.
  *
  * @version $Revision: 1.1 $
  */
-public class DelayerProcessor extends ServiceSupport implements Processor {
-    private static final transient Log log = LogFactory.getLog(DelayerProcessor.class);
+public class Delayer extends DelayProcessorSupport {
     private Expression<Exchange> timeExpression;
-    private Processor processor;
     private long delay = 0L;
 
-    public DelayerProcessor(Processor processor, Expression<Exchange> timeExpression, long delay) {
-        this.processor = processor;
+    public Delayer(Processor processor, Expression<Exchange> timeExpression, long delay) {
+        super(processor);
         this.timeExpression = timeExpression;
         this.delay = delay;
     }
 
-    public void process(Exchange exchange) throws Exception {
-        long time = 0;
-        if (timeExpression != null) {
-            Long longValue = ExpressionHelper.evaluateAsType(timeExpression, exchange, Long.class);
-            if (longValue != null) {
-                time = longValue.longValue();
-            }
-        }
-        if (time <= 0) {
-            time = defaultProcessTime(exchange);
-        }
-
-        time += delay;
-
-        waitUntil(time, exchange);
-        processor.process(exchange);
+    @Override
+    public String toString() {
+        return "Delayer[on: " + timeExpression + " delay: " + delay + " to: " + getProcessor() + "]";
     }
 
     // Properties
@@ -83,45 +64,24 @@
     // Implementation methods
     //-------------------------------------------------------------------------
 
-    protected void doStart() throws Exception {
-        ServiceHelper.startService(processor);
-    }
-
-    protected void doStop() throws Exception {
-        ServiceHelper.stopService(processor);
-    }
-
     /**
-     * Wait until the given system time before continuing
-     *
-     * @param time     the system time to wait for
-     * @param exchange the exchange being processed
+     * Waits for an optional time period before continuing to process the exchange
      */
-    protected void waitUntil(long time, Exchange exchange) {
-        while (true) {
-            long delay = time - currentSystemTime();
-            if (delay < 0) {
-                return;
-            }
-            else {
-                try {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Sleeping for: " + delay + " millis");
-                    }
-                    Thread.sleep(delay);
-                }
-                catch (InterruptedException e) {
-                    handleSleepInteruptedException(e);
-                }
+    protected void delay(Exchange exchange) throws Exception {
+        long time = 0;
+        if (timeExpression != null) {
+            Long longValue = ExpressionHelper.evaluateAsType(timeExpression, exchange, Long.class);
+            if (longValue != null) {
+                time = longValue.longValue();
             }
         }
-    }
+        if (time <= 0) {
+            time = defaultProcessTime(exchange);
+        }
 
-    /**
-     * Called when a sleep is interupted; allows derived classes to handle this case differently
-     */
-    protected void handleSleepInteruptedException(InterruptedException e) {
-        log.debug("Sleep interupted: " + e, e);
+        time += delay;
+
+        waitUntil(time, exchange);
     }
 
     /**
@@ -134,7 +94,4 @@
         return currentSystemTime();
     }
 
-    protected long currentSystemTime() {
-        return System.currentTimeMillis();
-    }
 }

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Delayer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateProcessor.java?view=diff&rev=553312&r1=553311&r2=553312
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateProcessor.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateProcessor.java Wed Jul  4 12:42:47 2007
@@ -17,10 +17,10 @@
  */
 package org.apache.camel.processor;
 
-import org.apache.camel.Processor;
 import org.apache.camel.Exchange;
-import org.apache.camel.spi.Policy;
+import org.apache.camel.Processor;
 import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.spi.Policy;
 import org.apache.camel.util.ServiceHelper;
 
 /**
@@ -30,13 +30,13 @@
  * @version $Revision: 519941 $
  */
 public class DelegateProcessor extends ServiceSupport implements Processor {
-    protected Processor next;
+    private Processor processor;
 
     public DelegateProcessor() {
     }
 
-    public DelegateProcessor(Processor next) {
-        this.next = next;
+    public DelegateProcessor(Processor processor) {
+        this.processor = processor;
     }
 
     public void process(Exchange exchange) throws Exception {
@@ -44,29 +44,29 @@
     }
 
     protected void processNext(Exchange exchange) throws Exception {
-        if (next != null) {
-            next.process(exchange);
+        if (processor != null) {
+            processor.process(exchange);
         }
     }
 
     @Override
     public String toString() {
-        return "delegate(" + next + ")";
+        return "Delegate(" + processor + ")";
     }
 
-    public Processor getNext() {
-        return next;
+    public Processor getProcessor() {
+        return processor;
     }
 
-    public void setNext(Processor next) {
-        this.next = next;
+    public void setProcessor(Processor processor) {
+        this.processor = processor;
     }
 
     protected void doStart() throws Exception {
-        ServiceHelper.startServices(next);
+        ServiceHelper.startServices(processor);
     }
 
     protected void doStop() throws Exception {
-        ServiceHelper.stopServices(next);
+        ServiceHelper.stopServices(processor);
     }
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ErrorHandler.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ErrorHandler.java?view=diff&rev=553312&r1=553311&r2=553312
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ErrorHandler.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ErrorHandler.java Wed Jul  4 12:42:47 2007
@@ -17,7 +17,6 @@
  */
 package org.apache.camel.processor;
 
-import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 
 /**

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/FilterProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/FilterProcessor.java?view=diff&rev=553312&r1=553311&r2=553312
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/FilterProcessor.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/FilterProcessor.java Wed Jul  4 12:42:47 2007
@@ -17,48 +17,33 @@
  */
 package org.apache.camel.processor;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
-import org.apache.camel.Exchange;
-import org.apache.camel.impl.ServiceSupport;
-import org.apache.camel.util.ServiceHelper;
 
 /**
  * @version $Revision$
  */
-public class FilterProcessor extends ServiceSupport implements Processor {
+public class FilterProcessor extends DelegateProcessor {
     private Predicate<Exchange> predicate;
-    private Processor processor;
 
     public FilterProcessor(Predicate<Exchange> predicate, Processor processor) {
+        super(processor);
         this.predicate = predicate;
-        this.processor = processor;
     }
 
     public void process(Exchange exchange) throws Exception {
         if (predicate.matches(exchange)) {
-            processor.process(exchange);
+            super.process(exchange);
         }
     }
 
     @Override
     public String toString() {
-        return "filter (" + predicate + ") " + processor;
+        return "Filter[if: " + predicate + " do: " + getProcessor() + "]";
     }
 
     public Predicate<Exchange> getPredicate() {
         return predicate;
-    }
-
-    public Processor getProcessor() {
-        return processor;
-    }
-
-    protected void doStart() throws Exception {
-        ServiceHelper.startServices(processor);
-    }
-
-    protected void doStop() throws Exception {
-        ServiceHelper.stopServices(processor);
     }
 }

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java?view=auto&rev=553312
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java Wed Jul  4 12:42:47 2007
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+
+/**
+ * A <a href="http://activemq.apache.org/camel/throttler.html">Throttler</a>
+ * will set a limit on the maximum number of message exchanges which can be
+ * sent to a processor within a specific time period.
+ * <p/>
+ * This pattern can be extremely useful if you have some external system which
+ * meters access; such as only allowing 100 requests per second; or if huge load
+ * can cause a particular systme to malfunction or to reduce its throughput
+ * you might want to introduce some throttling.
+ *
+ * @version $Revision: $
+ */
+public class Throttler extends DelayProcessorSupport {
+    private long maximumRequestsPerPeriod;
+    private long timePeriodMillis;
+    private long startTimeMillis;
+    private long requestCount;
+
+
+    public Throttler(Processor processor, long maximumRequestsPerPeriod) {
+        this(processor, maximumRequestsPerPeriod, 1000);
+    }
+
+    public Throttler(Processor processor, long maximumRequestsPerPeriod, long timePeriodMillis) {
+        super(processor);
+        this.maximumRequestsPerPeriod = maximumRequestsPerPeriod;
+        this.timePeriodMillis = timePeriodMillis;
+    }
+
+    @Override
+    public String toString() {
+        return "Throttler[requests: " + maximumRequestsPerPeriod + " per: " + timePeriodMillis + " (ms) to: " + getProcessor() + "]";
+    }
+
+    // Properties
+    //-----------------------------------------------------------------------
+    public long getMaximumRequestsPerPeriod() {
+        return maximumRequestsPerPeriod;
+    }
+
+    /**
+     * Sets the maximum number of requests per time period
+     */
+    public void setMaximumRequestsPerPeriod(long maximumRequestsPerPeriod) {
+        this.maximumRequestsPerPeriod = maximumRequestsPerPeriod;
+    }
+
+    public long getTimePeriodMillis() {
+        return timePeriodMillis;
+    }
+
+    /**
+     * Sets the time period during which the maximum number of requests apply
+     */
+    public void setTimePeriodMillis(long timePeriodMillis) {
+        this.timePeriodMillis = timePeriodMillis;
+    }
+
+    /**
+     * The number of requests which have taken place so far within this time period
+     */
+    public long getRequestCount() {
+        return requestCount;
+    }
+
+    /**
+     * The start time when this current period began
+     */
+    public long getStartTimeMillis() {
+        return startTimeMillis;
+    }
+
+    // Implementation methods
+    //-----------------------------------------------------------------------
+    protected void delay(Exchange exchange) throws Exception {
+        long now = currentSystemTime();
+        if (startTimeMillis == 0) {
+            startTimeMillis = now;
+        }
+        if (now - startTimeMillis > timePeriodMillis) {
+            // we're at the start of a new time period
+            // so lets reset things
+            requestCount = 1;
+            startTimeMillis = now;
+        }
+        else {
+            if (++requestCount > maximumRequestsPerPeriod) {
+                // lets sleep until the start of the next time period
+                long time = startTimeMillis + timePeriodMillis;
+                waitUntil(time, exchange);
+            }
+        }
+    }
+}

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java?view=diff&rev=553312&r1=553311&r2=553312
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java Wed Jul  4 12:42:47 2007
@@ -28,7 +28,7 @@
  * @version $Revision: 1.1 $
  */
 public interface AggregationStrategy {
-    
+
     /**
      * Aggregates an old and new exchange together to create a single combined
      * exchange

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/UseLatestAggregationStrategy.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/UseLatestAggregationStrategy.java?view=diff&rev=553312&r1=553311&r2=553312
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/UseLatestAggregationStrategy.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/UseLatestAggregationStrategy.java Wed Jul  4 12:42:47 2007
@@ -26,7 +26,7 @@
  *
  * @version $Revision: 1.1 $
  */
-public class UseLatestAggregationStrategy implements AggregationStrategy{
+public class UseLatestAggregationStrategy implements AggregationStrategy {
 
     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
         return newExchange;

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java?view=diff&rev=553312&r1=553311&r2=553312
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java Wed Jul  4 12:42:47 2007
@@ -76,7 +76,6 @@
         return nextProcessor;
     }
 
-
     // Implementation methods
     //-------------------------------------------------------------------------
 
@@ -91,7 +90,7 @@
     /**
      * A strategy method to allow derived classes to overload the behaviour of processing a duplicate message
      *
-     * @param exchange the exchange
+     * @param exchange  the exchange
      * @param messageId the message ID of this exchange
      */
     protected void onDuplicateMessage(Exchange exchange, String messageId) {

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryMessageIdRepository.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryMessageIdRepository.java?view=diff&rev=553312&r1=553311&r2=553312
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryMessageIdRepository.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryMessageIdRepository.java Wed Jul  4 12:42:47 2007
@@ -19,11 +19,8 @@
 
 import org.apache.camel.util.LRUCache;
 
-import java.util.Set;
-import java.util.HashSet;
-import java.util.Map;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
+import java.util.Map;
 
 /**
  * A memory based implementation of {@link MessageIdRepository}.

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/NoMessageIdException.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/NoMessageIdException.java?view=diff&rev=553312&r1=553311&r2=553312
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/NoMessageIdException.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/NoMessageIdException.java Wed Jul  4 12:42:47 2007
@@ -17,8 +17,8 @@
  */
 package org.apache.camel.processor.idempotent;
 
-import org.apache.camel.Expression;
 import org.apache.camel.Exchange;
+import org.apache.camel.Expression;
 import org.apache.camel.RuntimeCamelException;
 
 /**

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/package.html
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/package.html?view=diff&rev=553312&r1=553311&r2=553312
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/package.html (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/package.html Wed Jul  4 12:42:47 2007
@@ -19,7 +19,8 @@
 </head>
 <body>
 
-An implementation of the <a href="http://activemq.apache.org/camel/idempotent-consumer.html">Idempotent Consumer</a> pattern.
+An implementation of the <a href="http://activemq.apache.org/camel/idempotent-consumer.html">Idempotent Consumer</a>
+pattern.
 
 </body>
 </html>

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancer.java?view=diff&rev=553312&r1=553311&r2=553312
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancer.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancer.java Wed Jul  4 12:42:47 2007
@@ -18,8 +18,6 @@
 package org.apache.camel.processor.loadbalancer;
 
 import org.apache.camel.Processor;
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
 
 import java.util.List;
 

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancerSupport.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancerSupport.java?view=diff&rev=553312&r1=553311&r2=553312
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancerSupport.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancerSupport.java Wed Jul  4 12:42:47 2007
@@ -17,7 +17,6 @@
  */
 package org.apache.camel.processor.loadbalancer;
 
-import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 
 import java.util.List;

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/StickyLoadBalancer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/StickyLoadBalancer.java?view=diff&rev=553312&r1=553311&r2=553312
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/StickyLoadBalancer.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/StickyLoadBalancer.java Wed Jul  4 12:42:47 2007
@@ -22,10 +22,9 @@
 import org.apache.camel.Processor;
 
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.Iterator;
 
 /**
  * Implements a sticky load balancer using an {@link Expression} to calculate
@@ -67,7 +66,7 @@
     @Override
     public void removeProcessor(Processor processor) {
         synchronized (stickyMap) {
-            Iterator<Map.Entry<Object,Processor>> iter = stickyMap.entrySet().iterator();
+            Iterator<Map.Entry<Object, Processor>> iter = stickyMap.entrySet().iterator();
             while (iter.hasNext()) {
                 Map.Entry<Object, Processor> entry = iter.next();
                 if (processor.equals(entry.getValue())) {

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java?view=diff&rev=553312&r1=553311&r2=553312
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java Wed Jul  4 12:42:47 2007
@@ -42,7 +42,7 @@
      * {@link Pipeline} will not clone the exchange
      *
      * @param processor the processor that will send the exchange
-     * @param exchange @return the current exchange if no copying is required such as for a pipeline otherwise a new copy of the exchange is returned.
+     * @param exchange  @return the current exchange if no copying is required such as for a pipeline otherwise a new copy of the exchange is returned.
      */
     protected Exchange copyExchangeStrategy(Processor processor, Exchange exchange) {
         return exchange.copy();

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/ContextTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/ContextTestSupport.java?view=diff&rev=553312&r1=553311&r2=553312
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/ContextTestSupport.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/ContextTestSupport.java Wed Jul  4 12:42:47 2007
@@ -44,6 +44,7 @@
 
     @Override
     protected void tearDown() throws Exception {
+        log.debug("tearDown test: " + getName());
         template.stop();
         context.stop();
     }

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/MyInterceptorProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/MyInterceptorProcessor.java?view=diff&rev=553312&r1=553311&r2=553312
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/MyInterceptorProcessor.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/MyInterceptorProcessor.java Wed Jul  4 12:42:47 2007
@@ -26,7 +26,7 @@
 
     public void process(Exchange exchange) throws Exception {
 		log.debug("START of onExchange: "+exchange);
-		next.process(exchange);
+		super.process(exchange);
 		log.debug("END of onExchange: "+exchange);
 	}
 }

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java?view=diff&rev=553312&r1=553311&r2=553312
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java Wed Jul  4 12:42:47 2007
@@ -259,11 +259,11 @@
             Processor processor = getProcessorWithoutErrorHandler(route);
 
             DelegateProcessor p1 = assertIsInstanceOf(DelegateProcessor.class, processor);
-            processor = p1.getNext();
+            processor = p1.getProcessor();
 
             DelegateProcessor p2 = assertIsInstanceOf(DelegateProcessor.class, processor);
 
-            assertSendTo(p2.getNext(), "queue:d");
+            assertSendTo(p2.getProcessor(), "queue:d");
         }
     }
 

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DelayerTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DelayerTest.java?view=diff&rev=553312&r1=553311&r2=553312
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DelayerTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DelayerTest.java Wed Jul  4 12:42:47 2007
@@ -31,7 +31,7 @@
         resultEndpoint.expectedMessageCount(0);
 
         template.sendBody("queue:a", "<hello>world!</hello>", "JMSTimestamp", System.currentTimeMillis());
-        resultEndpoint.assertIsSatisfied(1000);
+        resultEndpoint.assertIsSatisfied();
 
         // now if we wait a bit longer we should receive the message!
         resultEndpoint.expectedMessageCount(1);

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FilterTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FilterTest.java?view=diff&rev=553312&r1=553311&r2=553312
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FilterTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FilterTest.java Wed Jul  4 12:42:47 2007
@@ -18,10 +18,6 @@
 package org.apache.camel.processor;
 
 import org.apache.camel.ContextTestSupport;
-import org.apache.camel.Endpoint;
-import org.apache.camel.Exchange;
-import org.apache.camel.Message;
-import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 
@@ -29,32 +25,25 @@
  * @version $Revision: 1.1 $
  */
 public class FilterTest extends ContextTestSupport {
-    protected Endpoint<Exchange> startEndpoint;
-    protected MockEndpoint resultEndpoint;
 
     public void testSendMatchingMessage() throws Exception {
+        MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
         resultEndpoint.expectedMessageCount(1);
 
-        sendMessage("bar", "matched");
+        template.sendBody("direct:a", "matched", "foo", "bar");
 
         resultEndpoint.assertIsSatisfied();
     }
 
     public void testSendNotMatchingMessage() throws Exception {
+        MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
         resultEndpoint.expectedMessageCount(0);
 
-        sendMessage("notMatchedHeaderValue", "notMatched");
+        template.sendBody("direct:a", "notMatched", "foo", "notMatchedHeaderValue");
 
         resultEndpoint.assertIsSatisfied();
     }
 
-    @Override
-    protected void setUp() throws Exception {
-        super.setUp();
-
-        startEndpoint = resolveMandatoryEndpoint("direct:a");
-        resultEndpoint = (MockEndpoint) resolveMandatoryEndpoint("mock:result");
-    }
 
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
@@ -64,14 +53,4 @@
         };
     }
 
-    protected void sendMessage(final Object headerValue, final Object body) throws Exception {
-        template.send(startEndpoint, new Processor() {
-            public void process(Exchange exchange) {
-                // now lets fire in a message
-                Message in = exchange.getIn();
-                in.setBody(body);
-                in.setHeader("foo", headerValue);
-            }
-        });
-    }
 }

Copied: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java (from r552950, activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DelayerTest.java)
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java?view=diff&rev=553312&p1=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DelayerTest.java&r1=552950&p2=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java&r2=553312
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DelayerTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java Wed Jul  4 12:42:47 2007
@@ -24,31 +24,28 @@
 /**
  * @version $Revision: 1.1 $
  */
-public class DelayerTest extends ContextTestSupport {
+public class ThrottlerTest extends ContextTestSupport {
+    protected int messageCount = 6;
 
-    public void testSendingMessageGetsDelayed() throws Exception {
+    public void testSendLotsOfMessagesButOnly3GetThrough() throws Exception {
         MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
-        resultEndpoint.expectedMessageCount(0);
+        resultEndpoint.expectedMessageCount(3);
+        resultEndpoint.setDefaulResultWaitMillis(1000);
 
-        template.sendBody("queue:a", "<hello>world!</hello>", "JMSTimestamp", System.currentTimeMillis());
-        resultEndpoint.assertIsSatisfied(1000);
+        for (int i = 0; i < messageCount; i++) {
+            template.sendBody("queue:a", "<message>" + i + "</message>");
+        }
 
-        // now if we wait a bit longer we should receive the message!
-        resultEndpoint.expectedMessageCount(1);
+        // lets pause to give the requests time to be processed
+        // to check that the throttle really does kick in
         resultEndpoint.assertIsSatisfied();
     }
 
-    @Override
-    protected void setUp() throws Exception {
-        super.setUp();
-
-    }
-
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
                 // START SNIPPET: ex
-                from("queue:a").delayer(header("JMSTimestamp"), 3000).to("mock:result");
+                from("queue:a").throttler(3).timePeriodMillis(30000).to("mock:result");
                 // END SNIPPET: ex
             }
         };

Modified: activemq/camel/trunk/components/camel-jms/pom.xml
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/pom.xml?view=diff&rev=553312&r1=553311&r2=553312
==============================================================================
--- activemq/camel/trunk/components/camel-jms/pom.xml (original)
+++ activemq/camel/trunk/components/camel-jms/pom.xml Wed Jul  4 12:42:47 2007
@@ -80,6 +80,16 @@
     </dependency>
   </dependencies>
 
+  <pluginRepositories>
+    <pluginRepository>
+      <id>maven2-repository.dev.java.net</id>
+      <name>Java.net Repository for Maven</name>
+      <url>http://download.java.net/maven/2/</url>
+      <layout>default</layout>
+    </pluginRepository>
+  </pluginRepositories>
+
+
   <build>
     <plugins>
 
@@ -95,6 +105,11 @@
         </configuration>
       </plugin>
 
+      <plugin>
+        <groupId>com.sun.tools.jxc.maven2</groupId>
+        <artifactId>maven-jaxb-schemagen-plugin</artifactId>
+        <version>1.1</version>
+      </plugin>
     </plugins>
   </build>
 </project>

Modified: activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/TransactedJmsRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/TransactedJmsRouteTest.java?view=diff&rev=553312&r1=553311&r2=553312
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/TransactedJmsRouteTest.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/TransactedJmsRouteTest.java Wed Jul  4 12:42:47 2007
@@ -74,7 +74,7 @@
 				        	
 				        	@Override
 				        	public String toString() {
-				                return "rollback(" + next + ")";
+				                return "rollback(" + this.processor + ")";
 				        	}
 				        };
 					}
@@ -92,7 +92,7 @@
 				        	}
 				        	@Override
 				        	public String toString() {
-				                return "catchRollback(" + next + ")";
+				                return "catchRollback(" + this.processor + ")";
 				        	}
 				        };
 					}

Modified: activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/SpringTransactionPolicy.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/SpringTransactionPolicy.java?view=diff&rev=553312&r1=553311&r2=553312
==============================================================================
--- activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/SpringTransactionPolicy.java (original)
+++ activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/SpringTransactionPolicy.java Wed Jul  4 12:42:47 2007
@@ -68,7 +68,7 @@
 
             @Override
             public String toString() {
-                return "SpringTransactionPolicy:" + propagationBehaviorToString(transactionTemplate.getPropagationBehavior()) + "[" + getNext() + "]";
+                return "SpringTransactionPolicy:" + propagationBehaviorToString(transactionTemplate.getPropagationBehavior()) + "[" + getProcessor() + "]";
             }
 
             private String propagationBehaviorToString(int propagationBehavior) {



Mime
View raw message