camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lburgazz...@apache.org
Subject [01/10] camel git commit: CAMEL-11588: SupervisingRouteController - Routes may be started in wrong order
Date Tue, 08 Aug 2017 11:30:35 GMT
Repository: camel
Updated Branches:
  refs/heads/master d901b59e8 -> 43ca13886


CAMEL-11588: SupervisingRouteController - Routes may be started in wrong order


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0be3d902
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0be3d902
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0be3d902

Branch: refs/heads/master
Commit: 0be3d9025dbbc3095e2553190e6740816bed1fdb
Parents: d901b59
Author: lburgazzoli <lburgazzoli@gmail.com>
Authored: Fri Aug 4 16:43:15 2017 +0200
Committer: lburgazzoli <lburgazzoli@gmail.com>
Committed: Tue Aug 8 13:29:24 2017 +0200

----------------------------------------------------------------------
 .../camel/impl/SupervisingRouteController.java  | 299 ++++++++++++-------
 1 file changed, 187 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/0be3d902/camel-core/src/main/java/org/apache/camel/impl/SupervisingRouteController.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/SupervisingRouteController.java b/camel-core/src/main/java/org/apache/camel/impl/SupervisingRouteController.java
index 1f92653..aaa6796 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/SupervisingRouteController.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/SupervisingRouteController.java
@@ -17,7 +17,6 @@
 package org.apache.camel.impl;
 
 import java.util.Collection;
-import java.util.Comparator;
 import java.util.EventObject;
 import java.util.HashMap;
 import java.util.List;
@@ -31,20 +30,25 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.Experimental;
 import org.apache.camel.Route;
+import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.ServiceStatus;
 import org.apache.camel.StartupListener;
 import org.apache.camel.management.event.CamelContextStartedEvent;
 import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.spi.HasId;
+import org.apache.camel.spi.RouteContext;
 import org.apache.camel.spi.RouteController;
 import org.apache.camel.spi.RoutePolicy;
 import org.apache.camel.spi.RoutePolicyFactory;
 import org.apache.camel.support.EventNotifierSupport;
+import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.backoff.BackOff;
 import org.apache.camel.util.backoff.BackOffContext;
 import org.apache.camel.util.backoff.BackOffTimer;
@@ -63,8 +67,8 @@ public class SupervisingRouteController extends DefaultRouteController {
     private static final Logger LOGGER = LoggerFactory.getLogger(SupervisingRouteController.class);
     private final Object lock;
     private final AtomicBoolean contextStarted;
-    private final Set<Route> startedRoutes;
-    private final Set<Route> stoppedRoutes;
+    private final AtomicInteger routeCount;
+    private final Set<RouteHolder> routes;
     private final CamelContextStartupListener listener;
     private final RouteManager routeManager;
     private BackOffTimer timer;
@@ -73,14 +77,10 @@ public class SupervisingRouteController extends DefaultRouteController {
     private Map<String, BackOff> backOffConfigurations;
 
     public SupervisingRouteController() {
-        final Comparator<Route> comparator = Comparator.comparing(
-            route -> Optional.ofNullable(route.getRouteContext().getRoute().getStartupOrder()).orElse(Integer.MIN_VALUE)
-        );
-
         this.lock = new Object();
         this.contextStarted = new AtomicBoolean(false);
-        this.stoppedRoutes = new TreeSet<>(comparator);
-        this.startedRoutes = new TreeSet<>(comparator.reversed());
+        this.routeCount = new AtomicInteger(0);
+        this.routes = new TreeSet<>();
         this.routeManager = new RouteManager();
         this.defaultBackOff = BackOff.builder().build();
         this.backOffConfigurations = new HashMap<>();
@@ -160,96 +160,103 @@ public class SupervisingRouteController extends DefaultRouteController {
 
     @Override
     public void startRoute(String routeId) throws Exception {
-        final CamelContext context = getCamelContext();
-        final Route route = context.getRoute(routeId);
+        final Optional<RouteHolder> route = routes.stream().filter(r -> r.getId().equals(routeId)).findFirst();
 
-        if (route == null) {
-            return;
+        if (!route.isPresent()) {
+            // This route is unknown to this controller, apply default behaviour
+            // from super class.
+            super.startRoute(routeId);
+        } else {
+            doStartRoute(route.get(), true, r -> super.startRoute(routeId));
         }
-
-        doStartRoute(context, route, true, r -> super.startRoute(routeId));
     }
 
     @Override
     public void stopRoute(String routeId) throws Exception {
-        final CamelContext context = getCamelContext();
-        final Route route = context.getRoute(routeId);
+        final Optional<RouteHolder> route = routes.stream().filter(r -> r.getId().equals(routeId)).findFirst();
 
-        if (route == null) {
-            return;
+        if (!route.isPresent()) {
+            // This route is unknown to this controller, apply default behaviour
+            // from super class.
+            super.stopRoute(routeId);
+        } else {
+            doStopRoute(route.get(), true, r -> super.stopRoute(routeId));
         }
-
-        doStopRoute(context, route, true, r -> super.stopRoute(routeId));
     }
 
     @Override
     public void stopRoute(String routeId, long timeout, TimeUnit timeUnit) throws Exception {
-        final CamelContext context = getCamelContext();
-        final Route route = context.getRoute(routeId);
+        final Optional<RouteHolder> route = routes.stream().filter(r -> r.getId().equals(routeId)).findFirst();
 
-        if (route == null) {
-            return;
+        if (!route.isPresent()) {
+            // This route is unknown to this controller, apply default behaviour
+            // from super class.
+            super.stopRoute(routeId, timeout, timeUnit);
+        } else {
+            doStopRoute(route.get(), true, r -> super.stopRoute(r.getId(), timeout, timeUnit));
         }
-
-        doStopRoute(context, route, true, r -> super.stopRoute(r.getId(), timeout, timeUnit));
     }
 
     @Override
     public boolean stopRoute(String routeId, long timeout, TimeUnit timeUnit, boolean abortAfterTimeout) throws Exception {
-        final CamelContext context = getCamelContext();
-        final Route route = context.getRoute(routeId);
-        final AtomicBoolean result = new AtomicBoolean(false);
-
-        if (route == null) {
-            return false;
-        }
+        final Optional<RouteHolder> route = routes.stream().filter(r -> r.getId().equals(routeId)).findFirst();
 
-        doStopRoute(context, route, true, r -> result.set(super.stopRoute(r.getId(), timeout, timeUnit, abortAfterTimeout)));
+        if (!route.isPresent()) {
+            // This route is unknown to this controller, apply default behaviour
+            // from super class.
+            return super.stopRoute(routeId, timeout, timeUnit, abortAfterTimeout);
+        } else {
+            final AtomicBoolean result = new AtomicBoolean(false);
 
-        return result.get();
+            doStopRoute(route.get(), true, r -> result.set(super.stopRoute(r.getId(), timeout, timeUnit, abortAfterTimeout)));
+            return result.get();
+        }
     }
 
     @Override
     public void suspendRoute(String routeId) throws Exception {
-        final CamelContext context = getCamelContext();
-        final Route route = context.getRoute(routeId);
+        final Optional<RouteHolder> route = routes.stream().filter(r -> r.getId().equals(routeId)).findFirst();
 
-        if (route == null) {
-            return;
+        if (!route.isPresent()) {
+            // This route is unknown to this controller, apply default behaviour
+            // from super class.
+            super.suspendRoute(routeId);
+        } else {
+            doStopRoute(route.get(), true, r -> super.suspendRoute(r.getId()));
         }
-
-        doStopRoute(context, route, true, r -> super.suspendRoute(r.getId()));
     }
 
     @Override
     public void suspendRoute(String routeId, long timeout, TimeUnit timeUnit) throws Exception {
-        final CamelContext context = getCamelContext();
-        final Route route = context.getRoute(routeId);
+        final Optional<RouteHolder> route = routes.stream().filter(r -> r.getId().equals(routeId)).findFirst();
 
-        if (route == null) {
-            return;
+        if (!route.isPresent()) {
+            // This route is unknown to this controller, apply default behaviour
+            // from super class.
+            super.suspendRoute(routeId, timeout, timeUnit);
+        } else {
+            doStopRoute(route.get(), true, r -> super.suspendRoute(r.getId(), timeout, timeUnit));
         }
-
-        doStopRoute(context, route, true, r -> super.suspendRoute(r.getId(), timeout, timeUnit));
     }
 
     @Override
     public void resumeRoute(String routeId) throws Exception {
-        final CamelContext context = getCamelContext();
-        final Route route = context.getRoute(routeId);
+        final Optional<RouteHolder> route = routes.stream().filter(r -> r.getId().equals(routeId)).findFirst();
 
-        if (route == null) {
-            return;
+        if (!route.isPresent()) {
+            // This route is unknown to this controller, apply default behaviour
+            // from super class.
+            super.resumeRoute(routeId);
+        } else {
+            doStartRoute(route.get(), true, r -> super.startRoute(routeId));
         }
-
-        doStartRoute(context, route, true, r -> super.startRoute(routeId));
     }
 
     // *********************************
     // Helpers
     // *********************************
 
-    private void doStopRoute(CamelContext context, Route route,  boolean checker, ThrowingConsumer<Route, Exception> consumer) throws Exception {
+    private void doStopRoute(RouteHolder route,  boolean checker, ThrowingConsumer<RouteHolder, Exception> consumer) throws Exception {
         synchronized (lock) {
             if (checker) {
                 // remove them from checked routes so they don't get started by the
@@ -258,7 +265,7 @@ public class SupervisingRouteController extends DefaultRouteController {
                 routeManager.release(route);
             }
 
-            ServiceStatus status = context.getRouteStatus(route.getId());
+            ServiceStatus status = route.getStatus();
             if (!status.isStoppable()) {
                 LOGGER.debug("Route {} status is {}, skipping", route.getId(), status);
                 return;
@@ -266,38 +273,28 @@ public class SupervisingRouteController extends DefaultRouteController {
 
             consumer.accept(route);
 
-            startedRoutes.remove(route);
-            stoppedRoutes.add(route);
-
             // Mark the route as un-managed
-            route.getRouteContext().setRouteController(null);
+            route.getContext().setRouteController(null);
         }
     }
 
-    private void doStartRoute(CamelContext context, Route route, boolean checker, ThrowingConsumer<Route, Exception> consumer) throws Exception {
+    private void doStartRoute(RouteHolder route, boolean checker, ThrowingConsumer<RouteHolder, Exception> consumer) throws Exception {
         synchronized (lock) {
-            ServiceStatus status = context.getRouteStatus(route.getId());
+            ServiceStatus status = route.getStatus();
             if (!status.isStartable()) {
                 LOGGER.debug("Route {} status is {}, skipping", route.getId(), status);
                 return;
             }
 
             try {
-                // remove the route from any queue
-                stoppedRoutes.remove(route);
-                startedRoutes.remove(route);
-
                 if (checker) {
                     routeManager.release(route);
                 }
 
                 // Mark the route as managed
-                route.getRouteContext().setRouteController(this);
+                route.getContext().setRouteController(this);
 
                 consumer.accept(route);
-
-                // route started successfully
-                startedRoutes.add(route);
             } catch (Exception e) {
 
                 if (checker) {
@@ -316,13 +313,16 @@ public class SupervisingRouteController extends DefaultRouteController {
             return;
         }
 
-        List<String> routes;
+        final List<String> routeList;
 
         synchronized (lock) {
-            routes = stoppedRoutes.stream().map(Route::getId).collect(Collectors.toList());
+            routeList = routes.stream()
+                .filter(r -> r.getStatus() == ServiceStatus.Stopped)
+                .map(RouteHolder::getId)
+                .collect(Collectors.toList());
         }
 
-        for (String route: routes) {
+        for (String route: routeList) {
             try {
                 startRoute(route);
             } catch (Exception e) {
@@ -332,13 +332,20 @@ public class SupervisingRouteController extends DefaultRouteController {
     }
 
     private synchronized void stopRoutes() {
-        List<String> routes;
+        if (!isRunAllowed()) {
+            return;
+        }
+
+        final List<String> routeList;
 
         synchronized (lock) {
-            routes = startedRoutes.stream().map(Route::getId).collect(Collectors.toList());
+            routeList = routes.stream()
+                .filter(r -> r.getStatus() == ServiceStatus.Started)
+                .map(RouteHolder::getId)
+                .collect(Collectors.toList());
         }
 
-        for (String route: routes) {
+        for (String route: routeList) {
             try {
                 stopRoute(route);
             } catch (Exception e) {
@@ -353,17 +360,15 @@ public class SupervisingRouteController extends DefaultRouteController {
 
     private class RouteManager {
         private final Logger logger;
-        private final ConcurrentMap<Route, CompletableFuture<BackOffContext>> routes;
+        private final ConcurrentMap<RouteHolder, CompletableFuture<BackOffContext>> routes;
 
         RouteManager() {
             this.logger = LoggerFactory.getLogger(RouteManager.class);
             this.routes = new ConcurrentHashMap<>();
         }
 
-        void start(Route route) {
-            route.getRouteContext().setRouteController(SupervisingRouteController.this);
-            
-            final CamelContext camelContext = getCamelContext();
+        void start(RouteHolder route) {
+            route.getContext().setRouteController(SupervisingRouteController.this);
 
             routes.computeIfAbsent(
                 route,
@@ -378,7 +383,7 @@ public class SupervisingRouteController extends DefaultRouteController {
                         try {
                             logger.info("Try to restart route: {}", r.getId());
 
-                            doStartRoute(camelContext, r, false, rx -> SupervisingRouteController.super.startRoute(rx.getId()));
+                            doStartRoute(r, false, rx -> SupervisingRouteController.super.startRoute(rx.getId()));
                             return false;
                         } catch (Exception e) {
                             return true;
@@ -396,17 +401,12 @@ public class SupervisingRouteController extends DefaultRouteController {
                             }
 
                             synchronized (lock) {
-                                ServiceStatus status = camelContext.getRouteStatus(route.getId());
+                                final ServiceStatus status = route.getStatus();
 
                                 if (status.isStopped() || status.isStopping()) {
                                     LOGGER.info("Route {} has status {}, stop supervising it", route.getId(), status);
 
-                                    r.getRouteContext().setRouteController(null);
-                                    stoppedRoutes.add(r);
-                                } else if (status.isStarted() || status.isStarting()) {
-                                    synchronized (lock) {
-                                        startedRoutes.add(r);
-                                    }
+                                    r.getContext().setRouteController(null);
                                 }
                             }
                         }
@@ -419,7 +419,7 @@ public class SupervisingRouteController extends DefaultRouteController {
             );
         }
 
-        boolean release(Route route) {
+        boolean release(RouteHolder route) {
             CompletableFuture<BackOffContext> future = routes.remove(route);
             if (future != null) {
                 future.cancel(true);
@@ -433,18 +433,87 @@ public class SupervisingRouteController extends DefaultRouteController {
             routes.clear();
         }
 
-        boolean isSupervising(Route route) {
+        boolean isSupervising(RouteHolder route) {
             return routes.containsKey(route);
         }
 
-        Collection<Route> routes() {
+        Collection<RouteHolder> routes() {
             return routes.keySet();
         }
     }
 
-    private boolean isSupervising(Route route) {
-        synchronized (lock) {
-            return stoppedRoutes.contains(route) || startedRoutes.contains(route) || routeManager.isSupervising(route);
+    // *********************************
+    //
+    // *********************************
+
+    private class RouteHolder implements HasId, Comparable<RouteHolder> {
+        private final int order;
+        private final Route route;
+
+        RouteHolder(Route route, int order) {
+            this.route = route;
+            this.order = order;
+        }
+
+        @Override
+        public String getId() {
+            return this.route.getId();
+        }
+
+        public Route get() {
+            return this.route;
+        }
+
+        public RouteContext getContext() {
+            return this.route.getRouteContext();
+        }
+
+        public RouteDefinition getDefinition() {
+            return this.route.getRouteContext().getRoute();
+        }
+
+        public ServiceStatus getStatus() {
+            return getContext().getCamelContext().getRouteStatus(getId());
+        }
+
+        public int getInitializationOrder() {
+            return order;
+        }
+
+        public int getStartupOrder() {
+            Integer order = getDefinition().getStartupOrder();
+            if (order == null) {
+                order = Integer.MAX_VALUE;
+            }
+
+            return order;
+        }
+
+        @Override
+        public int compareTo(RouteHolder o) {
+            int answer = Integer.compare(getStartupOrder(), o.getStartupOrder());
+            if (answer == 0) {
+                answer = Integer.compare(getInitializationOrder(), o.getInitializationOrder());
+            }
+
+            return answer;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            return this.route.equals(((RouteHolder)o).route);
+        }
+
+        @Override
+        public int hashCode() {
+            return route.hashCode();
         }
     }
 
@@ -464,29 +533,35 @@ public class SupervisingRouteController extends DefaultRouteController {
     private class ManagedRoutePolicy implements RoutePolicy {
         @Override
         public void onInit(Route route) {
-            route.getRouteContext().setRouteController(SupervisingRouteController.this);
-            route.getRouteContext().getRoute().setAutoStartup("false");
-
-            if (contextStarted.get()) {
-                LOGGER.debug("Context is started: add route {} to startable routes", route.getId());
-                try {
-                    SupervisingRouteController.this.doStartRoute(
-                        getCamelContext(),
-                        route,
-                        true,
-                        r -> SupervisingRouteController.super.startRoute(r.getId())
-                    );
-                } catch (Exception e) {
-                    e.printStackTrace();
+            RouteHolder holder = new RouteHolder(route, routeCount.incrementAndGet());
+            if (routes.add(holder)) {
+                holder.getContext().setRouteController(SupervisingRouteController.this);
+                holder.getDefinition().setAutoStartup("false");
+
+                if (contextStarted.get()) {
+                    LOGGER.debug("Context is started: attempt to start route {}", route.getId());
+                    try {
+                        SupervisingRouteController.this.doStartRoute(
+                            holder,
+                            true,
+                            r -> SupervisingRouteController.super.startRoute(r.getId())
+                        );
+                    } catch (Exception e) {
+                        throw new RuntimeCamelException(e);
+                    }
+                } else {
+                    LOGGER.debug("Context is not started: add route {} to stopped routes", holder.getId());
                 }
-            } else {
-                LOGGER.debug("Context is not started: add route {} to stopped routes", route.getId());
-                stoppedRoutes.add(route);
             }
         }
 
         @Override
         public void onRemove(Route route) {
+            synchronized (lock) {
+                routes.removeIf(
+                    r -> ObjectHelper.equal(r.get(), route) || ObjectHelper.equal(r.getId(), route.getId())
+                );
+            }
         }
 
         @Override


Mime
View raw message