camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [1/2] camel git commit: CAMEL-9791: Threads EIP now allow error handler to perform redelivery if adding task to queue is rejected. Thanks to Thibaut Robert for the unit test.
Date Wed, 20 Apr 2016 06:16:50 GMT
Repository: camel
Updated Branches:
  refs/heads/camel-2.17.x e4f4ed38f -> 194984784


CAMEL-9791: Threads EIP now allow error handler to perform redelivery if adding task to queue
is rejected. Thanks to Thibaut Robert for the unit test.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5f442d8b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5f442d8b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5f442d8b

Branch: refs/heads/camel-2.17.x
Commit: 5f442d8bf03ab2015ce56b8e05a9fea4a1b37e31
Parents: e4f4ed3
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Mon Apr 4 16:55:54 2016 +0200
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Wed Apr 20 08:16:23 2016 +0200

----------------------------------------------------------------------
 .../camel/processor/ThreadsProcessor.java       |  4 -
 ...eadsRejectedExecutionWithDeadLetterTest.java | 91 ++++++++++++++++++++
 2 files changed, 91 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/5f442d8b/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
index c71821f..2f97943 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
@@ -92,11 +92,7 @@ public class ThreadsProcessor extends ServiceSupport implements AsyncProcessor,
             if (abort) {
                 exchange.setException(new RejectedExecutionException());
             }
-
             LOG.trace("{} routing exchange {} ", abort ? "Aborted" : "Rejected", exchange);
-            // we should not continue routing, and no redelivery should be performed
-            exchange.setProperty(Exchange.ROUTE_STOP, true);
-            exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, true);
 
             if (shutdown.get()) {
                 exchange.setException(new RejectedExecutionException("ThreadsProcessor is
not running."));

http://git-wip-us.apache.org/repos/asf/camel/blob/5f442d8b/camel-core/src/test/java/org/apache/camel/issues/ThreadsRejectedExecutionWithDeadLetterTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/issues/ThreadsRejectedExecutionWithDeadLetterTest.java
b/camel-core/src/test/java/org/apache/camel/issues/ThreadsRejectedExecutionWithDeadLetterTest.java
new file mode 100644
index 0000000..9f32e5f
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/issues/ThreadsRejectedExecutionWithDeadLetterTest.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.issues;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.ThreadPoolRejectedPolicy;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version
+ */
+public class ThreadsRejectedExecutionWithDeadLetterTest extends ContextTestSupport {
+
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+    public void testThreadsRejectedExecution() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("seda:start").errorHandler(deadLetterChannel("mock:failed"))
+                        .to("log:before")
+                        // will use our custom pool
+                        .threads()
+                        .maxPoolSize(1).poolSize(1) // 1 thread max
+                        .maxQueueSize(1)            // 1 queued task
+                        //(Test fails whatever the chosen policy below)
+                        .rejectedPolicy(ThreadPoolRejectedPolicy.Abort)
+                        .delay(1000)
+                        .to("log:after")
+                        .to("mock:result");
+            }
+        });
+        context.start();
+
+        getMockEndpoint("mock:result").expectedMessageCount(2);
+        getMockEndpoint("mock:failed").expectedMessageCount(1);
+
+        template.sendBody("seda:start", "Hello World"); // will block
+        template.sendBody("seda:start", "Hi World");    // will be queued
+        template.sendBody("seda:start", "Bye World");   // will be rejected
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testThreadsRejectedExecutionWithRedelivery() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("seda:start").errorHandler(deadLetterChannel("mock:failed").maximumRedeliveries(5))
+                        .to("log:before")
+                        // will use our custom pool
+                        .threads()
+                        .maxPoolSize(1).poolSize(1) // 1 thread max
+                        .maxQueueSize(1)            // 1 queued task
+                        //(Test fails whatever the chosen policy below)
+                        .rejectedPolicy(ThreadPoolRejectedPolicy.Abort)
+                        .delay(1000)
+                        .to("log:after")
+                        .to("mock:result");
+            }
+        });
+        context.start();
+
+        getMockEndpoint("mock:result").expectedMessageCount(3);
+        getMockEndpoint("mock:failed").expectedMessageCount(0);
+
+        template.sendBody("seda:start", "Hello World"); // will block
+        template.sendBody("seda:start", "Hi World");    // will be queued
+        template.sendBody("seda:start", "Bye World");   // will be rejected and queued on
redelivery later
+
+        assertMockEndpointsSatisfied();
+    }
+
+}
\ No newline at end of file


Mime
View raw message