camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r1475742 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/component/browse/ main/java/org/apache/camel/component/dataset/ main/java/org/apache/camel/component/direct/ main/java/org/apache/camel/component/directvm/ main/java/org/...
Date Thu, 25 Apr 2013 13:03:36 GMT
Author: davsclaus
Date: Thu Apr 25 13:03:35 2013
New Revision: 1475742

URL: http://svn.apache.org/r1475742
Log:
CAMEL-6312: When endpoints create Consumer make sure to configure consumer as well so we can
use the consumer. prefix from uris.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/CustomConsumerExceptionHandlerTest.java
      - copied, changed from r1475700, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelAlwaysHandledTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/browse/BrowseEndpoint.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmEndpoint.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/timer/TimerEndpoint.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingEndpoint.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProcessorEndpoint.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelAlwaysHandledTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/browse/BrowseEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/browse/BrowseEndpoint.java?rev=1475742&r1=1475741&r2=1475742&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/browse/BrowseEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/browse/BrowseEndpoint.java Thu Apr 25 13:03:35 2013
@@ -69,7 +69,9 @@ public class BrowseEndpoint extends Defa
     }
 
     public Consumer createConsumer(Processor processor) throws Exception {
-        return new LoadBalancerConsumer(this, processor, loadBalancer);
+        Consumer answer = new LoadBalancerConsumer(this, processor, loadBalancer);
+        configureConsumer(answer);
+        return answer;
     }
 
     protected List<Exchange> createExchangeList() {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java?rev=1475742&r1=1475741&r2=1475742&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java Thu Apr 25 13:03:35 2013
@@ -71,7 +71,9 @@ public class DataSetEndpoint extends Moc
 
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
-        return new DataSetConsumer(this, processor);
+        Consumer answer = new DataSetConsumer(this, processor);
+        configureConsumer(answer);
+        return answer;
     }
 
     @Override

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java?rev=1475742&r1=1475741&r2=1475742&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java Thu Apr 25 13:03:35 2013
@@ -53,7 +53,9 @@ public class DirectEndpoint extends Defa
     }
 
     public Consumer createConsumer(Processor processor) throws Exception {
-        return new DirectConsumer(this, processor);
+        Consumer answer = new DirectConsumer(this, processor);
+        configureConsumer(answer);
+        return answer;
     }
 
     public boolean isSingleton() {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmEndpoint.java?rev=1475742&r1=1475741&r2=1475742&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmEndpoint.java Thu Apr 25 13:03:35 2013
@@ -42,7 +42,9 @@ public class DirectVmEndpoint extends De
 
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
-        return new DirectVmConsumer(this, new DirectVmProcessor(processor, this));
+        Consumer answer = new DirectVmConsumer(this, new DirectVmProcessor(processor, this));
+        configureConsumer(answer);
+        return answer;
     }
 
     @Override

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java?rev=1475742&r1=1475741&r2=1475742&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java Thu Apr 25 13:03:35 2013
@@ -101,7 +101,9 @@ public class SedaEndpoint extends Defaul
     }
 
     public Consumer createConsumer(Processor processor) throws Exception {
-        return new SedaConsumer(this, processor);
+        Consumer answer = new SedaConsumer(this, processor);
+        configureConsumer(answer);
+        return answer;
     }
 
     public synchronized BlockingQueue<Exchange> getQueue() {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/timer/TimerEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/timer/TimerEndpoint.java?rev=1475742&r1=1475741&r2=1475742&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/timer/TimerEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/timer/TimerEndpoint.java Thu Apr 25 13:03:35 2013
@@ -69,7 +69,9 @@ public class TimerEndpoint extends Defau
     }
 
     public Consumer createConsumer(Processor processor) throws Exception {
-        return new TimerConsumer(this, processor);
+        Consumer answer = new TimerConsumer(this, processor);
+        configureConsumer(answer);
+        return answer;
     }
 
     @Override

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java?rev=1475742&r1=1475741&r2=1475742&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java Thu Apr 25 13:03:35 2013
@@ -206,6 +206,7 @@ public abstract class DefaultEndpoint ex
     }
 
     public PollingConsumer createPollingConsumer() throws Exception {
+        // should not configure consumer
         return new EventDrivenPollingConsumer(this);
     }
 
@@ -259,7 +260,7 @@ public abstract class DefaultEndpoint ex
 
     public void configureProperties(Map<String, Object> options) {
         Map<String, Object> consumerProperties = IntrospectionSupport.extractProperties(options, "consumer.");
-        if (consumerProperties != null) {
+        if (consumerProperties != null && !consumerProperties.isEmpty()) {
             setConsumerProperties(consumerProperties);
         }
     }
@@ -317,11 +318,22 @@ public abstract class DefaultEndpoint ex
     }
 
     public Map<String, Object> getConsumerProperties() {
+        if (consumerProperties == null) {
+            // must create empty if none exists
+            consumerProperties = new HashMap<String, Object>();
+        }
         return consumerProperties;
     }
 
     public void setConsumerProperties(Map<String, Object> consumerProperties) {
-        this.consumerProperties = consumerProperties;
+        // append consumer properties
+        if (consumerProperties != null && !consumerProperties.isEmpty()) {
+            if (this.consumerProperties == null) {
+                this.consumerProperties = new HashMap<String, Object>(consumerProperties);
+            } else {
+                this.consumerProperties.putAll(consumerProperties);
+            }
+        }
     }
 
     protected void configureConsumer(Consumer consumer) throws Exception {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingEndpoint.java?rev=1475742&r1=1475741&r2=1475742&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingEndpoint.java Thu Apr 25 13:03:35 2013
@@ -46,7 +46,7 @@ public abstract class DefaultPollingEndp
     }
 
     public Consumer createConsumer(Processor processor) throws Exception {
-        DefaultScheduledPollConsumer result = new DefaultScheduledPollConsumer(this, processor);
+        Consumer result = new DefaultScheduledPollConsumer(this, processor);
         configureConsumer(result);
         return result;
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProcessorEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProcessorEndpoint.java?rev=1475742&r1=1475741&r2=1475742&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProcessorEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProcessorEndpoint.java Thu Apr 25 13:03:35 2013
@@ -75,7 +75,9 @@ public class ProcessorEndpoint extends D
 
     @Override
     public PollingConsumer createPollingConsumer() throws Exception {
-        return new ProcessorPollingConsumer(this, getProcessor());
+        PollingConsumer answer = new ProcessorPollingConsumer(this, getProcessor());
+        configureConsumer(answer);
+        return answer;
     }
 
     public Processor getProcessor() throws Exception {

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/CustomConsumerExceptionHandlerTest.java (from r1475700, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelAlwaysHandledTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/CustomConsumerExceptionHandlerTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/CustomConsumerExceptionHandlerTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelAlwaysHandledTest.java&r1=1475700&r2=1475742&rev=1475742&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelAlwaysHandledTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/CustomConsumerExceptionHandlerTest.java Thu Apr 25 13:03:35 2013
@@ -16,33 +16,36 @@
  */
 package org.apache.camel.processor;
 
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.seda.SedaConsumer;
+import org.apache.camel.impl.JndiRegistry;
 import org.apache.camel.spi.ExceptionHandler;
 
-public class DeadLetterChannelAlwaysHandledTest extends ContextTestSupport {
+public class CustomConsumerExceptionHandlerTest extends ContextTestSupport {
 
-    private static final AtomicBoolean called = new AtomicBoolean();
+    private static final CountDownLatch LATCH = new CountDownLatch(1);
 
-    public void testDeadLetterChannelAlwaysHandled() throws Exception {
-        // need to set exception handler manually to work around an issue configuring from uri
-        SedaConsumer seda = (SedaConsumer) context.getRoute("foo").getConsumer();
-        seda.setExceptionHandler(new MyExceptionHandler());
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+        jndi.bind("myHandler", new MyExceptionHandler());
+        return jndi;
+    }
 
+    public void testDeadLetterChannelAlwaysHandled() throws Exception {
         getMockEndpoint("mock:foo").expectedMessageCount(1);
         getMockEndpoint("mock:bar").expectedMessageCount(1);
-        getMockEndpoint("mock:dead").expectedMessageCount(1);
         getMockEndpoint("mock:result").expectedMessageCount(0);
 
         template.sendBody("seda:foo", "Hello World");
 
         assertMockEndpointsSatisfied();
 
-        assertFalse("Should not have called", called.get());
+        assertTrue("Should have been called", LATCH.await(5, TimeUnit.SECONDS));
     }
 
     @Override
@@ -50,9 +53,7 @@ public class DeadLetterChannelAlwaysHand
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                errorHandler(deadLetterChannel("mock:dead"));
-
-                from("seda:foo?synchronous=true").routeId("foo")
+                from("seda:foo?synchronous=true&consumer.exceptionHandler=#myHandler").routeId("foo")
                     .to("mock:foo")
                     .to("direct:bar")
                     .to("mock:result");
@@ -69,17 +70,17 @@ public class DeadLetterChannelAlwaysHand
 
         @Override
         public void handleException(Throwable exception) {
-            called.set(true);
+            LATCH.countDown();
         }
 
         @Override
         public void handleException(String message, Throwable exception) {
-            called.set(true);
+            LATCH.countDown();
         }
 
         @Override
         public void handleException(String message, Exchange exchange, Throwable exception) {
-            called.set(true);
+            LATCH.countDown();
         }
     }
 }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelAlwaysHandledTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelAlwaysHandledTest.java?rev=1475742&r1=1475741&r2=1475742&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelAlwaysHandledTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelAlwaysHandledTest.java Thu Apr 25 13:03:35 2013
@@ -21,18 +21,21 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.seda.SedaConsumer;
+import org.apache.camel.impl.JndiRegistry;
 import org.apache.camel.spi.ExceptionHandler;
 
 public class DeadLetterChannelAlwaysHandledTest extends ContextTestSupport {
 
-    private static final AtomicBoolean called = new AtomicBoolean();
+    private static final AtomicBoolean CALLLED = new AtomicBoolean();
 
-    public void testDeadLetterChannelAlwaysHandled() throws Exception {
-        // need to set exception handler manually to work around an issue configuring from uri
-        SedaConsumer seda = (SedaConsumer) context.getRoute("foo").getConsumer();
-        seda.setExceptionHandler(new MyExceptionHandler());
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+        jndi.bind("myHandler", new MyExceptionHandler());
+        return jndi;
+    }
 
+    public void testDeadLetterChannelAlwaysHandled() throws Exception {
         getMockEndpoint("mock:foo").expectedMessageCount(1);
         getMockEndpoint("mock:bar").expectedMessageCount(1);
         getMockEndpoint("mock:dead").expectedMessageCount(1);
@@ -42,7 +45,7 @@ public class DeadLetterChannelAlwaysHand
 
         assertMockEndpointsSatisfied();
 
-        assertFalse("Should not have called", called.get());
+        assertFalse("Should not have called", CALLLED.get());
     }
 
     @Override
@@ -52,7 +55,7 @@ public class DeadLetterChannelAlwaysHand
             public void configure() throws Exception {
                 errorHandler(deadLetterChannel("mock:dead"));
 
-                from("seda:foo?synchronous=true").routeId("foo")
+                from("seda:foo?synchronous=true&consumer.exceptionHandler=#myHandler").routeId("foo")
                     .to("mock:foo")
                     .to("direct:bar")
                     .to("mock:result");
@@ -69,17 +72,17 @@ public class DeadLetterChannelAlwaysHand
 
         @Override
         public void handleException(Throwable exception) {
-            called.set(true);
+            CALLLED.set(true);
         }
 
         @Override
         public void handleException(String message, Throwable exception) {
-            called.set(true);
+            CALLLED.set(true);
         }
 
         @Override
         public void handleException(String message, Exchange exchange, Throwable exception) {
-            called.set(true);
+            CALLLED.set(true);
         }
     }
 }



Mime
View raw message