camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r1232782 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/component/seda/SedaConsumer.java test/java/org/apache/camel/management/ManagedRouteAddFromRouteTest.java
Date Wed, 18 Jan 2012 08:18:08 GMT
Author: davsclaus
Date: Wed Jan 18 08:18:08 2012
New Revision: 1232782

URL: http://svn.apache.org/viewvc?rev=1232782&view=rev
Log:
CAMEL-4911: SedaConsumer should not poll if CamelContext is starting.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddFromRouteTest.java
      - copied, changed from r1232750, camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=1232782&r1=1232781&r2=1232782&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java Wed Jan 18 08:18:08 2012
@@ -138,6 +138,18 @@ public class SedaConsumer extends Servic
         BlockingQueue<Exchange> queue = endpoint.getQueue();
         // loop while we are allowed, or if we are stopping loop until the queue is empty
         while (queue != null && (isRunAllowed())) {
+
+            // do not poll during CamelContext is starting, as we should only poll when CamelContext is fully started
+            if (getEndpoint().getCamelContext().getStatus().isStarting()) {
+                LOG.trace("CamelContext is starting so skip polling");
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    LOG.debug("Sleep interrupted, are we stopping? {}", isStopping() || isStopped());
+                }
+                continue;
+            }
+
             // do not poll if we are suspended
             if (isSuspending() || isSuspended()) {
                 LOG.trace("Consumer is suspended so skip polling");

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddFromRouteTest.java (from r1232750, camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddFromRouteTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddFromRouteTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java&r1=1232750&r2=1232782&rev=1232782&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddFromRouteTest.java Wed Jan 18 08:18:08 2012
@@ -16,409 +16,69 @@
  */
 package org.apache.camel.management;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.ServiceStatus;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.seda.SedaEndpoint;
+import org.apache.camel.impl.DefaultExchange;
 
 /**
- * Tests mbeans is registered when adding a 2nd route after CamelContext has been started.
+ * Tests mbeans is registered when adding a 2nd route from within an existing route.
  *
- * @version 
+ * @version
  */
-public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
+public class ManagedRouteAddFromRouteTest extends ManagementTestSupport {
 
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("direct:start").routeId("foo").to("mock:result");
-            }
-        };
-    }
-
-    public void testRouteAddRemoteRouteWithTo() throws Exception {
-        MockEndpoint result = getMockEndpoint("mock:result");
-        result.expectedMessageCount(1);
-        template.sendBody("direct:start", "Hello World");
-        result.assertIsSatisfied();
-
-        MBeanServer mbeanServer = getMBeanServer();
-        ObjectName on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,name=ProducerCache*");
-
-        // number of producer caches
-        Set<ObjectName> names = mbeanServer.queryNames(on, null);
-        assertEquals(1, names.size());
-        
-        log.info("Adding 2nd route");
-
-        // add a 2nd route
-        context.addRoutes(new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                from("direct:bar").routeId("bar").to("mock:bar");
-            }
-        });
-
-        // and send a message to it
-        MockEndpoint bar = getMockEndpoint("mock:bar");
-        bar.expectedMessageCount(1);
-        template.sendBody("direct:bar", "Hello World");
-        bar.assertIsSatisfied();
-
-        // there should be one more producer cache
-        names = mbeanServer.queryNames(on, null);
-        assertEquals(2, names.size());
-
-        log.info("Removing 2nd route");
-
-        // now remove the 2nd route
-        context.stopRoute("bar");
-        boolean removed = context.removeRoute("bar");
-        assertTrue(removed);
-
-        // the producer cache should have been removed
-        on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,name=ProducerCache*");
-        names = mbeanServer.queryNames(on, null);
-        assertEquals(1, names.size());
-
-        log.info("Shutting down...");
-    }
-
-    public void testRouteAddRemoteRouteWithRecipientList() throws Exception {
-        MockEndpoint result = getMockEndpoint("mock:result");
-        result.expectedMessageCount(1);
-        template.sendBody("direct:start", "Hello World");
-        result.assertIsSatisfied();
-
-        MBeanServer mbeanServer = getMBeanServer();
-        ObjectName on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,name=ProducerCache*");
-
-        // number of producer caches
-        Set<ObjectName> names = mbeanServer.queryNames(on, null);
-        assertEquals(1, names.size());
-        
-        log.info("Adding 2nd route");
-
-        // add a 2nd route
-        context.addRoutes(new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                from("direct:bar").routeId("bar").recipientList(header("bar"));
-            }
-        });
-
-        // and send a message to it
-        MockEndpoint bar = getMockEndpoint("mock:bar");
-        bar.expectedMessageCount(1);
-        template.sendBodyAndHeader("direct:bar", "Hello World", "bar", "mock:bar");
-        bar.assertIsSatisfied();
-
-        // there should be one more producer cache
-        names = mbeanServer.queryNames(on, null);
-        assertEquals(2, names.size());
-
-        log.info("Removing 2nd route");
-
-        // now remove the 2nd route
-        context.stopRoute("bar");
-        boolean removed = context.removeRoute("bar");
-        assertTrue(removed);
-
-        // the producer cache should have been removed
-        on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,name=ProducerCache*");
-        names = mbeanServer.queryNames(on, null);
-        assertEquals(1, names.size());
-
-        log.info("Shutting down...");
-    }
-
-    public void testRouteAddRemoteRouteWithRoutingSlip() throws Exception {
-        MockEndpoint result = getMockEndpoint("mock:result");
-        result.expectedMessageCount(1);
-        template.sendBody("direct:start", "Hello World");
-        result.assertIsSatisfied();
-
-        MBeanServer mbeanServer = getMBeanServer();
-        ObjectName on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,name=ProducerCache*");
-
-        // number of producer caches
-        Set<ObjectName> names = mbeanServer.queryNames(on, null);
-        assertEquals(1, names.size());
-
-        log.info("Adding 2nd route");
-
-        // add a 2nd route
-        context.addRoutes(new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                from("direct:bar").routeId("bar").routingSlip(header("bar"));
-            }
-        });
-
-        // and send a message to it
-        MockEndpoint bar = getMockEndpoint("mock:bar");
-        bar.expectedMessageCount(1);
-        template.sendBodyAndHeader("direct:bar", "Hello World", "bar", "mock:bar");
-        bar.assertIsSatisfied();
-
-        // there should be one more producer cache
-        names = mbeanServer.queryNames(on, null);
-        assertEquals(2, names.size());
-
-        log.info("Removing 2nd route");
-
-        // now remove the 2nd route
-        context.stopRoute("bar");
-        boolean removed = context.removeRoute("bar");
-        assertTrue(removed);
-
-        // the producer cache should have been removed
-        on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,name=ProducerCache*");
-        names = mbeanServer.queryNames(on, null);
-        assertEquals(1, names.size());
-
-        log.info("Shutting down...");
-    }
-
-    public void testRouteAddRemoteRouteWithRecipientListAndRouteScopedOnException() throws Exception {
-        MockEndpoint result = getMockEndpoint("mock:result");
-        result.expectedMessageCount(1);
-        template.sendBody("direct:start", "Hello World");
-        result.assertIsSatisfied();
-
-        MBeanServer mbeanServer = getMBeanServer();
-        ObjectName on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,name=ProducerCache*");
-
-        // number of producer caches
-        Set<ObjectName> names = mbeanServer.queryNames(on, null);
-        assertEquals(1, names.size());
+                SedaEndpoint seda = context.getEndpoint("seda:start", SedaEndpoint.class);
+                seda.getQueue().put(new DefaultExchange(context));
 
-        log.info("Adding 2nd route");
-
-        // add a 2nd route
-        context.addRoutes(new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                from("direct:bar").routeId("bar")
-                    .onException(Exception.class)
-                        .handled(true)
-                        .recipientList(header("error"))
-                    .end().end()
-                    .recipientList(header("bar")).throwException(new IllegalArgumentException("Forced"));
+                from("seda:start").routeId("foo")
+                        .process(new Processor() {
+                            @Override
+                            public void process(Exchange exchange) throws Exception {
+                                RouteBuilder child = new RouteBuilder() {
+                                    @Override
+                                    public void configure() throws Exception {
+                                        from("seda:bar").routeId("bar").to("mock:bar");
+                                    }
+                                };
+                                context.addRoutes(child);
+                            }
+                        })
+                        .to("mock:result");
             }
-        });
-
-        // and send a message to it
-        getMockEndpoint("mock:bar").expectedMessageCount(1);
-        getMockEndpoint("mock:error").expectedMessageCount(1);
-
-        Map<String, Object> headers = new HashMap<String, Object>();
-        headers.put("error", "mock:error");
-        headers.put("bar", "mock:bar");
-        template.sendBodyAndHeaders("direct:bar", "Hello World", headers);
-
-        assertMockEndpointsSatisfied();
-
-        // there should be two more producer cache
-        names = mbeanServer.queryNames(on, null);
-        assertEquals(3, names.size());
-
-        // now stop and remove the 2nd route
-        log.info("Stopping 2nd route");
-        context.stopRoute("bar");
-
-        log.info("Removing 2nd route");
-        boolean removed = context.removeRoute("bar");
-        assertTrue(removed);
-
-        // the producer cache should have been removed
-        on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,name=ProducerCache*");
-        names = mbeanServer.queryNames(on, null);
-        assertEquals(1, names.size());
-
-        log.info("Shutting down...");
+        };
     }
 
-    public void testRouteAddRemoteRouteWithRecipientListAndContextScopedOnException() throws Exception {
-        MockEndpoint result = getMockEndpoint("mock:result");
-        result.expectedMessageCount(1);
-        template.sendBody("direct:start", "Hello World");
-        result.assertIsSatisfied();
-
+    public void testAddRouteFromRoute() throws Exception {
         MBeanServer mbeanServer = getMBeanServer();
-        ObjectName on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,name=ProducerCache*");
-
-        // number of producer caches
-        Set<ObjectName> names = mbeanServer.queryNames(on, null);
-        assertEquals(1, names.size());
-
-        log.info("Adding 2nd route");
-
-        // add a 2nd route
-        context.addRoutes(new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                onException(Exception.class)
-                    .handled(true)
-                    .recipientList(header("error"))
-                    .end();
-
-                from("direct:bar").routeId("bar")
-                    .recipientList(header("bar")).throwException(new IllegalArgumentException("Forced"));
-            }
-        });
-
-        // and send a message to it
-        getMockEndpoint("mock:bar").expectedMessageCount(1);
-        getMockEndpoint("mock:error").expectedMessageCount(1);
-
-        Map<String, Object> headers = new HashMap<String, Object>();
-        headers.put("error", "mock:error");
-        headers.put("bar", "mock:bar");
-        template.sendBodyAndHeaders("direct:bar", "Hello World", headers);
-
-        assertMockEndpointsSatisfied();
+        ObjectName route1 = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=routes,name=\"foo\"");
 
-        // there should be two more producer cache
-        names = mbeanServer.queryNames(on, null);
-        assertEquals(3, names.size());
+        // should be started
+        String state = (String) mbeanServer.getAttribute(route1, "State");
+        assertEquals("Should be started", ServiceStatus.Started.name(), state);
 
-        // now stop and remove the 2nd route
-        log.info("Stopping 2nd route");
-        context.stopRoute("bar");
-
-        log.info("Removing 2nd route");
-        boolean removed = context.removeRoute("bar");
-        assertTrue(removed);
-
-        // only the producer cache from the 2nd route should have been removed (the on exception becomes context scoped)
-        on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,name=ProducerCache*");
-        names = mbeanServer.queryNames(on, null);
-        assertEquals(2, names.size());
-
-        log.info("Shutting down...");
-    }
-
-    public void testRouteAddRemoteRouteWithRecipientListAndRouteScopedOnCompletion() throws Exception {
         MockEndpoint result = getMockEndpoint("mock:result");
         result.expectedMessageCount(1);
-        template.sendBody("direct:start", "Hello World");
+        template.sendBody("seda:start", "Hello World");
         result.assertIsSatisfied();
 
-        MBeanServer mbeanServer = getMBeanServer();
-        ObjectName on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,name=ProducerCache*");
-
-        // number of producer caches
-        Set<ObjectName> names = mbeanServer.queryNames(on, null);
-        assertEquals(1, names.size());
-
-        log.info("Adding 2nd route");
+        // find the 2nd route
+        ObjectName route2 = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=routes,name=\"bar\"");
 
-        // add a 2nd route
-        context.addRoutes(new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                from("direct:bar").routeId("bar")
-                    .onCompletion()
-                        .recipientList(header("done"))
-                    .end().end()
-                    .recipientList(header("bar"));
-            }
-        });
-
-        // and send a message to it
-        getMockEndpoint("mock:bar").expectedMessageCount(1);
-        getMockEndpoint("mock:done").expectedMessageCount(1);
-
-        Map<String, Object> headers = new HashMap<String, Object>();
-        headers.put("done", "mock:done");
-        headers.put("bar", "mock:bar");
-        template.sendBodyAndHeaders("direct:bar", "Hello World", headers);
-
-        assertMockEndpointsSatisfied();
-
-        // there should be two more producer cache
-        names = mbeanServer.queryNames(on, null);
-        assertEquals(3, names.size());
-
-        // now stop and remove the 2nd route
-        log.info("Stopping 2nd route");
-        context.stopRoute("bar");
-
-        log.info("Removing 2nd route");
-        boolean removed = context.removeRoute("bar");
-        assertTrue(removed);
-
-        // the producer cache should have been removed
-        on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,name=ProducerCache*");
-        names = mbeanServer.queryNames(on, null);
-        assertEquals(1, names.size());
-
-        log.info("Shutting down...");
+        // should be started
+        state = (String) mbeanServer.getAttribute(route2, "State");
+        assertEquals("Should be started", ServiceStatus.Started.name(), state);
     }
 
-    public void testRouteAddRemoteRouteWithRecipientListAndContextScopedOnCompletion() throws Exception {
-        MockEndpoint result = getMockEndpoint("mock:result");
-        result.expectedMessageCount(1);
-        template.sendBody("direct:start", "Hello World");
-        result.assertIsSatisfied();
-
-        MBeanServer mbeanServer = getMBeanServer();
-        ObjectName on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,name=ProducerCache*");
-
-        // number of producer caches
-        Set<ObjectName> names = mbeanServer.queryNames(on, null);
-        assertEquals(1, names.size());
-
-        log.info("Adding 2nd route");
-
-        // add a 2nd route
-        context.addRoutes(new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                onCompletion()
-                    .recipientList(header("done"))
-                    .end();
-
-                from("direct:bar").routeId("bar")
-                    .recipientList(header("bar"));
-            }
-        });
-
-        // and send a message to it
-        getMockEndpoint("mock:bar").expectedMessageCount(1);
-        getMockEndpoint("mock:done").expectedMessageCount(1);
-
-        Map<String, Object> headers = new HashMap<String, Object>();
-        headers.put("done", "mock:done");
-        headers.put("bar", "mock:bar");
-        template.sendBodyAndHeaders("direct:bar", "Hello World", headers);
-
-        assertMockEndpointsSatisfied();
-
-        // there should be two more producer cache
-        names = mbeanServer.queryNames(on, null);
-        assertEquals(3, names.size());
-
-        // now stop and remove the 2nd route
-        log.info("Stopping 2nd route");
-        context.stopRoute("bar");
-
-        log.info("Removing 2nd route");
-        boolean removed = context.removeRoute("bar");
-        assertTrue(removed);
-
-        // only the producer cache from the 2nd route should have been removed (the on completion is context scoped)
-        on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=services,name=ProducerCache*");
-        names = mbeanServer.queryNames(on, null);
-        assertEquals(2, names.size());
-
-        log.info("Shutting down...");
-    }
 }



Mime
View raw message