activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r530858 - in /activemq/camel/trunk/camel-core/src: main/java/org/apache/camel/component/mock/ main/java/org/apache/camel/processor/ test/java/org/apache/camel/processor/
Date Fri, 20 Apr 2007 17:16:52 GMT
Author: jstrachan
Date: Fri Apr 20 10:16:51 2007
New Revision: 530858

URL: http://svn.apache.org/viewvc?view=rev&rev=530858
Log:
added a test case for the DeadLetterChannel along with the start of an improved jmock style
API on the MockEndpoint

Added:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/AssertionClause.java
  (with props)
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java
  (with props)
Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/AssertionClause.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/AssertionClause.java?view=auto&rev=530858
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/AssertionClause.java
(added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/AssertionClause.java
Fri Apr 20 10:16:51 2007
@@ -0,0 +1,89 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mock;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Expression;
+import static org.apache.camel.builder.ExpressionBuilder.bodyExpression;
+import static org.apache.camel.builder.ExpressionBuilder.headerExpression;
+import org.apache.camel.builder.Fluent;
+import org.apache.camel.builder.FluentArg;
+import org.apache.camel.builder.ValueBuilder;
+
+/**
+ * A builder of assertions on message exchanges.
+ * <p>
+ * Each builder method wraps an expression in a {@link ValueBuilder}. Subclasses
+ * supply {@code run()} and are expected to call
+ * {@link #applyAssertionOn(int, Exchange)} for the exchanges under test.
+ *
+ * @version $Revision: 1.1 $
+ */
+public abstract class AssertionClause<E extends Exchange> implements Runnable {
+
+    // Builder methods
+    //-------------------------------------------------------------------------
+
+    /**
+     * Returns a predicate and value builder for headers on an exchange
+     *
+     * @param name the name of the header to evaluate
+     * @return a builder wrapping the header expression
+     */
+    @Fluent
+    public ValueBuilder<E> header(@FluentArg("name") String name) {
+        Expression<E> expression = headerExpression(name);
+        return new ValueBuilder<E>(expression);
+    }
+
+    /**
+     * Returns a predicate and value builder for the inbound body on an exchange
+     *
+     * @return a builder wrapping the body expression
+     */
+    @Fluent
+    public ValueBuilder<E> body() {
+        Expression<E> expression = bodyExpression();
+        return new ValueBuilder<E>(expression);
+    }
+
+    /**
+     * Returns a predicate and value builder for the inbound message body as a specific type
+     *
+     * @param type the class passed to the body expression
+     * @return a builder wrapping the typed body expression
+     */
+    @Fluent
+    public <T> ValueBuilder<E> bodyAs(@FluentArg("class") Class<T> type)
{
+        Expression<E> expression = bodyExpression(type);
+        return new ValueBuilder<E>(expression);
+    }
+
+    /**
+     * Returns a predicate and value builder for the outbound body on an exchange
+     * <p>
+     * NOTE(review): this builds the same {@code bodyExpression()} as {@link #body()},
+     * which is documented as the inbound body - confirm whether an outbound-specific
+     * expression was intended here.
+     *
+     * @return a builder wrapping the body expression
+     */
+    @Fluent
+    public ValueBuilder<E> outBody() {
+        Expression<E> expression = bodyExpression();
+        return new ValueBuilder<E>(expression);
+    }
+
+    /**
+     * Returns a predicate and value builder for the outbound message body as a specific type
+     * <p>
+     * NOTE(review): this builds the same {@code bodyExpression(type)} as
+     * {@link #bodyAs(Class)} - confirm whether an outbound-specific expression
+     * was intended here.
+     *
+     * @param type the class passed to the body expression
+     * @return a builder wrapping the typed body expression
+     */
+    @Fluent
+    public <T> ValueBuilder<E> outBody(@FluentArg("class") Class<T> type)
{
+        Expression<E> expression = bodyExpression(type);
+        return new ValueBuilder<E>(expression);
+    }
+
+    /**
+     * Performs any assertions on the given exchange
+     * <p>
+     * Currently an empty placeholder; see the TODO in the body.
+     *
+     * @param index the position of the exchange being checked
+     * @param exchange the exchange to assert against
+     */
+    protected void applyAssertionOn(int index, Exchange exchange) {
+        // TODO perform the predicate on the given exchange
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/AssertionClause.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java?view=diff&rev=530858&r1=530857&r2=530858
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
Fri Apr 20 10:16:51 2007
@@ -128,7 +128,7 @@
 
         if (expectedCount >= 0) {
             int receivedCounter = getReceivedCounter();
-            assertEquals("Expected message count", expectedCount, receivedCounter);
+            assertEquals("Received message count" , expectedCount, receivedCounter);
         }
 
         for (Runnable test : tests) {
@@ -137,8 +137,8 @@
 
         for (Throwable failure : failures) {
            if (failure != null) {
-               log.error("Caught: " + failure, failure);
-               throw new AssertionError("Failed due to caught exception: " + failure);
+               log.error("Caught on " + getEndpointUri() + " Exception: " + failure, failure);
+               fail("Failed due to caught exception: " + failure);
            }
         }
     }
@@ -199,7 +199,51 @@
     public void expects(Runnable runnable) {
         tests.add(runnable);
     }
-    
+
+    /**
+     * Adds an assertion to the given message index
+     * <p>
+     * The new clause is registered through {@link #expects(Runnable)}. When it runs it
+     * fetches the exchange with {@link #assertExchangeReceived(int)} and passes it,
+     * together with the index, to the clause's {@code applyAssertionOn} method.
+     *
+     * @param messageIndex the number of the message
+     * @return the assertion clause
+     */
+    public AssertionClause message(final int messageIndex) {
+        AssertionClause clause = new AssertionClause() {
+            public void run() {
+                applyAssertionOn(messageIndex, assertExchangeReceived(messageIndex));
+            }
+        };
+        expects(clause);
+        return clause;
+    }
+
+    /**
+     * Adds an assertion to all the received messages; when run, the clause is
+     * applied to each received exchange in list order with an index counted from zero
+     *
+     * @return the assertion clause
+     */
+    public AssertionClause allMessages() {
+        AssertionClause clause = new AssertionClause() {
+            public void run() {
+                List<Exchange> list = getExchangesReceived();
+                int index = 0;
+                for (Exchange exchange : list) {
+                    applyAssertionOn(index++, exchange);
+                }
+            }
+        };
+        expects(clause);
+        return clause;
+    }
+
+    /**
+     * Asserts that the given index of message is received (starting at zero)
+     * and returns that exchange
+     *
+     * @param index the zero-based position of the expected message
+     * @return the exchange received at the given index
+     * @throws AssertionError if the received counter is not greater than the index
+     */
+    public Exchange assertExchangeReceived(int index) {
+        int count = getReceivedCounter();
+        assertTrue("Not enough messages received. Was: " + count, count > index);
+        // look up by the requested index; the received count is one past the last
+        // received message and is never the position the caller asked for
+        return getExchangesReceived().get(index);
+    }
 
     // Properties
     //-------------------------------------------------------------------------
@@ -257,13 +301,17 @@
 
     protected void assertEquals(String message, Object expectedValue, Object actualValue)
{
         if (!ObjectHelper.equals(expectedValue, actualValue)) {
-            throw new AssertionError(message + ". Expected: <" + expectedValue + ">
but was: <" + actualValue + ">");
+            fail(message + ". Expected: <" + expectedValue + "> but was: <" + actualValue
+ ">");
         }
     }
 
     protected void assertTrue(String message, boolean predicate) {
         if (!predicate) {
-            throw new AssertionError(message);
+            fail(message);
         }
+    }
+
+    /**
+     * Fails by throwing an {@link AssertionError} whose text is the endpoint URI
+     * followed by a space and the given message
+     *
+     * @param message the failure description appended after the endpoint URI
+     */
+    protected void fail(Object message) {
+        throw new AssertionError(getEndpointUri() + " " + message);
     }
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?view=diff&rev=530858&r1=530857&r2=530858
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
Fri Apr 20 10:16:51 2007
@@ -32,12 +32,13 @@
  * @version $Revision$
  */
 public class DeadLetterChannel<E extends Exchange> extends ServiceSupport implements
ErrorHandler<E> {
-    public static final String REDELIVERY_COUNT_HEADER = "org.apache.camel.redeliveryCount";
+    public static final String REDELIVERY_COUNTER = "org.apache.camel.RedeliveryCounter";
+    public static final String REDELIVERED = "org.apache.camel.Redelivered";
+
     private static final transient Log log = LogFactory.getLog(DeadLetterChannel.class);
     private Processor<E> output;
     private Processor<E> deadLetter;
     private RedeliveryPolicy redeliveryPolicy;
-    private String redeliveryCountHeader = REDELIVERY_COUNT_HEADER;
 
     public DeadLetterChannel(Processor<E> output, Processor<E> deadLetter) {
         this(output, deadLetter, new RedeliveryPolicy());
@@ -59,11 +60,10 @@
         long redeliveryDelay = 0;
 
         do {
-            if (redeliveryCounter++ > 0) {
+            if (redeliveryCounter > 0) {
                 // Figure out how long we should wait to resend this message.
                 redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
                 sleep(redeliveryDelay);
-                appendRedeliveryHeaders(exchange, redeliveryCounter);
             }
 
             try {
@@ -73,6 +73,7 @@
             catch (RuntimeException e) {
                 log.error("On delivery attempt: " + redeliveryCounter + " caught: " + e,
e);
             }
+            redeliveryCounter = incrementRedeliveryCounter(exchange);
         }
         while (redeliveryPolicy.shouldRedeliver(redeliveryCounter));
 
@@ -108,27 +109,24 @@
         this.redeliveryPolicy = redeliveryPolicy;
     }
 
-    public String getRedeliveryCountHeader() {
-        return redeliveryCountHeader;
-    }
-
-    /**
-     * Sets the message header name to be used to append the redelivery count value when
a message has been redelivered
-     *
-     * @param redeliveryCountHeader the header name to use to append the redelivery count
or null if you wish to disable
-     *                              this feature
-     */
-    public void setRedeliveryCountHeader(String redeliveryCountHeader) {
-        this.redeliveryCountHeader = redeliveryCountHeader;
-    }
 
     // Implementation methods
     //-------------------------------------------------------------------------
-    protected void appendRedeliveryHeaders(E exchange, int redeliveryCounter) {
-        String header = getRedeliveryCountHeader();
-        if (header != null) {
-            exchange.getIn().setHeader(header, redeliveryCounter);
+
+    /**
+     * Increments the redelivery counter and adds the redelivered flag if the message has
+     * been redelivered
+     * <p>
+     * The counter is kept in the exchange property {@link #REDELIVERY_COUNTER}; when the
+     * property is absent the new value is 1. The {@link #REDELIVERED} property is set to
+     * true once the new value exceeds 1.
+     *
+     * @param exchange the exchange whose properties are read and updated
+     * @return the new value of the redelivery counter
+     */
+    protected int incrementRedeliveryCounter(E exchange) {
+        Integer counter = exchange.getProperty(REDELIVERY_COUNTER, Integer.class);
+        int next = 1;
+        if (counter != null) {
+            next = counter + 1;
+        }
+        exchange.setProperty(REDELIVERY_COUNTER, next);
+        if (next > 1) {
+            exchange.setProperty(REDELIVERED, true);
         }
+        return next;
     }
 
     protected void sleep(long redeliveryDelay) {

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java?view=diff&rev=530858&r1=530857&r2=530858
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java
Fri Apr 20 10:16:51 2007
@@ -59,7 +59,7 @@
      * Returns true if the policy decides that the message exchange should be redelivered
      */
     public boolean shouldRedeliver(int redeliveryCounter) {
-        return redeliveryCounter <= getMaximumRedeliveries();
+        return redeliveryCounter < getMaximumRedeliveries();
     }
 
     // Builder methods

Added: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java?view=auto&rev=530858
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java
(added)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java
Fri Apr 20 10:16:51 2007
@@ -0,0 +1,92 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import static org.apache.camel.component.mock.MockEndpoint.assertIsSatisfied;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * Exercises a route whose error handler is a dead letter channel pointing at
+ * "mock:failed", using a processor that throws until a configurable attempt
+ * number is reached and otherwise forwards the exchange to "mock:success".
+ *
+ * @version $Revision: 1.1 $
+ */
+public class DeadLetterChannelTest extends ContextTestSupport {
+    protected Endpoint<Exchange> startEndpoint;
+    protected MockEndpoint deadEndpoint, successEndpoint;
+    // the processor throws for every attempt numbered below this value
+    protected int failUntilAttempt = 2;
+    protected String body = "<hello>world!</hello>";
+
+    /**
+     * With the default failUntilAttempt, expects the body on the success endpoint
+     * and no messages on the dead letter endpoint
+     */
+    public void testFirstFewAttemptsFail() throws Exception {
+        successEndpoint.expectedBodiesReceived(body);
+        deadEndpoint.expectedMessageCount(0);
+
+        send("direct:start", body);
+
+        assertIsSatisfied(deadEndpoint, successEndpoint);
+    }
+
+    /**
+     * Raises failUntilAttempt to 5 and expects the body on the dead letter endpoint
+     * and no messages on the success endpoint
+     */
+    public void testLotsOfAttemptsFail() throws Exception {
+        failUntilAttempt = 5;
+
+        deadEndpoint.expectedBodiesReceived(body);
+        successEndpoint.expectedMessageCount(0);
+
+        send("direct:start", body);
+
+        assertIsSatisfied(deadEndpoint, successEndpoint);
+    }
+
+    /**
+     * Resolves the two mock endpoints used by the tests
+     */
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        deadEndpoint = (MockEndpoint) resolveMandatoryEndpoint("mock:failed");
+        successEndpoint = (MockEndpoint) resolveMandatoryEndpoint("mock:success");
+    }
+
+    /**
+     * Builds the route from "direct:start" through the failing processor, with a
+     * dead letter channel to "mock:failed" configured for 2 maximum redeliveries
+     */
+    protected RouteBuilder createRouteBuilder() {
+        final Processor<Exchange> processor = new Processor<Exchange>() {
+            public void process(Exchange exchange) {
+                // derive the attempt number from the redelivery counter property;
+                // a missing property is treated as the first attempt
+                Integer counter = exchange.getProperty(DeadLetterChannel.REDELIVERY_COUNTER,
Integer.class);
+                int attempt = (counter == null) ? 1 : counter + 1;
+                if (attempt < failUntilAttempt) {
+                    throw new RuntimeException("Failed to process due to attempt: " + attempt
+ " being less than: " + failUntilAttempt);
+                }
+                else {
+                    client.send("mock:success", exchange);
+                }
+            }
+        };
+
+        return new RouteBuilder<Exchange>() {
+            public void configure() {
+                from("direct:start").errorHandler(deadLetterChannel("mock:failed").maximumRedeliveries(2).initialRedeliveryDelay(1))
+                        .process(processor);
+
+                /*  TODO - currently process().to() results in two separate operations which
+                    have their own error handler
+
+                        .to("mock:success");
+                 */
+            }
+        };
+    }
+
+}

Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message