camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lburgazz...@apache.org
Subject [04/13] camel git commit: CAMEL-11362: create a LeaderElectionservice
Date Fri, 16 Jun 2017 15:39:05 GMT
CAMEL-11362: create a LeaderElectionservice


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b6f1bdd8
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b6f1bdd8
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b6f1bdd8

Branch: refs/heads/master
Commit: b6f1bdd8807efaa3e085d410affd4497e24aeab3
Parents: 9ddff22
Author: lburgazzoli <lburgazzoli@gmail.com>
Authored: Wed May 31 18:12:47 2017 +0200
Committer: lburgazzoli <lburgazzoli@gmail.com>
Committed: Fri Jun 16 17:37:53 2017 +0200

----------------------------------------------------------------------
 .../camel/ha/AbstractCamelClusterView.java      | 121 ---------
 .../java/org/apache/camel/ha/CamelCluster.java  |   4 +-
 .../apache/camel/ha/CamelClusterFactory.java    |  27 ++
 .../org/apache/camel/ha/CamelClusterHelper.java |  28 ++
 .../org/apache/camel/ha/CamelClusterView.java   |   5 +-
 .../org/apache/camel/ha/LeaderRoutePolicy.java  | 169 ------------
 .../camel/impl/ha/AbstractCamelCluster.java     | 115 +++++++++
 .../camel/impl/ha/AbstractCamelClusterView.java | 137 ++++++++++
 .../camel/impl/ha/ClusteredRoutePolicy.java     | 256 +++++++++++++++++++
 .../atomix/ha/AtomixClusterMember.java          |  45 ----
 10 files changed, 570 insertions(+), 337 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/b6f1bdd8/camel-core/src/main/java/org/apache/camel/ha/AbstractCamelClusterView.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/ha/AbstractCamelClusterView.java b/camel-core/src/main/java/org/apache/camel/ha/AbstractCamelClusterView.java
deleted file mode 100644
index 64cee58..0000000
--- a/camel-core/src/main/java/org/apache/camel/ha/AbstractCamelClusterView.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.ha;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.locks.StampedLock;
-import java.util.function.BiConsumer;
-import java.util.function.Predicate;
-
-public abstract class AbstractCamelClusterView implements CamelClusterView {
-    private final CamelCluster cluster;
-    private final String namespace;
-    private final List<FilteringConsumer> consumers;
-    private final StampedLock lock;
-
-    protected AbstractCamelClusterView(CamelCluster cluster, String namespace) {
-        this.cluster = cluster;
-        this.namespace = namespace;
-        this.consumers = new ArrayList<>();
-        this.lock = new StampedLock();
-    }
-
-    @Override
-    public CamelCluster getCluster() {
-        return  this.cluster;
-    }
-
-    @Override
-    public String getNamespace() {
-        return this.namespace;
-    }
-
-    @Override
-    public void addEventListener(BiConsumer<Event, Object> consumer) {
-        long stamp = lock.writeLock();
-
-        try {
-            consumers.add(new FilteringConsumer(e -> true, consumer));
-        } finally {
-            lock.unlockWrite(stamp);
-        }
-    }
-
-    @Override
-    public void addEventListener(Predicate<Event> predicate, BiConsumer<Event, Object>
consumer) {
-        long stamp = lock.writeLock();
-
-        try {
-            this.consumers.add(new FilteringConsumer(predicate, consumer));
-        } finally {
-            lock.unlockWrite(stamp);
-        }
-    }
-
-    @Override
-    public void removeEventListener(BiConsumer<Event, Object> consumer) {
-        long stamp = lock.writeLock();
-
-        try {
-            consumers.removeIf(c -> c.getConsumer().equals(consumer));
-        } finally {
-            lock.unlockWrite(stamp);
-        }
-    }
-
-    // **************************************
-    // Events
-    // **************************************
-
-    protected void fireEvent(CamelClusterView.Event event, Object payload) {
-        long stamp = lock.readLock();
-
-        try {
-            for (int i = 0; i < consumers.size(); i++) {
-                consumers.get(0).accept(event, payload);
-            }
-        } finally {
-            lock.unlockRead(stamp);
-        }
-    }
-
-    // **************************************
-    // Helpers
-    // **************************************
-
-    private final class FilteringConsumer implements BiConsumer<Event, Object> {
-        private final Predicate<Event> predicate;
-        private final BiConsumer<Event, Object> consumer;
-
-        FilteringConsumer(Predicate<Event> predicate,  BiConsumer<Event, Object>
consumer) {
-            this.predicate = predicate;
-            this.consumer = consumer;
-        }
-
-        @Override
-        public void accept(CamelClusterView.Event event, Object payload) {
-            if (predicate.test(event)) {
-                consumer.accept(event, payload);
-            }
-        }
-
-        public BiConsumer<Event, Object> getConsumer() {
-            return this.consumer;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/b6f1bdd8/camel-core/src/main/java/org/apache/camel/ha/CamelCluster.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/ha/CamelCluster.java b/camel-core/src/main/java/org/apache/camel/ha/CamelCluster.java
index 5da1e8c..d6c151c 100644
--- a/camel-core/src/main/java/org/apache/camel/ha/CamelCluster.java
+++ b/camel-core/src/main/java/org/apache/camel/ha/CamelCluster.java
@@ -16,9 +16,11 @@
  */
 package org.apache.camel.ha;
 
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Service;
 import org.apache.camel.spi.HasId;
 
-public interface CamelCluster extends HasId {
+public interface CamelCluster extends Service, CamelContextAware, HasId {
     /**
      * Creates a view of the cluster bound to a namespace.
      *

http://git-wip-us.apache.org/repos/asf/camel/blob/b6f1bdd8/camel-core/src/main/java/org/apache/camel/ha/CamelClusterFactory.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/ha/CamelClusterFactory.java b/camel-core/src/main/java/org/apache/camel/ha/CamelClusterFactory.java
new file mode 100644
index 0000000..be332b4
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/ha/CamelClusterFactory.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.ha;
+
+import org.apache.camel.CamelContext;
+
+@FunctionalInterface
+public interface CamelClusterFactory {
+    /**
+     * Creates an instance of a cluster.
+     */
+    CamelCluster newInstance(CamelContext camelContext) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/b6f1bdd8/camel-core/src/main/java/org/apache/camel/ha/CamelClusterHelper.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/ha/CamelClusterHelper.java b/camel-core/src/main/java/org/apache/camel/ha/CamelClusterHelper.java
new file mode 100644
index 0000000..c6f91ed
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/ha/CamelClusterHelper.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.ha;
+
+import java.util.function.Predicate;
+
+public final class CamelClusterHelper {
+    private CamelClusterHelper() {
+    }
+
+    public static Predicate<CamelClusterView.Event> leadershipEventFilter() {
+        return e -> e == CamelClusterView.Event.LEADERSHIP_CHANGED;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/b6f1bdd8/camel-core/src/main/java/org/apache/camel/ha/CamelClusterView.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/ha/CamelClusterView.java b/camel-core/src/main/java/org/apache/camel/ha/CamelClusterView.java
index 7af5a21..bda7f8b 100644
--- a/camel-core/src/main/java/org/apache/camel/ha/CamelClusterView.java
+++ b/camel-core/src/main/java/org/apache/camel/ha/CamelClusterView.java
@@ -20,10 +20,13 @@ import java.util.List;
 import java.util.function.BiConsumer;
 import java.util.function.Predicate;
 
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Service;
+
 /**
  * Represents the View of the cluster at some given period of time.
  */
-public interface CamelClusterView {
+public interface CamelClusterView extends Service, CamelContextAware {
 
     enum Event {
         KEEP_ALIVE,

http://git-wip-us.apache.org/repos/asf/camel/blob/b6f1bdd8/camel-core/src/main/java/org/apache/camel/ha/LeaderRoutePolicy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/ha/LeaderRoutePolicy.java b/camel-core/src/main/java/org/apache/camel/ha/LeaderRoutePolicy.java
deleted file mode 100644
index 40ae1e8..0000000
--- a/camel-core/src/main/java/org/apache/camel/ha/LeaderRoutePolicy.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.ha;
-
-import java.util.HashSet;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.BiConsumer;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.CamelContextAware;
-import org.apache.camel.Route;
-import org.apache.camel.api.management.ManagedAttribute;
-import org.apache.camel.api.management.ManagedResource;
-import org.apache.camel.support.RoutePolicySupport;
-import org.apache.camel.util.ObjectHelper;
-import org.apache.camel.util.ReferenceCount;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@ManagedResource(description = "Route policy using ...")
-public class LeaderRoutePolicy extends RoutePolicySupport implements CamelContextAware {
-    private static final Logger LOGGER = LoggerFactory.getLogger(LeaderRoutePolicy.class);
-
-    private final AtomicBoolean leader;
-    private final Set<Route> startedRoutes;
-    private final Set<Route> stoppedRoutes;
-    private final ReferenceCount refCount;
-    private final CamelClusterView clusterView;
-    private final CamelClusterMember clusterMember;
-    private final BiConsumer<CamelClusterView.Event, Object> clusterEventConsumer;
-    private CamelContext camelContext;
-
-    public LeaderRoutePolicy(CamelClusterView clusterView, CamelClusterMember clusterMember)
{
-        this.clusterMember = clusterMember;
-        this.clusterView = clusterView;
-        this.clusterEventConsumer = this::onClusterEvent;
-        this.stoppedRoutes = new HashSet<>();
-        this.startedRoutes = new HashSet<>();
-        this.leader = new AtomicBoolean(false);
-
-        this.refCount = ReferenceCount.on(
-            () -> clusterView.addEventListener(clusterEventConsumer),
-            () -> clusterView.removeEventListener(clusterEventConsumer)
-        );
-    }
-
-    @Override
-    public CamelContext getCamelContext() {
-        return camelContext;
-    }
-
-    @Override
-    public void setCamelContext(CamelContext camelContext) {
-        this.camelContext = camelContext;
-    }
-
-    @Override
-    public synchronized void onInit(Route route) {
-        super.onInit(route);
-
-        LOGGER.info("Route managed by {}. Setting route {} AutoStartup flag to false.", getClass(),
route.getId());
-        route.getRouteContext().getRoute().setAutoStartup("false");
-
-        stoppedRoutes.add(route);
-
-        this.refCount.retain();
-
-        startManagedRoutes();
-    }
-
-    @Override
-    public synchronized void doShutdown() {
-        this.refCount.release();
-    }
-
-    // ****************************************************
-    // Management
-    // ****************************************************
-
-    @ManagedAttribute(description = "Is this route the master or a slave")
-    public boolean isLeader() {
-        return leader.get();
-    }
-
-    // ****************************************************
-    // Route managements
-    // ****************************************************
-
-    private void onClusterEvent(CamelClusterView.Event event, Object payload) {
-        if (event == CamelClusterView.Event.KEEP_ALIVE) {
-            LOGGER.debug("Got KEEP_ALIVE from cluster '{}' with payload '{}'", clusterView.getCluster().getId(),
Objects.toString(payload));
-        }
-        if (event == CamelClusterView.Event.LEADERSHIP_CHANGED) {
-            boolean isLeader = ObjectHelper.equal(clusterMember.getId(), clusterView.getMaster().getId());
-
-            if (isLeader && leader.compareAndSet(false, isLeader)) {
-                LOGGER.info("Leadership taken");
-                startManagedRoutes();
-            } else if (!isLeader && leader.getAndSet(isLeader)) {
-                LOGGER.info("Leadership lost");
-                stopManagedRoutes();
-            }
-        }
-    }
-
-    private synchronized void startManagedRoutes() {
-        if (isLeader()) {
-            doStartManagedRoutes();
-        } else {
-            // If the leadership has been lost in the meanwhile, stop any
-            // eventually started route
-            doStopManagedRoutes();
-        }
-    }
-
-    private synchronized void doStartManagedRoutes() {
-        try {
-            for (Route route : stoppedRoutes) {
-                LOGGER.debug("Starting route {}", route.getId());
-                startRoute(route);
-                startedRoutes.add(route);
-            }
-
-            stoppedRoutes.removeAll(startedRoutes);
-        } catch (Exception e) {
-            handleException(e);
-        }
-    }
-
-    private synchronized void stopManagedRoutes() {
-        if (isLeader()) {
-            // If became a leader in the meanwhile, start any eventually stopped
-            // route
-            doStartManagedRoutes();
-        } else {
-            doStopManagedRoutes();
-        }
-    }
-
-    private synchronized void doStopManagedRoutes() {
-        try {
-            for (Route route : startedRoutes) {
-                LOGGER.debug("Stopping route {}", route.getId());
-                stopRoute(route);
-                stoppedRoutes.add(route);
-            }
-
-            startedRoutes.removeAll(stoppedRoutes);
-        } catch (Exception e) {
-            handleException(e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/b6f1bdd8/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelCluster.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelCluster.java b/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelCluster.java
new file mode 100644
index 0000000..9c4691e
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelCluster.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.ha;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.locks.StampedLock;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ha.CamelCluster;
+import org.apache.camel.ha.CamelClusterView;
+import org.apache.camel.support.ServiceSupport;
+
+public abstract class AbstractCamelCluster<T extends CamelClusterView> extends ServiceSupport
implements CamelCluster {
+    private final String id;
+    private final Map<String, T> views;
+    private final StampedLock lock;
+    private CamelContext camelContext;
+
+    protected AbstractCamelCluster(String id) {
+        this(id, null);
+    }
+
+    protected AbstractCamelCluster(String id, CamelContext camelContext) {
+        this.id = id;
+        this.camelContext = camelContext;
+        this.views = new HashMap<>();
+        this.lock = new StampedLock();
+    }
+
+    @Override
+    public String getId() {
+        return id;
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        long stamp = lock.readLock();
+
+        try {
+            for (T view : views.values()) {
+                view.start();
+            }
+        } finally {
+            lock.unlockRead(stamp);
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        long stamp = lock.readLock();
+
+        try {
+            for (T view : views.values()) {
+                view.stop();
+            }
+        } finally {
+            lock.unlockRead(stamp);
+        }
+    }
+
+    @Override
+    public CamelClusterView createView(String namespace) throws Exception {
+        long stamp = lock.writeLock();
+
+        try {
+            T view = views.get(namespace);
+
+            if (view == null) {
+                view = doCreateView(namespace);
+                view.setCamelContext(this.camelContext);
+
+                views.put(namespace, view);
+
+                if (AbstractCamelCluster.this.isRunAllowed()) {
+                    view.start();
+                }
+            }
+
+            return view;
+        } finally {
+            lock.unlockWrite(stamp);
+        }
+    }
+
+    // **********************************
+    // Implementation
+    // **********************************
+
+    protected abstract T doCreateView(String namespace) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/b6f1bdd8/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelClusterView.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelClusterView.java
b/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelClusterView.java
new file mode 100644
index 0000000..1149a7f
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelClusterView.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.ha;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.locks.StampedLock;
+import java.util.function.BiConsumer;
+import java.util.function.Predicate;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ha.CamelCluster;
+import org.apache.camel.ha.CamelClusterView;
+import org.apache.camel.support.ServiceSupport;
+
+public abstract class AbstractCamelClusterView extends ServiceSupport implements CamelClusterView
{
+    private final CamelCluster cluster;
+    private final String namespace;
+    private final List<FilteringConsumer> consumers;
+    private final StampedLock lock;
+    private CamelContext camelContext;
+
+    protected AbstractCamelClusterView(CamelCluster cluster, String namespace) {
+        this.cluster = cluster;
+        this.namespace = namespace;
+        this.consumers = new ArrayList<>();
+        this.lock = new StampedLock();
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    @Override
+    public CamelContext getCamelContext() {
+        return null;
+    }
+
+    @Override
+    public CamelCluster getCluster() {
+        return  this.cluster;
+    }
+
+    @Override
+    public String getNamespace() {
+        return this.namespace;
+    }
+
+    @Override
+    public void addEventListener(BiConsumer<Event, Object> consumer) {
+        long stamp = lock.writeLock();
+
+        try {
+            consumers.add(new FilteringConsumer(e -> true, consumer));
+        } finally {
+            lock.unlockWrite(stamp);
+        }
+    }
+
+    @Override
+    public void addEventListener(Predicate<Event> predicate, BiConsumer<Event, Object>
consumer) {
+        long stamp = lock.writeLock();
+
+        try {
+            consumers.add(new FilteringConsumer(predicate, consumer));
+        } finally {
+            lock.unlockWrite(stamp);
+        }
+    }
+
+    @Override
+    public void removeEventListener(BiConsumer<Event, Object> consumer) {
+        long stamp = lock.writeLock();
+
+        try {
+            consumers.removeIf(c -> c.getConsumer().equals(consumer));
+        } finally {
+            lock.unlockWrite(stamp);
+        }
+    }
+
+    // **************************************
+    // Events
+    // **************************************
+
+    protected void fireEvent(CamelClusterView.Event event, Object payload) {
+        long stamp = lock.readLock();
+
+        try {
+            for (int i = 0; i < consumers.size(); i++) {
+                consumers.get(i).accept(event, payload);
+            }
+        } finally {
+            lock.unlockRead(stamp);
+        }
+    }
+
+    // **************************************
+    // Helpers
+    // **************************************
+
+    private final class FilteringConsumer implements BiConsumer<Event, Object> {
+        private final Predicate<Event> predicate;
+        private final BiConsumer<Event, Object> consumer;
+
+        FilteringConsumer(Predicate<Event> predicate,  BiConsumer<Event, Object>
consumer) {
+            this.predicate = predicate;
+            this.consumer = consumer;
+        }
+
+        @Override
+        public void accept(CamelClusterView.Event event, Object payload) {
+            if (predicate.test(event)) {
+                consumer.accept(event, payload);
+            }
+        }
+
+        public BiConsumer<Event, Object> getConsumer() {
+            return this.consumer;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/b6f1bdd8/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRoutePolicy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRoutePolicy.java b/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRoutePolicy.java
new file mode 100644
index 0000000..4aa1fa2
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRoutePolicy.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.ha;
+
+import java.util.EventObject;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Route;
+import org.apache.camel.StartupListener;
+import org.apache.camel.api.management.ManagedAttribute;
+import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.ha.CamelCluster;
+import org.apache.camel.ha.CamelClusterHelper;
+import org.apache.camel.ha.CamelClusterView;
+import org.apache.camel.management.event.CamelContextStartedEvent;
+import org.apache.camel.support.EventNotifierSupport;
+import org.apache.camel.support.RoutePolicySupport;
+import org.apache.camel.util.ReferenceCount;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@ManagedResource(description = "Clustered Route policy using")
+public final class ClusteredRoutePolicy extends RoutePolicySupport implements CamelContextAware
{
+    private static final Logger LOGGER = LoggerFactory.getLogger(ClusteredRoutePolicy.class);
+
+    private final AtomicBoolean leader;
+    private final Set<Route> startedRoutes;
+    private final Set<Route> stoppedRoutes;
+    private final ReferenceCount refCount;
+    private final CamelClusterView clusterView;
+    private final BiConsumer<CamelClusterView.Event, Object> leadershipEventConsumer;
+    private final CamelContextStartupListener listener;
+    private final AtomicBoolean contextStarted;
+    private CamelContext camelContext;
+
+    public ClusteredRoutePolicy(CamelClusterView clusterView) {
+        this.clusterView = clusterView;
+        this.leadershipEventConsumer = this::onLeadershipEvent;
+
+        this.stoppedRoutes = new HashSet<>();
+        this.startedRoutes = new HashSet<>();
+        this.leader = new AtomicBoolean(false);
+        this.contextStarted = new AtomicBoolean(false);
+
+        try {
+            this.listener = new CamelContextStartupListener();
+            this.listener.start();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+        // Cleanup the policy when all the routes it manages have been shut down
+        // so it can be shared among routes.
+        this.refCount = ReferenceCount.onRelease(() -> {
+            if (camelContext != null) {
+                camelContext.getManagementStrategy().removeEventNotifier(listener);
+            }
+
+            clusterView.removeEventListener(leadershipEventConsumer);
+            setLeader(false);
+        });
+    }
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        if (this.camelContext == camelContext) {
+            return;
+        }
+
+        if (this.camelContext != null && this.camelContext != camelContext) {
+            throw new IllegalStateException(
+                "CamelContext should not be changed: current=" + this.camelContext + ", new="
+ camelContext
+            );
+        }
+
+        try {
+            this.camelContext = camelContext;
+            this.camelContext.addStartupListener(this.listener);
+            this.camelContext.getManagementStrategy().addEventNotifier(this.listener);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void onInit(Route route) {
+        super.onInit(route);
+
+        LOGGER.info("Route managed by {}. Setting route {} AutoStartup flag to false.", getClass(),
route.getId());
+        route.getRouteContext().getRoute().setAutoStartup("false");
+
+        this.refCount.retain();
+        this.stoppedRoutes.add(route);
+
+        startManagedRoutes();
+    }
+
+    @Override
+    public void doShutdown() {
+        this.refCount.release();
+    }
+
+    // ****************************************************
+    // Management
+    // ****************************************************
+
+    @ManagedAttribute(description = "Is this route the master or a slave")
+    public boolean isLeader() {
+        return leader.get();
+    }
+
+    // ****************************************************
+    // Route managements
+    // ****************************************************
+
+    private synchronized void setLeader(boolean isLeader) {
+        if (isLeader && leader.compareAndSet(false, isLeader)) {
+            LOGGER.debug("Leadership taken");
+            startManagedRoutes();
+        } else if (!isLeader && leader.getAndSet(isLeader)) {
+            LOGGER.debug("Leadership lost");
+            stopManagedRoutes();
+        }
+    }
+
+    private void startManagedRoutes() {
+        if (isLeader()) {
+            doStartManagedRoutes();
+        } else {
+            // If the leadership has been lost in the meanwhile, stop any
+            // eventually started route
+            doStopManagedRoutes();
+        }
+    }
+
+    private void doStartManagedRoutes() {
+        try {
+            for (Route route : stoppedRoutes) {
+                LOGGER.debug("Starting route {}", route.getId());
+                camelContext.startRoute(route.getId());
+                startedRoutes.add(route);
+            }
+
+            stoppedRoutes.removeAll(startedRoutes);
+        } catch (Exception e) {
+            handleException(e);
+        }
+    }
+
+    private void stopManagedRoutes() {
+        if (isLeader()) {
+            // If became a leader in the meanwhile, start any eventually stopped
+            // route
+            doStartManagedRoutes();
+        } else {
+            doStopManagedRoutes();
+        }
+    }
+
+    private void doStopManagedRoutes() {
+        try {
+            for (Route route : startedRoutes) {
+                LOGGER.debug("Stopping route {}", route.getId());
+                stopRoute(route);
+                stoppedRoutes.add(route);
+            }
+
+            startedRoutes.removeAll(stoppedRoutes);
+        } catch (Exception e) {
+            handleException(e);
+        }
+    }
+
+    // ****************************************************
+    // Event handling
+    // ****************************************************
+
+    private void onLeadershipEvent(CamelClusterView.Event event, Object payload) {
+        setLeader(clusterView.getLocalMember().isMaster());
+    }
+
+    private class CamelContextStartupListener extends EventNotifierSupport implements StartupListener
{
+        @Override
+        public void notify(EventObject event) throws Exception {
+            onCamelContextStarted();
+        }
+
+        @Override
+        public boolean isEnabled(EventObject event) {
+            return event instanceof CamelContextStartedEvent;
+        }
+
+        @Override
+        public void onCamelContextStarted(CamelContext context, boolean alreadyStarted) throws
Exception {
+            if (alreadyStarted) {
+                // Invoke it only if the context was already started as this
+                // method is not invoked at last event as documented but after
+                // routes warm-up so this is useful for routes deployed after
+                // the camel context has been started-up. For standard routes
+                // configuration the notification of the camel context started
+                // is provided by EventNotifier.
+                //
+                // We should check why this callback is not invoked at latest
+                // stage, or maybe rename it as it is misleading and provide a
+                // better alternative for intercept camel events.
+                onCamelContextStarted();
+            }
+        }
+
+        private void onCamelContextStarted() {
+            // Start managing the routes only when the camel context is started
+            // so start/stop of managed routes do not clash with CamelContext
+            // startup
+            if (contextStarted.compareAndSet(false, true)) {
+                clusterView.addEventListener(CamelClusterHelper.leadershipEventFilter(),
leadershipEventConsumer);
+                setLeader(clusterView.getLocalMember().isMaster());
+            }
+        }
+    }
+
+    // ****************************************************
+    // Static helpers
+    // ****************************************************
+
+    public static ClusteredRoutePolicy forNamespace(CamelCluster cluster, String namespace)
throws Exception {
+        return new ClusteredRoutePolicy(cluster.createView(namespace));
+    }
+
+    public static ClusteredRoutePolicy forView(CamelClusterView view) throws Exception  {
+        return new ClusteredRoutePolicy(view);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/b6f1bdd8/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterMember.java
----------------------------------------------------------------------
diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterMember.java
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterMember.java
deleted file mode 100644
index 6268931..0000000
--- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterMember.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.atomix.ha;
-
-import io.atomix.group.DistributedGroup;
-import io.atomix.group.GroupMember;
-import org.apache.camel.ha.CamelClusterMember;
-
-class AtomixClusterMember<M extends GroupMember> implements CamelClusterMember {
-    private final DistributedGroup group;
-    private final M member;
-
-    AtomixClusterMember(DistributedGroup group, M member) {
-        this.group = group;
-        this.member = member;
-    }
-
-    @Override
-    public String getId() {
-        return member.id();
-    }
-
-    @Override
-    public boolean isMaster() {
-        return group.election().term().leader().equals(member);
-    }
-
-    M getGroupMember() {
-      return member;
-    }
-}


Mime
View raw message