activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r530102 - in /activemq/camel/trunk/camel-core/src: main/java/org/apache/camel/builder/ main/java/org/apache/camel/processor/idempotent/ main/java/org/apache/camel/util/ test/java/org/apache/camel/builder/
Date Wed, 18 Apr 2007 17:45:06 GMT
Author: jstrachan
Date: Wed Apr 18 10:45:05 2007
New Revision: 530102

URL: http://svn.apache.org/viewvc?view=rev&rev=530102
Log:
minor refactor to make the builder support ExpressionFactory; also added support for IdempotentConsumer
along with a test case

Added:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/IdempotentConsumerBuilder.java
  (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
  (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryMessageIdRepository.java
  (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MessageIdRepository.java
  (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/NoMessageIdException.java
  (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/package.html
  (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExpressionHelper.java
  (with props)
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/IdempotentConsumerTest.java
  (with props)
Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RecipientListBuilder.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/SplitterBuilder.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ValueBuilder.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java?view=diff&rev=530102&r1=530101&r2=530102
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java
Wed Apr 18 10:45:05 2007
@@ -16,12 +16,9 @@
  */
 package org.apache.camel.builder;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.Expression;
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
 import org.apache.camel.processor.CompositeProcessor;
@@ -29,6 +26,12 @@
 import org.apache.camel.processor.MulticastProcessor;
 import org.apache.camel.processor.Pipeline;
 import org.apache.camel.processor.RecipientList;
+import org.apache.camel.processor.idempotent.IdempotentConsumer;
+import org.apache.camel.processor.idempotent.MessageIdRepository;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 
 /**
  * @version $Revision$
@@ -55,7 +58,7 @@
      * Sends the exchange to the given endpoint URI
      */
     @Fluent
-    public ProcessorFactory<E> to(@FluentArg("uri") String uri) {
+    public ProcessorFactory<E> to(@FluentArg("uri")String uri) {
         return to(endpoint(uri));
     }
 
@@ -63,7 +66,7 @@
      * Sends the exchange to the given endpoint
      */
     @Fluent
-    public ProcessorFactory<E> to(@FluentArg("endpoint") Endpoint<E> endpoint)
{
+    public ProcessorFactory<E> to(@FluentArg("endpoint")Endpoint<E> endpoint)
{
         ToBuilder<E> answer = new ToBuilder<E>(this, endpoint);
         addProcessBuilder(answer);
         return answer;
@@ -74,8 +77,8 @@
      */
     @Fluent
     public ProcessorFactory<E> to(
-    		@FluentArg(value="uri", attribute=false, element=true) 
-    		String... uris) {
+            @FluentArg(value = "uri", attribute = false, element = true)
+            String... uris) {
         return to(endpoints(uris));
     }
 
@@ -84,8 +87,8 @@
      */
     @Fluent
     public ProcessorFactory<E> to(
-    		@FluentArg(value="endpoint", attribute=false, element=true) 
-    		Endpoint<E>... endpoints) {
+            @FluentArg(value = "endpoint", attribute = false, element = true)
+            Endpoint<E>... endpoints) {
         return to(endpoints(endpoints));
     }
 
@@ -93,7 +96,7 @@
      * Sends the exchange to a list of endpoint using the {@link MulticastProcessor} pattern
      */
     @Fluent
-    public ProcessorFactory<E> to(@FluentArg("endpoints") Collection<Endpoint<E>>
endpoints) {
+    public ProcessorFactory<E> to(@FluentArg("endpoints")Collection<Endpoint<E>>
endpoints) {
         return addProcessBuilder(new MulticastBuilder<E>(this, endpoints));
     }
 
@@ -102,7 +105,7 @@
      * and for request/response the output of one endpoint will be the input of the next
endpoint
      */
     @Fluent
-    public ProcessorFactory<E> pipeline(@FluentArg("uris") String... uris) {
+    public ProcessorFactory<E> pipeline(@FluentArg("uris")String... uris) {
         return pipeline(endpoints(uris));
     }
 
@@ -111,7 +114,7 @@
      * and for request/response the output of one endpoint will be the input of the next
endpoint
      */
     @Fluent
-    public ProcessorFactory<E> pipeline(@FluentArg("endpoints") Endpoint<E>...
endpoints) {
+    public ProcessorFactory<E> pipeline(@FluentArg("endpoints")Endpoint<E>...
endpoints) {
         return pipeline(endpoints(endpoints));
     }
 
@@ -120,15 +123,32 @@
      * and for request/response the output of one endpoint will be the input of the next
endpoint
      */
     @Fluent
-    public ProcessorFactory<E> pipeline(@FluentArg("endpoints") Collection<Endpoint<E>>
endpoints) {
+    public ProcessorFactory<E> pipeline(@FluentArg("endpoints")Collection<Endpoint<E>>
endpoints) {
         return addProcessBuilder(new PipelineBuilder<E>(this, endpoints));
     }
 
     /**
+     * Creates an {@link IdempotentConsumer} to avoid duplicate messages
+     */
+    @Fluent
+    public IdempotentConsumerBuilder<E> idempotentConsumer(
+            @FluentArg("messageIdExpression")Expression<E> messageIdExpression,
+            @FluentArg("MessageIdRepository")MessageIdRepository messageIdRepository) {
+        return (IdempotentConsumerBuilder<E>) addProcessBuilder(new IdempotentConsumerBuilder<E>(this,
messageIdExpression, messageIdRepository));
+    }
+
+    /**
+     * Creates an {@link IdempotentConsumer} to avoid duplicate messages
+     */
+    public IdempotentConsumerBuilder<E> idempotentConsumer(ExpressionFactory<E>
messageIdExpressionFactory, MessageIdRepository messageIdRepository) {
+        return idempotentConsumer(messageIdExpressionFactory.createExpression(), messageIdRepository);
+    }
+
+    /**
      * Adds the custom processor to this destination
      */
     @Fluent
-    public ConstantProcessorBuilder<E> process(@FluentArg("ref") Processor<E>
processor) {
+    public ConstantProcessorBuilder<E> process(@FluentArg("ref")Processor<E>
processor) {
         ConstantProcessorBuilder<E> answer = new ConstantProcessorBuilder<E>(processor);
         addProcessBuilder(answer);
         return answer;
@@ -142,8 +162,8 @@
      */
     @Fluent
     public FilterBuilder<E> filter(
-    		@FluentArg(value="predicate",element=true) 
-    		Predicate<E> predicate) {
+            @FluentArg(value = "predicate", element = true)
+            Predicate<E> predicate) {
         FilterBuilder<E> answer = new FilterBuilder<E>(this, predicate);
         addProcessBuilder(answer);
         return answer;
@@ -154,7 +174,7 @@
      *
      * @return the builder for a choice expression
      */
-    @Fluent(nestedActions=true)
+    @Fluent(nestedActions = true)
     public ChoiceBuilder<E> choice() {
         ChoiceBuilder<E> answer = new ChoiceBuilder<E>(this);
         addProcessBuilder(answer);
@@ -168,8 +188,8 @@
      */
     @Fluent
     public RecipientListBuilder<E> recipientList(
-    		@FluentArg(value="recipients",element=true) 
-    		ValueBuilder<E> receipients) {
+            @FluentArg(value = "recipients", element = true)
+            ValueBuilder<E> receipients) {
         RecipientListBuilder<E> answer = new RecipientListBuilder<E>(this, receipients);
         addProcessBuilder(answer);
         return answer;
@@ -183,7 +203,7 @@
      * @return the builder
      */
     @Fluent
-    public SplitterBuilder<E> splitter(@FluentArg(value="recipients", element=true)
ValueBuilder<E> receipients) {
+    public SplitterBuilder<E> splitter(@FluentArg(value = "recipients", element = true)ValueBuilder<E>
receipients) {
         SplitterBuilder<E> answer = new SplitterBuilder<E>(this, receipients);
         addProcessBuilder(answer);
         return answer;
@@ -196,7 +216,7 @@
      * @return the current builder with the error handler configured
      */
     @Fluent
-    public FromBuilder<E> errorHandler(@FluentArg("handler") ErrorHandlerBuilder errorHandlerBuilder)
{
+    public FromBuilder<E> errorHandler(@FluentArg("handler")ErrorHandlerBuilder errorHandlerBuilder)
{
         setErrorHandlerBuilder(errorHandlerBuilder);
         return this;
     }
@@ -208,12 +228,12 @@
      * @return the current builder
      */
     @Fluent
-    public FromBuilder<E> inheritErrorHandler(@FluentArg("condition") boolean condition)
{
+    public FromBuilder<E> inheritErrorHandler(@FluentArg("condition")boolean condition)
{
         setInheritErrorHandler(condition);
         return this;
     }
-    
-    @Fluent(nestedActions=true)
+
+    @Fluent(nestedActions = true)
     public InterceptorBuilder<E> intercept() {
         InterceptorBuilder<E> answer = new InterceptorBuilder<E>(this);
         addProcessBuilder(answer);
@@ -221,14 +241,13 @@
     }
 
     @Fluent
-    public InterceptorBuilder<E> intercept(@FluentArg("interceptor") InterceptorProcessor<E>
interceptor) {
+    public InterceptorBuilder<E> intercept(@FluentArg("interceptor")InterceptorProcessor<E>
interceptor) {
         InterceptorBuilder<E> answer = new InterceptorBuilder<E>(this);
         answer.add(interceptor);
         addProcessBuilder(answer);
         return answer;
     }
 
-
     // Properties
     //-------------------------------------------------------------------------
     public RouteBuilder<E> getBuilder() {
@@ -274,11 +293,31 @@
      */
     protected Processor<E> makeProcessor(ProcessorFactory<E> processFactory)
throws Exception {
         Processor<E> processor = processFactory.createProcessor();
+        processor = wrapProcessor(processor);
+        return wrapInErrorHandler(processor);
+    }
+
+    /**
+     * A strategy method to allow newly created processors to be wrapped in an error handler.
This feature
+     * could be disabled for child builders such as {@link IdempotentConsumerBuilder} which
will rely on the
+     * {@link FromBuilder} to perform the error handling to avoid doubly-wrapped processors
with 2 nested error handlers
+     */
+    protected Processor<E> wrapInErrorHandler(Processor<E> processor) throws
Exception {
         return getErrorHandlerBuilder().createErrorHandler(processor);
     }
 
+    /**
+     * A strategy method which allows derived classes to wrap the child processor in some
kind of interceptor such as
+     * a filter for the {@link IdempotentConsumerBuilder}.
+     *
+     * @param processor the processor which can be wrapped
+     * @return the original processor or a new wrapped interceptor
+     */
+    protected Processor<E> wrapProcessor(Processor<E> processor) {
+        return processor;
+    }
+
     public List<Processor<E>> getProcessors() {
         return processors;
     }
-
 }

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/IdempotentConsumerBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/IdempotentConsumerBuilder.java?view=auto&rev=530102
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/IdempotentConsumerBuilder.java
(added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/IdempotentConsumerBuilder.java
Wed Apr 18 10:45:05 2007
@@ -0,0 +1,59 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.builder;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Expression;
+import org.apache.camel.processor.idempotent.IdempotentConsumer;
+import org.apache.camel.processor.idempotent.MessageIdRepository;
+
+/**
+ * A builder of an {@link IdempotentConsumer}
+ *
+ * @version $Revision: 1.1 $
+ */
+public class IdempotentConsumerBuilder<E extends Exchange> extends FromBuilder<E>
implements ProcessorFactory<E> {
+    private final Expression<E> messageIdExpression;
+    private final MessageIdRepository messageIdRegistry;
+
+    public IdempotentConsumerBuilder(FromBuilder<E> fromBuilder, Expression<E>
messageIdExpression, MessageIdRepository messageIdRegistry) {
+        super(fromBuilder);
+        this.messageIdRegistry = messageIdRegistry;
+        this.messageIdExpression = messageIdExpression;
+    }
+
+    // Properties
+    //-------------------------------------------------------------------------
+    public MessageIdRepository getMessageIdRegistry() {
+        return messageIdRegistry;
+    }
+
+    // Implementation methods
+    //-------------------------------------------------------------------------
+    @Override
+    protected Processor<E> wrapInErrorHandler(Processor<E> processor) throws
Exception {
+        // do not wrap in an error handler here, as the parent FromBuilder will do that
+        return processor;
+    }
+
+    @Override
+    protected Processor<E> wrapProcessor(Processor<E> processor) {
+        return new IdempotentConsumer<E>(messageIdExpression, messageIdRegistry, processor);
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/IdempotentConsumerBuilder.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RecipientListBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RecipientListBuilder.java?view=diff&rev=530102&r1=530101&r2=530102
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RecipientListBuilder.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RecipientListBuilder.java
Wed Apr 18 10:45:05 2007
@@ -28,15 +28,15 @@
  * @version $Revision$
  */
 public class RecipientListBuilder<E extends Exchange> extends BuilderSupport<E>
implements ProcessorFactory<E> {
-    private final ValueBuilder<E> valueBuilder;
+    private final ExpressionFactory<E> expressionFactory;
 
-    public RecipientListBuilder(FromBuilder<E> parent, ValueBuilder<E> valueBuilder)
{
+    public RecipientListBuilder(FromBuilder<E> parent, ExpressionFactory<E> expressionFactory)
{
         super(parent);
-        this.valueBuilder = valueBuilder;
+        this.expressionFactory = expressionFactory;
     }
 
     public Processor<E> createProcessor() {
-        Expression<E> expression = valueBuilder.getExpression();
+        Expression<E> expression = expressionFactory.createExpression();
         return new RecipientList<E>(expression);
     }
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/SplitterBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/SplitterBuilder.java?view=diff&rev=530102&r1=530101&r2=530102
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/SplitterBuilder.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/SplitterBuilder.java
Wed Apr 18 10:45:05 2007
@@ -29,17 +29,17 @@
  * @version $Revision$
  */
 public class SplitterBuilder<E extends Exchange> extends FromBuilder<E> {
-    private final ValueBuilder<E> valueBuilder;
+    private final ExpressionFactory<E> expressionFactory;
 
-    public SplitterBuilder(FromBuilder<E> parent, ValueBuilder<E> valueBuilder)
{
+    public SplitterBuilder(FromBuilder<E> parent, ExpressionFactory<E> expressionFactory)
{
         super(parent);
-        this.valueBuilder = valueBuilder;
+        this.expressionFactory = expressionFactory;
     }
 
     public Processor<E> createProcessor() throws Exception {
         // lets create a single processor for all child predicates
         Processor<E> destination = super.createProcessor();
-        Expression<E> expression = valueBuilder.getExpression();
+        Expression<E> expression = expressionFactory.createExpression();
         return new Splitter<E>(destination, expression);
     }
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ValueBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ValueBuilder.java?view=diff&rev=530102&r1=530101&r2=530102
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ValueBuilder.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ValueBuilder.java
Wed Apr 18 10:45:05 2007
@@ -25,7 +25,7 @@
  *
  * @version $Revision: $
  */
-public class ValueBuilder<E extends Exchange> {
+public class ValueBuilder<E extends Exchange> implements ExpressionFactory<E>
{
     private Expression<E> expression;
 
     public ValueBuilder(Expression<E> expression) {
@@ -36,6 +36,11 @@
         return expression;
     }
 
+
+    public Expression<E> createExpression() {
+        return expression;
+    }
+
     @Fluent
     public Predicate<E> isNotEqualTo(@FluentArg("value") Object value) {
         Expression<E> right = ExpressionBuilder.constantExpression(value);
@@ -124,4 +129,5 @@
     public ValueBuilder<E> convertToString() {
         return convertTo(String.class);
     }
+
 }

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java?view=auto&rev=530102
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
(added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
Wed Apr 18 10:45:05 2007
@@ -0,0 +1,88 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.idempotent;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Expression;
+import org.apache.camel.Processor;
+import org.apache.camel.util.ExpressionHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * An implementation of the
+ * <a href="http://activemq.apache.org/camel/idempotent-consumer.html">Idempotent Consumer</a>
pattern.
+ *
+ * @version $Revision: 1.1 $
+ */
+public class IdempotentConsumer<E extends Exchange> implements Processor<E> {
+    private static final transient Log log = LogFactory.getLog(IdempotentConsumer.class);
+    private Expression<E> messageIdExpression;
+    private Processor<E> nextProcessor;
+    private MessageIdRepository messageIdRepository;
+
+    public IdempotentConsumer(Expression<E> messageIdExpression, MessageIdRepository
messageIdRepository, Processor<E> nextProcessor) {
+        this.messageIdExpression = messageIdExpression;
+        this.messageIdRepository = messageIdRepository;
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public String toString() {
+        return "IdempotentConsumer[expression=" + messageIdExpression + ", repository=" +
messageIdRepository + ", processor=" + nextProcessor + "]";
+    }
+
+    public void process(E exchange) {
+        String messageId = ExpressionHelper.evaluateAsString(messageIdExpression, exchange);
+        if (messageId == null) {
+            throw new NoMessageIdException(exchange, messageIdExpression);
+        }
+        if (!messageIdRepository.contains(messageId)) {
+            nextProcessor.process(exchange);
+        }
+        else {
+            onDuplicateMessage(exchange, messageId);
+        }
+    }
+
+    // Properties
+    //-------------------------------------------------------------------------
+    public Expression<E> getMessageIdExpression() {
+        return messageIdExpression;
+    }
+
+    public MessageIdRepository getMessageIdRepository() {
+        return messageIdRepository;
+    }
+
+    public Processor<E> getNextProcessor() {
+        return nextProcessor;
+    }
+
+    /**
+     * A strategy method to allow derived classes to override the behaviour of processing
a duplicate message
+     *
+     * @param exchange the exchange
+     * @param messageId the message ID of this exchange
+     */
+    protected void onDuplicateMessage(E exchange, String messageId) {
+        if (log.isDebugEnabled()) {
+            log.debug("Ignoring duplicate message with id: " + messageId + " for exchange:
" + exchange);
+        }
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryMessageIdRepository.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryMessageIdRepository.java?view=auto&rev=530102
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryMessageIdRepository.java
(added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryMessageIdRepository.java
Wed Apr 18 10:45:05 2007
@@ -0,0 +1,61 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.idempotent;
+
+import java.util.Set;
+import java.util.HashSet;
+
+/**
+ * A simple in-memory implementation of {@link MessageIdRepository}. Warning: this could
use up lots of RAM!
+ *
+ * @version $Revision: 1.1 $
+ */
+public class MemoryMessageIdRepository implements MessageIdRepository {
+    private Set set;
+
+    /**
+     * Creates a new MemoryMessageIdRepository with a memory based repository
+     */
+    public static MessageIdRepository memoryMessageIdRepository() {
+        return memoryMessageIdRepository(new HashSet());
+    }
+
+    /**
+     * Creates a new MemoryMessageIdRepository which uses the given {@link Set} to store
the
+     * processed Message ID objects
+     */
+    public static MessageIdRepository memoryMessageIdRepository(Set set) {
+        return new MemoryMessageIdRepository(set);
+    }
+
+    public MemoryMessageIdRepository(Set set) {
+        this.set = set;
+    }
+
+    public boolean contains(String messageId) {
+        synchronized (set) {
+            if (set.contains(messageId)) {
+                return true;
+            }
+            else {
+                set.add(messageId);
+                return false;
+            }
+        }
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MemoryMessageIdRepository.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MessageIdRepository.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MessageIdRepository.java?view=auto&rev=530102
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MessageIdRepository.java
(added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MessageIdRepository.java
Wed Apr 18 10:45:05 2007
@@ -0,0 +1,36 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.idempotent;
+
+/**
+ * Access to a repository of Message IDs to implement the
+ * <a href="http://activemq.apache.org/camel/idempotent-consumer.html">Idempotent Consumer</a>
pattern.
+ *
+ * @version $Revision: 1.1 $
+ */
+public interface MessageIdRepository {
+
+    /**
+     * Returns true if this messageId has been processed before
+     * otherwise this messageId is added to the repository and false is returned.
+     *
+     * @param messageId the String ID of the message
+     * @return true if the message has been processed successfully before, otherwise false
+     */
+    boolean contains(String messageId);
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/MessageIdRepository.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/NoMessageIdException.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/NoMessageIdException.java?view=auto&rev=530102
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/NoMessageIdException.java
(added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/NoMessageIdException.java
Wed Apr 18 10:45:05 2007
@@ -0,0 +1,53 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.idempotent;
+
+import org.apache.camel.Expression;
+import org.apache.camel.Exchange;
+import org.apache.camel.RuntimeCamelException;
+
+/**
+ * An exception thrown if no message ID could be found on a message which is to be used with
the
+ * <a href="http://activemq.apache.org/camel/idempotent-consumer.html">Idempotent Consumer</a>
pattern.
+ *
+ * @version $Revision: 1.1 $
+ */
+public class NoMessageIdException extends RuntimeCamelException {
+    private final Exchange exchange;
+    private final Expression expression;
+
+    public NoMessageIdException(Exchange exchange, Expression expression) {
+        super("No message ID could be found using expression: " + expression + " on message
exchange: " + exchange);
+        this.exchange = exchange;
+        this.expression = expression;
+    }
+
+    /**
+     * The exchange which caused this failure
+     */
+    public Exchange getExchange() {
+        return exchange;
+    }
+
+    /**
+     * The expression which was used
+     */
+    public Expression getExpression() {
+        return expression;
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/NoMessageIdException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/package.html
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/package.html?view=auto&rev=530102
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/package.html
(added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/package.html
Wed Apr 18 10:45:05 2007
@@ -0,0 +1,25 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+An implementation of the <a href="http://activemq.apache.org/camel/idempotent-consumer.html">Idempotent Consumer</a> pattern.
+
+</body>
+</html>

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/package.html
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExpressionHelper.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExpressionHelper.java?view=auto&rev=530102
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExpressionHelper.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExpressionHelper.java Wed Apr 18 10:45:05 2007
@@ -0,0 +1,53 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Expression;
+
+/**
+ * A collection of helper methods for working with expressions.
+ *
+ * @version $Revision: 1.1 $
+ */
+public class ExpressionHelper {
+
+    /**
+     * Evaluates the given expression on the exchange as a String value
+     *
+     * @param expression the expression to evaluate
+     * @param exchange the exchange to use to evaluate the expression
+     * @return the result of the evaluation as a string.
+     */
+    public static <E extends Exchange> String evaluateAsString(Expression<E> expression, E exchange) {
+        return evaluateAsType(expression, exchange, String.class);
+    }
+
+    /**
+     * Evaluates the given expression on the exchange, converting the result to the given type
+     *
+     * @param expression the expression to evaluate
+     * @param exchange the exchange to use to evaluate the expression
+     * @param resultType the type of the result that is required
+     * @return the result of the evaluation as the specified type.
+     */
+    public static <T, E extends Exchange> T evaluateAsType(Expression<E> expression, E exchange, Class<T> resultType) {
+        Object value = expression.evaluate(exchange);
+        return exchange.getContext().getTypeConverter().convertTo(resultType, value);
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExpressionHelper.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/IdempotentConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/IdempotentConsumerTest.java?view=auto&rev=530102
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/IdempotentConsumerTest.java (added)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/IdempotentConsumerTest.java Wed Apr 18 10:45:05 2007
@@ -0,0 +1,24 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.builder;
+
+/**
+ * @version $Revision: 1.1 $
+ */
+public class IdempotentConsumerTest {
+}

Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/IdempotentConsumerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java?view=diff&rev=530102&r1=530101&r2=530102
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java Wed Apr 18 10:45:05 2007
@@ -16,9 +16,6 @@
  */
 package org.apache.camel.builder;
 
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -32,28 +29,33 @@
 import org.apache.camel.processor.RecipientList;
 import org.apache.camel.processor.SendProcessor;
 import org.apache.camel.processor.Splitter;
+import org.apache.camel.processor.idempotent.IdempotentConsumer;
+import org.apache.camel.processor.idempotent.MemoryMessageIdRepository;
+import static org.apache.camel.processor.idempotent.MemoryMessageIdRepository.memoryMessageIdRepository;
+
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * @version $Revision$
  */
 public class RouteBuilderTest extends TestSupport {
-	
-	protected Processor<Exchange> myProcessor = new MyProcessor();    	
-	protected InterceptorProcessor<Exchange> interceptor1;
-	protected InterceptorProcessor<Exchange> interceptor2;
-    
-	protected RouteBuilder<Exchange> buildSimpleRoute() {
-		// START SNIPPET: e1
+    protected Processor<Exchange> myProcessor = new MyProcessor();
+    protected InterceptorProcessor<Exchange> interceptor1;
+    protected InterceptorProcessor<Exchange> interceptor2;
+
+    protected RouteBuilder<Exchange> buildSimpleRoute() {
+        // START SNIPPET: e1
         RouteBuilder<Exchange> builder = new RouteBuilder<Exchange>() {
             public void configure() {
                 from("queue:a").to("queue:b");
             }
         };
         // END SNIPPET: e1
-		return builder;
-	}
+        return builder;
+    }
 
-	public void testSimpleRoute() throws Exception {
+    public void testSimpleRoute() throws Exception {
         RouteBuilder<Exchange> builder = buildSimpleRoute();
 
         List<Route<Exchange>> routes = builder.getRouteList();
@@ -69,17 +71,17 @@
     }
 
     protected RouteBuilder<Exchange> buildSimpleRouteWithHeaderPredicate() {
-		// START SNIPPET: e2
+        // START SNIPPET: e2
         RouteBuilder<Exchange> builder = new RouteBuilder<Exchange>() {
             public void configure() {
                 from("queue:a").filter(header("foo").isEqualTo("bar")).to("queue:b");
             }
         };
         // END SNIPPET: e2
-		return builder;
-	}
+        return builder;
+    }
 
-	public void testSimpleRouteWithHeaderPredicate() throws Exception {
+    public void testSimpleRouteWithHeaderPredicate() throws Exception {
         RouteBuilder<Exchange> builder = buildSimpleRouteWithHeaderPredicate();
 
         List<Route<Exchange>> routes = builder.getRouteList();
@@ -92,14 +94,13 @@
             Processor processor = getProcessorWithoutErrorHandler(route);
 
             FilterProcessor filterProcessor = assertIsInstanceOf(FilterProcessor.class, processor);
-    SendProcessor sendProcessor = assertIsInstanceOf(SendProcessor.class, unwrapErrorHandler(filterProcessor.getProcessor()));
+            SendProcessor sendProcessor = assertIsInstanceOf(SendProcessor.class, unwrapErrorHandler(filterProcessor.getProcessor()));
             assertEquals("Endpoint URI", "queue:b", sendProcessor.getDestination().getEndpointUri());
         }
     }
 
-
     protected RouteBuilder<Exchange> buildSimpleRouteWithChoice() {
-		// START SNIPPET: e3
+        // START SNIPPET: e3
         RouteBuilder<Exchange> builder = new RouteBuilder<Exchange>() {
             public void configure() {
                 from("queue:a").choice()
@@ -109,8 +110,8 @@
             }
         };
         // END SNIPPET: e3
-		return builder;
-	}
+        return builder;
+    }
 
     public void testSimpleRouteWithChoice() throws Exception {
         RouteBuilder<Exchange> builder = buildSimpleRouteWithChoice();
@@ -139,7 +140,7 @@
     }
 
     protected RouteBuilder<Exchange> buildCustomProcessor() {
-		// START SNIPPET: e4
+        // START SNIPPET: e4
         myProcessor = new Processor<Exchange>() {
             public void process(Exchange exchange) {
                 System.out.println("Called with exchange: " + exchange);
@@ -152,10 +153,10 @@
             }
         };
         // END SNIPPET: e4
-		return builder;
-	}
+        return builder;
+    }
 
-	public void testCustomProcessor() throws Exception {
+    public void testCustomProcessor() throws Exception {
         RouteBuilder<Exchange> builder = buildCustomProcessor();
 
         List<Route<Exchange>> routes = builder.getRouteList();
@@ -170,19 +171,18 @@
         }
     }
 
-
-	protected RouteBuilder<Exchange> buildCustomProcessorWithFilter() {
-		// START SNIPPET: e5
+    protected RouteBuilder<Exchange> buildCustomProcessorWithFilter() {
+        // START SNIPPET: e5
         RouteBuilder<Exchange> builder = new RouteBuilder<Exchange>() {
             public void configure() {
                 from("queue:a").filter(header("foo").isEqualTo("bar")).process(myProcessor);
             }
         };
         // END SNIPPET: e5
-		return builder;
-	}
+        return builder;
+    }
 
-	public void testCustomProcessorWithFilter() throws Exception {
+    public void testCustomProcessorWithFilter() throws Exception {
         RouteBuilder<Exchange> builder = buildCustomProcessorWithFilter();
 
         List<Route<Exchange>> routes = builder.getRouteList();
@@ -199,17 +199,16 @@
         }
     }
 
-
-	protected RouteBuilder<Exchange> buildWireTap() {
-		// START SNIPPET: e6
+    protected RouteBuilder<Exchange> buildWireTap() {
+        // START SNIPPET: e6
         RouteBuilder<Exchange> builder = new RouteBuilder<Exchange>() {
             public void configure() {
                 from("queue:a").to("queue:tap", "queue:b");
             }
         };
         // END SNIPPET: e6
-		return builder;
-	}
+        return builder;
+    }
 
     public void testWireTap() throws Exception {
         RouteBuilder<Exchange> builder = buildWireTap();
@@ -233,7 +232,7 @@
     }
 
     protected RouteBuilder<Exchange> buildRouteWithInterceptor() {
-		interceptor1 = new InterceptorProcessor<Exchange>() {
+        interceptor1 = new InterceptorProcessor<Exchange>() {
         };
 
         // START SNIPPET: e7        
@@ -242,18 +241,18 @@
         RouteBuilder<Exchange> builder = new RouteBuilder<Exchange>() {
             public void configure() {
                 from("queue:a")
-                    .intercept()
-            		   .add(interceptor1)
-            		   .add(interceptor2)
-            		.target().to("queue:d");
+                        .intercept()
+                        .add(interceptor1)
+                        .add(interceptor2)
+                        .target().to("queue:d");
             }
         };
         // END SNIPPET: e7
-		return builder;
-	}
+        return builder;
+    }
 
     public void testRouteWithInterceptor() throws Exception {
-    	
+
         RouteBuilder<Exchange> builder = buildRouteWithInterceptor();
 
         List<Route<Exchange>> routes = builder.getRouteList();
@@ -274,8 +273,8 @@
         }
     }
 
-	public void testComplexExpressions() throws Exception {
-		// START SNIPPET: e7
+    public void testComplexExpressions() throws Exception {
+        // START SNIPPET: e7
         RouteBuilder<Exchange> builder = new RouteBuilder<Exchange>() {
             public void configure() {
                 from("queue:a").filter(header("foo").isEqualTo(123)).to("queue:b");
@@ -341,6 +340,7 @@
             RecipientList<Exchange> p1 = assertIsInstanceOf(RecipientList.class, processor);
         }
     }
+
     protected RouteBuilder<Exchange> buildSplitter() {
         // START SNIPPET: splitter
         RouteBuilder<Exchange> builder = new RouteBuilder<Exchange>() {
@@ -366,6 +366,41 @@
             Processor processor = getProcessorWithoutErrorHandler(route);
 
             Splitter<Exchange> p1 = assertIsInstanceOf(Splitter.class, processor);
+        }
+    }
+
+    protected RouteBuilder<Exchange> buildIdempotentConsumer() {
+        // START SNIPPET: idempotent
+        RouteBuilder<Exchange> builder = new RouteBuilder<Exchange>() {
+            public void configure() {
+                from("queue:a").idempotentConsumer(header("myMessageId"), memoryMessageIdRepository()).to("queue:b");
+            }
+        };
+        // END SNIPPET: idempotent
+        return builder;
+    }
+
+    public void testIdempotentConsumer() throws Exception {
+
+        RouteBuilder<Exchange> builder = buildIdempotentConsumer();
+
+        List<Route<Exchange>> routes = builder.getRouteList();
+        System.out.println("Created routes: " + routes);
+
+        assertEquals("Number routes created", 1, routes.size());
+        for (Route<Exchange> route : routes) {
+            Endpoint<Exchange> key = route.getEndpoint();
+            assertEquals("From endpoint", "queue:a", key.getEndpointUri());
+            Processor processor = getProcessorWithoutErrorHandler(route);
+
+            IdempotentConsumer<Exchange> idempotentConsumer = assertIsInstanceOf(IdempotentConsumer.class, processor);
+
+            assertEquals("messageIdExpression", "header(myMessageId)", idempotentConsumer.getMessageIdExpression().toString());
+
+            assertIsInstanceOf(MemoryMessageIdRepository.class, idempotentConsumer.getMessageIdRepository());
+
+            SendProcessor sendProcessor = assertIsInstanceOf(SendProcessor.class, idempotentConsumer.getNextProcessor());
+            assertEquals("Endpoint URI", "queue:b", sendProcessor.getDestination().getEndpointUri());
         }
     }
 



Mime
View raw message