camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject svn commit: r1362585 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/processor/resequencer/ camel-core/src/test/java/org/apache/camel/processor/ components/camel-spring/src/test/resources/org/apache/camel/spring/processor/
Date Tue, 17 Jul 2012 17:47:19 GMT
Author: boday
Date: Tue Jul 17 17:47:19 2012
New Revision: 1362585

URL: http://svn.apache.org/viewvc?rev=1362585&view=rev
Log:
CAMEL-4327 fixed randomly failing resequencer unit tests by using capacity instead of timeout,
tweaked the rejectOld validation logic to short-circuit earlier

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java
    camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java?rev=1362585&r1=1362584&r2=1362585&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java Tue Jul 17 17:47:19 2012
@@ -206,6 +206,12 @@ public class ResequencerEngine<E> {
             throw new IllegalArgumentException("Element cannot be used in comparator: " + sequence.comparator());
         }
 
+        // validate the exchange shouldn't be 'rejected' (if applicable)
+        if (rejectOld != null && rejectOld.booleanValue() && beforeLastDelivered(element)) {
+            throw new MessageRejectedException("rejecting message [" + element.getObject()
+                    + "], it should have been sent before the last delivered message [" + lastDelivered.getObject() + "]");
+        }
+
         // add element to sequence in proper order
         sequence.add(element);
 
@@ -222,10 +228,6 @@ public class ResequencerEngine<E> {
             // nothing to schedule
         } else if (sequence.predecessor(element) != null) {
             // nothing to schedule
-        } else if (rejectOld != null && rejectOld.booleanValue() && beforeLastDelivered(element)) {
-            sequence.remove(element);
-            throw new MessageRejectedException("rejecting message [" + element.getObject()
-                    + "], it should have been sent before the last delivered message [" + lastDelivered.getObject() + "]");
         } else {
             element.schedule(defineTimeout());
         }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java?rev=1362585&r1=1362584&r2=1362585&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java Tue Jul 17 17:47:19 2012
@@ -25,54 +25,52 @@ import org.apache.camel.processor.resequ
  */
 public class ResequenceStreamRejectOldExchangesTest extends ContextTestSupport {
 
-    public void testInSequenceAfterTimeout() throws Exception {
+    public void testInSequenceAfterCapacityReached() throws Exception {
         getMockEndpoint("mock:result").expectedBodiesReceived("A", "B", "C", "E");
         getMockEndpoint("mock:error").expectedMessageCount(0);
 
         template.sendBodyAndHeader("direct:start", "B", "seqno", 2);
         template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
         template.sendBodyAndHeader("direct:start", "A", "seqno", 1);
-        Thread.sleep(1100);
         template.sendBodyAndHeader("direct:start", "E", "seqno", 5);
 
         assertMockEndpointsSatisfied();
     }
 
-    public void testDuplicateAfterTimeout() throws Exception {
+    public void testDuplicateAfterCapacityReached() throws Exception {
         getMockEndpoint("mock:result").expectedBodiesReceived("A", "B", "C");
         getMockEndpoint("mock:error").expectedMessageCount(0);
 
         template.sendBodyAndHeader("direct:start", "B", "seqno", 2);
         template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
         template.sendBodyAndHeader("direct:start", "A", "seqno", 1);
-        Thread.sleep(1100);
         template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
 
         assertMockEndpointsSatisfied();
     }
 
-    public void testOutOfSequenceAfterTimeoutSimple() throws Exception {
+    public void testOutOfSequenceAfterCapacityReachedSimple() throws Exception {
         getMockEndpoint("mock:result").expectedBodiesReceived("B", "C", "D");
         getMockEndpoint("mock:error").expectedBodiesReceived("A");
 
         template.sendBodyAndHeader("direct:start", "D", "seqno", 4);
         template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
         template.sendBodyAndHeader("direct:start", "B", "seqno", 2);
-        Thread.sleep(1100);
         template.sendBodyAndHeader("direct:start", "A", "seqno", 1);
 
         assertMockEndpointsSatisfied();
     }
 
-    public void testOutOfSequenceAfterTimeoutComplex() throws Exception {
+    
+    public void testOutOfSequenceAfterCapacityReachedComplex() throws Exception {
         getMockEndpoint("mock:result").expectedBodiesReceived("A", "D", "E", "F");
         getMockEndpoint("mock:error").expectedBodiesReceived("B", "C");
 
+        template.sendBodyAndHeader("direct:start", "E", "seqno", 5);
         template.sendBodyAndHeader("direct:start", "D", "seqno", 4);
         template.sendBodyAndHeader("direct:start", "A", "seqno", 1);
-        Thread.sleep(1100);
+
         template.sendBodyAndHeader("direct:start", "B", "seqno", 2);
-        template.sendBodyAndHeader("direct:start", "E", "seqno", 5);
         template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
         template.sendBodyAndHeader("direct:start", "F", "seqno", 6);
 
@@ -86,9 +84,9 @@ public class ResequenceStreamRejectOldEx
             public void configure() throws Exception {
 
                 from("direct:start")
-                        .onException(MessageRejectedException.class).handled(true).to("mock:error").end()
-                        .resequence(header("seqno")).stream().timeout(1000).rejectOld()
-                        .to("mock:result");
+                    .onException(MessageRejectedException.class).maximumRedeliveries(0).handled(true).to("mock:error").end()
+                    .resequence(header("seqno")).stream().capacity(3).rejectOld()
+                    .to("mock:result");
             }
         };
     }

Modified: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml?rev=1362585&r1=1362584&r2=1362585&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml (original)
+++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml Tue Jul 17 17:47:19 2012
@@ -30,7 +30,7 @@
                 <to uri="mock:error"/>
             </onException>
             <resequence>
-                <stream-config capacity="100" timeout="1000">
+                <stream-config capacity="3" timeout="1000">
                     <rejectOld>true</rejectOld>
                 </stream-config>
                 <header>seqno</header>



Mime
View raw message