camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r831818 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/impl/ main/java/org/apache/camel/management/mbean/ main/java/org/apache/camel/util/ test/java/org/apache/camel/management/
Date Mon, 02 Nov 2009 08:35:11 GMT
Author: davsclaus
Date: Mon Nov  2 08:35:11 2009
New Revision: 831818

URL: http://svn.apache.org/viewvc?rev=831818&view=rev
Log:
CAMEL-1048: Made SuspendableServie JMX managementable.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedSuspendedServiceTest.java
  (with props)
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RoutePolicySupport.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedService.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RoutePolicySupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RoutePolicySupport.java?rev=831818&r1=831817&r2=831818&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RoutePolicySupport.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/RoutePolicySupport.java Mon
Nov  2 08:35:11 2009
@@ -19,7 +19,6 @@
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Route;
-import org.apache.camel.SuspendableService;
 import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.spi.RoutePolicy;
 import org.apache.camel.util.ServiceHelper;
@@ -43,67 +42,19 @@
     }
 
     protected boolean startConsumer(Consumer consumer) throws Exception {
-        if (consumer instanceof SuspendableService) {
-            SuspendableService ss = (SuspendableService) consumer;
-            if (ss.isSuspended()) {
-                if (log.isDebugEnabled()) {
-                    log.debug("Resuming consumer " + consumer);
-                }
-                ss.resume();
-                return true;
-            } else {
-                return false;
-            }
-        } else if (consumer instanceof ServiceSupport) {
-            ServiceSupport ss = (ServiceSupport) consumer;
-            if (ss.getStatus().isStartable()) {
-                if (log.isDebugEnabled()) {
-                    log.debug("Stopping consumer " + consumer);
-                }
-                consumer.start();
-                return true;
-            } else {
-                return false;
-            }
-        } else {
-            if (log.isDebugEnabled()) {
-                log.debug("Stopping consumer " + consumer);
-            }
-            ServiceHelper.startService(consumer);
-            return true;
+        boolean resumed = ServiceHelper.resumeService(consumer);
+        if (resumed && log.isDebugEnabled()) {
+            log.debug("Resuming consumer " + consumer);
         }
+        return resumed;
     }
 
     protected boolean stopConsumer(Consumer consumer) throws Exception {
-        if (consumer instanceof SuspendableService) {
-            SuspendableService ss = (SuspendableService) consumer;
-            if (!ss.isSuspended()) {
-                ss.suspend();
-                if (log.isDebugEnabled()) {
-                    log.debug("Suspending consumer " + consumer);
-                }
-                return true;
-            } else {
-                return false;
-            }
-        } else if (consumer instanceof ServiceSupport) {
-            ServiceSupport ss = (ServiceSupport) consumer;
-            if (ss.getStatus().isStoppable()) {
-                if (log.isDebugEnabled()) {
-                    log.debug("Stopping consumer " + consumer);
-                }
-                consumer.stop();
-                return true;
-            } else {
-                return false;
-            }
-        } else {
-            if (log.isDebugEnabled()) {
-                log.debug("Stopping consumer " + consumer);
-            }
-            ServiceHelper.stopService(consumer);
-            return true;
+        boolean suspended = ServiceHelper.suspendService(consumer);
+        if (suspended && log.isDebugEnabled()) {
+            log.debug("Suspended consumer " + consumer);
         }
+        return suspended;
     }
 
     /**

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java?rev=831818&r1=831817&r2=831818&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java Mon Nov
 2 08:35:11 2009
@@ -32,6 +32,9 @@
  * @version $Revision$
  */
 public abstract class ServiceSupport implements Service {
+
+    // TODO: refactor and move me to org.apache.camel.util package
+
     private final AtomicBoolean started = new AtomicBoolean(false);
     private final AtomicBoolean starting = new AtomicBoolean(false);
     private final AtomicBoolean stopping = new AtomicBoolean(false);

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedService.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedService.java?rev=831818&r1=831817&r2=831818&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedService.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedService.java
Mon Nov  2 08:35:11 2009
@@ -20,6 +20,7 @@
 import org.apache.camel.Route;
 import org.apache.camel.Service;
 import org.apache.camel.ServiceStatus;
+import org.apache.camel.SuspendableService;
 import org.apache.camel.impl.ServiceSupport;
 import org.apache.camel.spi.ManagementStrategy;
 import org.springframework.jmx.export.annotation.ManagedAttribute;
@@ -96,4 +97,40 @@
     public void stop() throws Exception {
         service.stop();
     }
+
+    @ManagedAttribute(description = "Whether this service supports suspension")
+    public boolean isSupportSuspension() {
+        return service instanceof SuspendableService;
+    }
+
+    @ManagedAttribute(description = "Whether this service is suspended")
+    public boolean isSuspended() {
+        if (service instanceof SuspendableService) {
+            SuspendableService ss = (SuspendableService) service;
+            return ss.isSuspended();
+        } else {
+            return false;
+        }
+    }
+
+    @ManagedOperation(description = "Suspend Service")
+    public void suspend() throws Exception {
+        if (service instanceof SuspendableService) {
+            SuspendableService ss = (SuspendableService) service;
+            ss.suspend();
+        } else {
+            throw new UnsupportedOperationException("suspend() is not a supported operation");
+        }
+    }
+
+    @ManagedOperation(description = "Resume Service")
+    public void resume() throws Exception {
+        if (service instanceof SuspendableService) {
+            SuspendableService ss = (SuspendableService) service;
+            ss.resume();
+        } else {
+            throw new UnsupportedOperationException("resume() is not a supported operation");
+        }
+    }
+
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java?rev=831818&r1=831817&r2=831818&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java Mon Nov
 2 08:35:11 2009
@@ -21,6 +21,8 @@
 import java.util.List;
 
 import org.apache.camel.Service;
+import org.apache.camel.SuspendableService;
+import org.apache.camel.impl.ServiceSupport;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -85,6 +87,9 @@
     public static void stopService(Object value) throws Exception {
         if (value instanceof Service) {
             Service service = (Service)value;
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Stopping service " + value);
+            }
             service.stop();
         } else if (value instanceof Collection) {
             stopServices((Collection)value);
@@ -116,4 +121,89 @@
             throw firstException;
         }
     }
+
+    /**
+     * Resumes the given service.
+     * <p/>
+     * If the service is a {@link org.apache.camel.SuspendableService} then the <tt>resume</tt>
+     * operation is <b>only</b> invoked if the service is suspended.
+     * <p/>
+     * If the service is a {@link org.apache.camel.impl.ServiceSupport} then the <tt>start</tt>
+     * operation is <b>only</b> invoked if the service is startable.
+     * <p/>
+     * Otherwise the service is started.
+     *
+     * @param service the service
+     * @return <tt>true</tt> if either <tt>resume</tt> or <tt>start</tt>
was invoked,
+     * <tt>false</tt> if the service is already in the desired state.
+     * @throws Exception is thrown if error occurred
+     */
+    public static boolean resumeService(Service service) throws Exception {
+        if (service instanceof SuspendableService) {
+            SuspendableService ss = (SuspendableService) service;
+            if (ss.isSuspended()) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Resuming service " + service);
+                }
+                ss.resume();
+                return true;
+            } else {
+                return false;
+            }
+        } else if (service instanceof ServiceSupport) {
+            ServiceSupport ss = (ServiceSupport) service;
+            if (ss.getStatus().isStartable()) {
+                startService(service);
+                return true;
+            } else {
+                return false;
+            }
+        } else {
+            startService(service);
+            return true;
+        }
+    }
+
+    /**
+     * Suspends the given service.
+     * <p/>
+     * If the service is a {@link org.apache.camel.SuspendableService} then the <tt>suspend</tt>
+     * operation is <b>only</b> invoked if the service is <b>not</b>
suspended.
+     * <p/>
+     * If the service is a {@link org.apache.camel.impl.ServiceSupport} then the <tt>stop</tt>
+     * operation is <b>only</b> invoked if the service is stoptable.
+     * <p/>
+     * Otherwise the service is stopped.
+     *
+     * @param service the service
+     * @return <tt>true</tt> if either <tt>suspend</tt> or <tt>stop</tt>
was invoked,
+     * <tt>false</tt> if the service is already in the desired state.
+     * @throws Exception is thrown if error occurred
+     */
+    public static boolean suspendService(Service service) throws Exception {
+        if (service instanceof SuspendableService) {
+            SuspendableService ss = (SuspendableService) service;
+            if (!ss.isSuspended()) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Suspending service " + service);
+                }
+                ss.suspend();
+                return true;
+            } else {
+                return false;
+            }
+        } else if (service instanceof ServiceSupport) {
+            ServiceSupport ss = (ServiceSupport) service;
+            if (ss.getStatus().isStoppable()) {
+                stopServices(service);
+                return true;
+            } else {
+                return false;
+            }
+        } else {
+            stopService(service);
+            return true;
+        }
+    }
+
 }

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedSuspendedServiceTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedSuspendedServiceTest.java?rev=831818&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedSuspendedServiceTest.java
(added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedSuspendedServiceTest.java
Mon Nov  2 08:35:11 2009
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management;
+
+import java.io.File;
+import java.util.Set;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Route;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.RoutePolicySupport;
+
+/**
+ * @version $Revision$
+ */
+public class ManagedSuspendedServiceTest extends ContextTestSupport {
+
+    public void testConsumeSuspendAndResumeFile() throws Exception {
+        deleteDirectory("target/suspended");
+
+        MBeanServer mbeanServer = context.getManagementStrategy().getManagementAgent().getMBeanServer();
+
+        Set<ObjectName> set = mbeanServer.queryNames(new ObjectName("*:type=consumers,*"),
null);
+        assertEquals(1, set.size());
+
+        ObjectName on = set.iterator().next();
+
+        boolean registered = mbeanServer.isRegistered(on);
+        assertEquals("Should be registered", true, registered);
+        Boolean ss = (Boolean) mbeanServer.getAttribute(on, "SupportSuspension");
+        assertEquals(true, ss.booleanValue());
+        Boolean suspended = (Boolean) mbeanServer.getAttribute(on, "Suspended");
+        assertEquals(false, suspended.booleanValue());
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Bye World");
+
+        template.sendBodyAndHeader("file://target/suspended", "Bye World", Exchange.FILE_NAME,
"bye.txt");
+        template.sendBodyAndHeader("file://target/suspended", "Hello World", Exchange.FILE_NAME,
"hello.txt");
+
+        assertMockEndpointsSatisfied();
+
+        Thread.sleep(1000);
+
+        // now its suspended by the policy
+        suspended = (Boolean) mbeanServer.getAttribute(on, "Suspended");
+        assertEquals(true, suspended.booleanValue());
+
+        // the route is suspended by the policy so we should only receive one
+        File file = new File("target/suspended/hello.txt").getAbsoluteFile();
+        assertEquals("The file should exists", true, file.exists());
+
+        // reset mock
+        mock.reset();
+        mock.expectedBodiesReceived("Hello World");
+
+        // now resume it
+        mbeanServer.invoke(on, "resume", null, null);
+
+        assertMockEndpointsSatisfied();
+
+        suspended = (Boolean) mbeanServer.getAttribute(on, "Suspended");
+        assertEquals(false, suspended.booleanValue());
+
+        Thread.sleep(500);
+
+        // and the file is now moved
+        assertEquals("The file should not exists", false, file.exists());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                MyPolicy myPolicy = new MyPolicy();
+
+                from("file://target/suspended?maxMessagesPerPoll=1&sortBy=file:name")
+                    .routePolicy(myPolicy).id("myRoute")
+                    .to("mock:result");
+            }
+        };
+    }
+
+    private class MyPolicy extends RoutePolicySupport {
+
+        public void onExchangeDone(Route route, Exchange exchange) {
+            try {
+                super.stopConsumer(route.getConsumer());
+            } catch (Exception e) {
+                handleException(e);
+            }
+        }
+
+    }
+
+}

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedSuspendedServiceTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedSuspendedServiceTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date



Mime
View raw message