camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject camel git commit: CAMEL-10426 - Added CuratorMultiMasterLeaderRoutePolicy
Date Wed, 09 Nov 2016 21:31:30 GMT
Repository: camel
Updated Branches:
  refs/heads/master 33063a86c -> 34fdcb3ca


CAMEL-10426 - Added CuratorMultiMasterLeaderRoutePolicy


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/34fdcb3c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/34fdcb3c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/34fdcb3c

Branch: refs/heads/master
Commit: 34fdcb3ca67aebad9af2a05ee737f32f8d5f5982
Parents: 33063a8
Author: Paolo Antinori <pantinor@redhat.com>
Authored: Wed Nov 9 15:07:32 2016 +0100
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Wed Nov 9 22:24:35 2016 +0100

----------------------------------------------------------------------
 components/camel-zookeeper/pom.xml              |  16 +
 .../src/main/docs/zookeeper-component.adoc      |   9 +-
 .../CuratorMultiMasterLeaderElection.java       | 171 +++++++
 .../CuratorMultiMasterLeaderRoutePolicy.java    | 191 ++++++++
 ...MultiMasterCuratorLeaderRoutePolicyTest.java | 460 +++++++++++++++++++
 .../src/test/resources/log4j2.properties        |  10 +-
 6 files changed, 854 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/34fdcb3c/components/camel-zookeeper/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/pom.xml b/components/camel-zookeeper/pom.xml
index 2021d2c..08dc904 100644
--- a/components/camel-zookeeper/pom.xml
+++ b/components/camel-zookeeper/pom.xml
@@ -105,4 +105,20 @@
 
   </dependencies>
 
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <childDelegation>false</childDelegation>
+          <useFile>true</useFile>
+          <forkCount>1</forkCount>
+          <reuseForks>true</reuseForks>
+          <forkedProcessTimeoutInSeconds>600</forkedProcessTimeoutInSeconds>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+
 </project>

http://git-wip-us.apache.org/repos/asf/camel/blob/34fdcb3c/components/camel-zookeeper/src/main/docs/zookeeper-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/docs/zookeeper-component.adoc b/components/camel-zookeeper/src/main/docs/zookeeper-component.adoc
index e569ea2..5f8d5cf 100644
--- a/components/camel-zookeeper/src/main/docs/zookeeper-component.adoc
+++ b/components/camel-zookeeper/src/main/docs/zookeeper-component.adoc
@@ -269,18 +269,23 @@ from("direct:policy-controlled")
     .to("mock:controlled");
 ----
 
-There are currently 2 policies defined in the component, with different SLAs:
+There are currently 3 policies defined in the component, with different SLAs:
 
 * `ZooKeeperRoutePolicy`
 * `CuratorLeaderRoutePolicy` (since *2.19*)
+* `MultiMasterCuratorLeaderRoutePolicy` (since *2.19*)
 
 *ZooKeeperRoutePolicy* supports multiple active nodes, but it's activation kicks in only
after a Camel component and its correspondent Consumer have already been started,
  this introduces, depending on your routes definition, the risk that you component can already
start consuming events and producing `Exchange`s, before the policy could estabilish
  that the node should not be activated.
- 
+
 *CuratorLeaderRoutePolicy* supports only a single active node, but it's bound to a different
`CamelContext` lifecycle method; this Policy kicks in before any route or consumer is started
  thus you can be sure that no even is processed before the Policy takes its decision.
 
+*MultiMasterCuratorLeaderRoutePolicy* support multiple active nodes, and it's bound to the
same lifecycle method as `CuratorLeaderRoutePolicy`; this Policy kicks in before any route
or consumer is started
+ thus you can be sure that no even is processed before the Policy takes its decision.
+
+
 [[Zookeeper-SeeAlso]]
 See Also
 ^^^^^^^^

http://git-wip-us.apache.org/repos/asf/camel/blob/34fdcb3c/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/CuratorMultiMasterLeaderElection.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/CuratorMultiMasterLeaderElection.java
b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/CuratorMultiMasterLeaderElection.java
new file mode 100644
index 0000000..9c91b73
--- /dev/null
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/CuratorMultiMasterLeaderElection.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper.policy;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.StatefulService;
+import org.apache.camel.impl.JavaUuidGenerator;
+import org.apache.camel.spi.UuidGenerator;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2;
+import org.apache.curator.framework.recipes.locks.Lease;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <code>CuratorMultiMasterLeaderElection</code> uses the leader election capabilities
of a
+ * ZooKeeper cluster to control which nodes are enabled. It is typically used in
+ * fail-over scenarios controlling identical instances of an application across
+ * a cluster of Camel based servers. <p> The election is configured providing the number
of instances that are required
+ * to be active..
+ * <p> All instances of the election must also be configured with the same path on
the ZooKeeper
+ * cluster where the election will be carried out. It is good practice for this
+ * to indicate the application e.g. <tt>/someapplication/someroute/</tt> note
+ * that these nodes should exist before using the election. <p> See <a
+ * href="http://hadoop.apache.org/zookeeper/docs/current/recipes.html#sc_leaderElection">
+ * for more on how Leader election</a> is archived with ZooKeeper.
+ */
+public class CuratorMultiMasterLeaderElection implements ConnectionStateListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CuratorMultiMasterLeaderElection.class);
+
+    private final String candidateName;
+    private final List<ElectionWatcher> watchers = new ArrayList<ElectionWatcher>();
+    private final int desiredActiveNodes;
+    private AtomicBoolean activeNode = new AtomicBoolean(false);
+    private UuidGenerator uuidGenerator = new JavaUuidGenerator();
+    private InterProcessSemaphoreV2 leaderSelector;
+    private CuratorFramework client;
+    private Lease lease;
+
+    public CuratorMultiMasterLeaderElection(String uri, int desiredActiveNodes) {
+        this.candidateName = createCandidateName();
+        this.desiredActiveNodes = desiredActiveNodes;
+
+        String connectionString = uri.substring(1 + uri.indexOf(':')).split("/")[0];
+        String protocol = uri.substring(0, uri.indexOf(':'));
+        String path = uri.replace(protocol + ":" + connectionString, "");
+        client = CuratorFrameworkFactory.newClient(connectionString, new ExponentialBackoffRetry(1000,
3));
+        client.getConnectionStateListenable().addListener(this);
+        leaderSelector = new InterProcessSemaphoreV2(client, path, this.desiredActiveNodes);
+        client.start();
+
+
+    }
+
+    // stolen from org/apache/camel/processor/CamelInternalProcessor
+    public static boolean isCamelStopping(CamelContext context) {
+        if (context instanceof StatefulService) {
+            StatefulService ss = (StatefulService) context;
+            return ss.isStopping() || ss.isStopped();
+        }
+        return false;
+    }
+
+    public void shutdownClients() {
+        try {
+            leaderSelector.returnLease(lease);
+        } finally {
+            client.close();
+        }
+    }
+
+    /*
+     * Blocking method
+     */
+    public void requestResource() {
+        LOG.info("Requested to become active from {}", candidateName);
+        try {
+            lease = leaderSelector.acquire();
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to obtain access to become a leader node.");
+        }
+        LOG.info("{} is now active", candidateName);
+        activeNode.set(true);
+        notifyElectionWatchers();
+    }
+
+    public boolean isMaster() {
+        return activeNode.get();
+    }
+
+    private String createCandidateName() {
+        StringBuilder builder = new StringBuilder();
+        try {
+            /* UUID would be enough, also using hostname for human readability */
+            builder.append(InetAddress.getLocalHost().getCanonicalHostName());
+        } catch (UnknownHostException ex) {
+            LOG.warn("Failed to get the local hostname.", ex);
+            builder.append("unknown-host");
+        }
+        builder.append("-").append(uuidGenerator.generateUuid());
+        return builder.toString();
+    }
+
+    public String getCandidateName() {
+        return candidateName;
+    }
+
+    private void notifyElectionWatchers() {
+        for (ElectionWatcher watcher : watchers) {
+            try {
+                watcher.electionResultChanged();
+            } catch (Exception e) {
+                LOG.warn("Election watcher " + watcher + " of type " + watcher.getClass()
+ " threw an exception.", e);
+            }
+        }
+    }
+
+    public boolean addElectionWatcher(ElectionWatcher e) {
+        return watchers.add(e);
+    }
+
+    @Override
+    public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState)
{
+        switch (connectionState) {
+        case SUSPENDED:
+        case LOST:
+            LOG.info("Received {} state from connection. Giving up lock.", connectionState);
+
+            try {
+                leaderSelector.returnLease(lease);
+            } finally {
+                this.activeNode.set(false);
+                notifyElectionWatchers();
+            }
+
+            break;
+        default:
+            LOG.info("Connection state changed: {}", connectionState);
+            requestResource();
+
+        }
+    }
+
+}
+
+

http://git-wip-us.apache.org/repos/asf/camel/blob/34fdcb3c/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/CuratorMultiMasterLeaderRoutePolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/CuratorMultiMasterLeaderRoutePolicy.java
b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/CuratorMultiMasterLeaderRoutePolicy.java
new file mode 100644
index 0000000..b220ee4
--- /dev/null
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/CuratorMultiMasterLeaderRoutePolicy.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper.policy;
+
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.camel.NonManagedService;
+import org.apache.camel.Route;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.support.RoutePolicySupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+
+/**
+ * <code>CuratorMultiMasterLeaderRoutePolicy</code> uses Apache Curator InterProcessSemaphoreV2
receipe to implement the behavior of having
+ * at multiple active instances of  a route, controlled by a specific policy, running. It
is typically used in
+ * fail-over scenarios controlling identical instances of a route across a cluster of Camel
based servers.
+ * <p>
+ * The policy affects the normal startup lifecycle of CamelContext and Routes, automatically
set autoStart property of
+ * routes controlled by this policy to false.
+ * After Curator receipe identifies the current Policy instance as the Leader between a set
of clients that are
+ * competing for the role, it will start the route, and only at that moment the route will
start its business.
+ * This specific behavior is designed to avoid scenarios where such a policy would kick in
only after a route had
+ * already been started, with the risk, for consumers for example, that some source event
might have already been
+ * consumed.
+ * <p>
+ * All instances of the policy must also be configured with the same path on the
+ * ZooKeeper cluster where the election will be carried out. It is good practice
+ * for this to indicate the application e.g. <tt>/someapplication/someroute/</tt>
note
+ * that these nodes should exist before using the policy.
+ * <p>
+ * See <a href="http://hadoop.apache.org/zookeeper/docs/current/recipes.html#sc_leaderElection">
+ *     for more on how Leader election</a> is archived with ZooKeeper.
+ */
+public class CuratorMultiMasterLeaderRoutePolicy extends RoutePolicySupport implements ElectionWatcher,
NonManagedService {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CuratorMultiMasterLeaderRoutePolicy.class);
+    private final String uri;
+    private final Lock lock = new ReentrantLock();
+    private final Set<Route> suspendedRoutes = new CopyOnWriteArraySet<Route>();
+    private final AtomicBoolean shouldProcessExchanges = new AtomicBoolean();
+    private volatile boolean shouldStopRoute = true;
+    private final int enabledCount;
+
+
+    private final Lock electionLock = new ReentrantLock();
+
+    private CuratorMultiMasterLeaderElection election;
+
+    public CuratorMultiMasterLeaderRoutePolicy(String uri, int enabledCount) {
+        this.uri = uri;
+        this.enabledCount = enabledCount;
+    }
+    public CuratorMultiMasterLeaderRoutePolicy(String uri) {
+        this(uri, 1);
+    }
+
+    @Override
+    public void onInit(Route route) {
+        ensureElectionIsCreated();
+        LOG.info("Route managed by {}. Setting route [{}] AutoStartup flag to false.", this.getClass(),
route.getId());
+        route.getRouteContext().getRoute().setAutoStartup("false");
+
+
+        if (election.isMaster()) {
+            if (shouldStopRoute) {
+                startManagedRoute(route);
+            }
+        } else {
+            if (shouldStopRoute) {
+                stopManagedRoute(route);
+            }
+        }
+
+    }
+
+    private void ensureElectionIsCreated() {
+        if (election == null) {
+            electionLock.lock();
+            try {
+                if (election == null) { // re-test
+                    election = new CuratorMultiMasterLeaderElection(uri, enabledCount);
+                    election.addElectionWatcher(this);
+
+                }
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            } finally {
+                electionLock.unlock();
+            }
+        }
+    }
+
+    private void startManagedRoute(Route route) {
+        try {
+            lock.lock();
+            if (suspendedRoutes.contains(route)) {
+                startRoute(route);
+                suspendedRoutes.remove(route);
+            }
+        } catch (Exception e) {
+            handleException(e);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private void stopManagedRoute(Route route) {
+        try {
+            lock.lock();
+            // check that we should still suspend once the lock is acquired
+            if (!suspendedRoutes.contains(route) && !shouldProcessExchanges.get())
{
+                stopRoute(route);
+                suspendedRoutes.add(route);
+            }
+        } catch (Exception e) {
+            handleException(e);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public void electionResultChanged() {
+        if (election.isMaster()) {
+            startAllStoppedRoutes();
+        }
+    }
+
+    private void startAllStoppedRoutes() {
+        try {
+            lock.lock();
+
+            if (!suspendedRoutes.isEmpty()) {
+                if (log.isDebugEnabled()) {
+                    log.info("{} route(s) have been stopped previously by policy, restarting.",
suspendedRoutes.size());
+                }
+                for (Route suspended : suspendedRoutes) {
+                    DefaultCamelContext ctx = (DefaultCamelContext)suspended.getRouteContext().getCamelContext();
+                    while (!ctx.isStarted()) {
+                        log.info("Context {} is not started yet. Sleeping for a bit.", ctx.getName());
+                        Thread.sleep(5000);
+                    }
+                    log.info("Starting route [{}] defined in context [{}].", suspended.getId(),
ctx.getName());
+                    startRoute(suspended);
+                }
+                suspendedRoutes.clear();
+            }
+
+        } catch (Exception e) {
+            handleException(e);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    protected void doShutdown() throws Exception {
+        try {
+            electionLock.lock();
+            election.shutdownClients();
+            election = null;
+        } finally {
+            electionLock.unlock();
+        }
+    }
+
+    public CuratorMultiMasterLeaderElection getElection() {
+        return election;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/34fdcb3c/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/MultiMasterCuratorLeaderRoutePolicyTest.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/MultiMasterCuratorLeaderRoutePolicyTest.java
b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/MultiMasterCuratorLeaderRoutePolicyTest.java
new file mode 100644
index 0000000..a6c9946
--- /dev/null
+++ b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/MultiMasterCuratorLeaderRoutePolicyTest.java
@@ -0,0 +1,460 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper.policy;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.zookeeper.ZooKeeperTestSupport;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.commons.logging.LogFactory;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+
+public class MultiMasterCuratorLeaderRoutePolicyTest extends ZooKeeperTestSupport {
+    public static final String ZNODE = "/multimaster";
+    public static final String BASE_ZNODE = "/someapp";
+    private static final Logger LOG = LoggerFactory.getLogger(MultiMasterCuratorLeaderRoutePolicyTest.class);
+
+
+    protected CamelContext createCamelContext() throws Exception {
+        disableJMX();
+        return super.createCamelContext();
+    }
+
+
+    @Test
+    public void ensureRoutesDoNotStartAutomatically() throws Exception {
+        DefaultCamelContext context = new DefaultCamelContext();
+
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                CuratorMultiMasterLeaderRoutePolicy policy = new CuratorMultiMasterLeaderRoutePolicy("zookeeper:localhost:"
+ getServerPort() + BASE_ZNODE + ZNODE + 2);
+                from("timer://foo?fixedRate=true&period=5").routePolicy(policy).id("single_route").autoStartup(true).to("mock:controlled");
+            }
+        });
+        context.start();
+        // this check verifies that a route marked as autostartable is not started automatically.
It will be the policy responsibility to eventually start it.
+        assertThat(context.getRouteStatus("single_route").isStarted(), is(false));
+        assertThat(context.getRouteStatus("single_route").isStarting(), is(false));
+        try {
+            context.shutdown();
+        } catch (Exception e) {
+            //concurrency can raise some InterruptedException but we don't really care in
this scenario.
+        }
+    }
+
+    @Test
+    public void oneMasterOneSlaveScenarioContolledByPolicy() throws Exception {
+        final String path = "oneMasterOneSlaveScenarioContolledByPolicy";
+        final String firstDestination = "first" + System.currentTimeMillis();
+        final String secondDestination = "second" + System.currentTimeMillis();
+        final CountDownLatch waitForSecondRouteCompletedLatch = new CountDownLatch(1);
+        final int activeNodesDesired = 1;
+
+        MultiMasterZookeeperPolicyEnforcedContext first = createEnforcedContext(firstDestination,
activeNodesDesired, path);
+        DefaultCamelContext controlledContext = (DefaultCamelContext) first.controlledContext;
+        // get reference to the Policy object to check if it's already a master
+        CuratorMultiMasterLeaderRoutePolicy routePolicy = (CuratorMultiMasterLeaderRoutePolicy)
controlledContext.getRouteDefinition(firstDestination).getRoutePolicies().get(0);
+
+        assertWeHaveMasters(routePolicy);
+
+        LOG.info("Starting first CamelContext");
+        final MultiMasterZookeeperPolicyEnforcedContext[] arr = new MultiMasterZookeeperPolicyEnforcedContext[1];
+
+        new Thread() {
+            @Override
+            public void run() {
+                MultiMasterZookeeperPolicyEnforcedContext second = null;
+                try {
+                    LOG.info("Starting second CamelContext in a separate thread");
+                    second = createEnforcedContext(secondDestination, activeNodesDesired,
path);
+                    arr[0] = second;
+                    second.sendMessageToEnforcedRoute("message for second", 0);
+                    waitForSecondRouteCompletedLatch.countDown();
+                } catch (Exception e) {
+                    LOG.error("Error in the thread controlling the second context", e);
+                    fail("Error in the thread controlling the second context: " + e.getMessage());
+                }
+
+
+            }
+        }.start();
+
+        first.sendMessageToEnforcedRoute("message for first", 1);
+
+        waitForSecondRouteCompletedLatch.await(2, TimeUnit.MINUTES);
+        LOG.info("Explicitly shutting down the first camel context.");
+
+        LOG.info("Shutting down first con");
+        first.shutdown();
+
+        MultiMasterZookeeperPolicyEnforcedContext second = arr[0];
+
+        DefaultCamelContext secondCamelContext = (DefaultCamelContext) second.controlledContext;
+        assertWeHaveMasters((CuratorMultiMasterLeaderRoutePolicy)secondCamelContext.getRouteDefinition(secondDestination).getRoutePolicies().get(0));
+
+        //second.mock = secondCamelContext.getEndpoint("mock:controlled", MockEndpoint.class);
+        second.sendMessageToEnforcedRoute("message for slave", 1);
+        second.shutdown();
+    }
+
+
+    @Test
+    public void oneMasterOneSlaveAndFlippedAgainScenarioContolledByPolicy() throws Exception
{
+        final String path = "oneMasterOneSlaveScenarioContolledByPolicy";
+        final String firstDestination = "first" + System.currentTimeMillis();
+        final String secondDestination = "second" + System.currentTimeMillis();
+        final CountDownLatch waitForSecondRouteCompletedLatch = new CountDownLatch(1);
+        final int activeNodeDesired = 1;
+
+        MultiMasterZookeeperPolicyEnforcedContext first = createEnforcedContext(firstDestination,
activeNodeDesired, path);
+        DefaultCamelContext controlledContext = (DefaultCamelContext) first.controlledContext;
+        // get reference to the Policy object to check if it's already a master
+        CuratorMultiMasterLeaderRoutePolicy routePolicy = (CuratorMultiMasterLeaderRoutePolicy)
controlledContext.getRouteDefinition(firstDestination).getRoutePolicies().get(0);
+
+        assertWeHaveMasters(routePolicy);
+
+        LOG.info("Starting first CamelContext");
+        final MultiMasterZookeeperPolicyEnforcedContext[] arr = new MultiMasterZookeeperPolicyEnforcedContext[1];
+
+        new Thread() {
+            @Override
+            public void run() {
+                MultiMasterZookeeperPolicyEnforcedContext slave = null;
+                try {
+                    LOG.info("Starting second CamelContext in a separate thread");
+                    slave = createEnforcedContext(secondDestination, activeNodeDesired, path);
+                    arr[0] = slave;
+                    slave.sendMessageToEnforcedRoute("message for second", 0);
+                    waitForSecondRouteCompletedLatch.countDown();
+                } catch (Exception e) {
+                    LOG.error("Error in the thread controlling the second context", e);
+                    fail("Error in the thread controlling the second context: " + e.getMessage());
+                }
+
+
+            }
+        }.start();
+
+        first.sendMessageToEnforcedRoute("message for first", 1);
+
+        waitForSecondRouteCompletedLatch.await(2, TimeUnit.MINUTES);
+        MultiMasterZookeeperPolicyEnforcedContext second = arr[0];
+
+        LOG.info("Explicitly shutting down the first camel context.");
+        first.shutdown();
+
+        DefaultCamelContext secondCamelContext = (DefaultCamelContext) second.controlledContext;
+        assertWeHaveMasters((CuratorMultiMasterLeaderRoutePolicy)secondCamelContext.getRouteDefinition(secondDestination).getRoutePolicies().get(0));
+
+        CountDownLatch restartFirstLatch = new CountDownLatch(1);
+        LOG.info("Start back first context");
+        new Thread() {
+            @Override
+            public void run() {
+                try {
+                    first.startup();
+                    restartFirstLatch.countDown();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }.start();
+        restartFirstLatch.await();
+        second.sendMessageToEnforcedRoute("message for second", 1);
+        first.mock.reset();
+        first.sendMessageToEnforcedRoute("message for first", 0);
+        second.shutdown();
+        controlledContext = (DefaultCamelContext) first.controlledContext;
+        // get reference to the Policy object to check if it's already a master
+        routePolicy = (CuratorMultiMasterLeaderRoutePolicy) controlledContext.getRouteDefinition(firstDestination).getRoutePolicies().get(0);
+        log.info("Asserting route is up. context: [{}]", controlledContext.getName());
+        assertWeHaveMasters(routePolicy);
+        first.controlledContext.setTracing(true);
+        first.mock = controlledContext.getEndpoint("mock:controlled", MockEndpoint.class);
+        first.sendMessageToEnforcedRoute("message for first", 1);
+        first.shutdown();
+    }
+
+
+
+
+    @Test
+    public void oneMasterTwoSlavesScenarioContolledByPolicy() throws Exception {
+        final String path = "oneMasterTwoSlavesScenarioContolledByPolicy";
+        final String master = "master" + System.currentTimeMillis();
+        final String secondDestination = "second" + System.currentTimeMillis();
+        final String thirdDestination = "third" + System.currentTimeMillis();
+        final CountDownLatch waitForNonActiveRoutesLatch = new CountDownLatch(2);
+        final int activeNodesDesired = 1;
+
+        LOG.info("Starting first CamelContext");
+        MultiMasterZookeeperPolicyEnforcedContext first = createEnforcedContext(master, activeNodesDesired,
path);
+        DefaultCamelContext controlledContext = (DefaultCamelContext) first.controlledContext;
+        // get reference to the Policy object to check if it's already a master
+        CuratorMultiMasterLeaderRoutePolicy routePolicy = (CuratorMultiMasterLeaderRoutePolicy)
controlledContext.getRouteDefinition(master).getRoutePolicies().get(0);
+
+        assertWeHaveMasters(routePolicy);
+
+        final MultiMasterZookeeperPolicyEnforcedContext[] arr = new MultiMasterZookeeperPolicyEnforcedContext[2];
+
+        new Thread() {
+            @Override
+            public void run() {
+                MultiMasterZookeeperPolicyEnforcedContext second = null;
+                try {
+                    LOG.info("Starting second CamelContext");
+                    second = createEnforcedContext(secondDestination, activeNodesDesired,
path);
+                    arr[0] = second;
+                    second.sendMessageToEnforcedRoute("message for second", 0);
+                    waitForNonActiveRoutesLatch.countDown();
+                } catch (Exception e) {
+                    LOG.error("Error in the thread controlling the second context", e);
+                    fail("Error in the thread controlling the second context: " + e.getMessage());
+                }
+
+
+            }
+        }.start();
+
+        new Thread() {
+            @Override
+            public void run() {
+                MultiMasterZookeeperPolicyEnforcedContext third = null;
+                try {
+                    LOG.info("Starting third CamelContext");
+                    third = createEnforcedContext(thirdDestination, activeNodesDesired, path);
+                    arr[1] = third;
+                    third.sendMessageToEnforcedRoute("message for third", 0);
+                    waitForNonActiveRoutesLatch.countDown();
+                } catch (Exception e) {
+                    LOG.error("Error in the thread controlling the third context", e);
+                    fail("Error in the thread controlling the third context: " + e.getMessage());
+                }
+
+
+            }
+        }.start();
+
+        // Send messages to the master and the slave.
+        // The route is enabled in the master and gets through, but that sent to
+        // the slave context is rejected.
+        first.sendMessageToEnforcedRoute("message for master", 1);
+
+        waitForNonActiveRoutesLatch.await();
+        LOG.info("Explicitly shutting down the first camel context.");
+        // trigger failover by killing the master..
+        first.shutdown();
+        // let's find out who's active now:
+
+        CuratorMultiMasterLeaderRoutePolicy routePolicySecond = (CuratorMultiMasterLeaderRoutePolicy)
arr[0].controlledContext.getRouteDefinition(secondDestination).getRoutePolicies().get(0);
+        CuratorMultiMasterLeaderRoutePolicy routePolicyThird = (CuratorMultiMasterLeaderRoutePolicy)
arr[1].controlledContext.getRouteDefinition(thirdDestination).getRoutePolicies().get(0);
+
+        MultiMasterZookeeperPolicyEnforcedContext newMaster = null;
+        MultiMasterZookeeperPolicyEnforcedContext slave = null;
+
+        final int maxWait = 20;
+        for (int i = 0; i < maxWait; i++) {
+            if (routePolicySecond.getElection().isMaster()) {
+                newMaster = arr[0];
+                slave = arr[1];
+                LOG.info("[second] is the new master");
+                break;
+            } else if (routePolicyThird.getElection().isMaster()) {
+                newMaster = arr[1];
+                slave = arr[0];
+                LOG.info("[third] is the new master");
+                break;
+            } else {
+                Thread.sleep(2000);
+                LOG.info("waiting for a new master to be elected");
+            }
+        }
+        assertThat(newMaster, is(notNullValue()));
+
+        newMaster.sendMessageToEnforcedRoute("message for second", 1);
+        slave.sendMessageToEnforcedRoute("message for third", 0);
+        slave.shutdown();
+        newMaster.shutdown();
+    }
+
+
+    @Test
+    public void twoMasterOneSlavesScenarioContolledByPolicy() throws Exception {
+        final String path = "twoMasterOneSlavesScenarioContolledByPolicy";
+        final String firstDestination = "first" + System.currentTimeMillis();
+        final String secondDestination = "second" + System.currentTimeMillis();
+        final String thirdDestination = "third" + System.currentTimeMillis();
+        final CountDownLatch waitForThirdRouteCompletedLatch = new CountDownLatch(1);
+        final int activeNodeDesired = 2;
+
+        MultiMasterZookeeperPolicyEnforcedContext first = createEnforcedContext(firstDestination,
activeNodeDesired, path);
+        DefaultCamelContext firstControlledContext = (DefaultCamelContext) first.controlledContext;
+        CuratorMultiMasterLeaderRoutePolicy firstRoutePolicy = (CuratorMultiMasterLeaderRoutePolicy)
firstControlledContext.getRouteDefinition(firstDestination).getRoutePolicies().get(0);
+
+        MultiMasterZookeeperPolicyEnforcedContext second = createEnforcedContext(secondDestination,
activeNodeDesired, path);
+        DefaultCamelContext secondControlledContext = (DefaultCamelContext) second.controlledContext;
+        CuratorMultiMasterLeaderRoutePolicy secondRoutePolicy = (CuratorMultiMasterLeaderRoutePolicy)
secondControlledContext.getRouteDefinition(secondDestination).getRoutePolicies().get(0);
+
+        assertWeHaveMasters(firstRoutePolicy, secondRoutePolicy);
+
+        final MultiMasterZookeeperPolicyEnforcedContext[] arr = new MultiMasterZookeeperPolicyEnforcedContext[1];
+
+
+        new Thread() {
+            @Override
+            public void run() {
+                MultiMasterZookeeperPolicyEnforcedContext third = null;
+                try {
+                    LOG.info("Starting third CamelContext");
+                    third = createEnforcedContext(thirdDestination, activeNodeDesired, path);
+                    arr[0] = third;
+                    third.sendMessageToEnforcedRoute("message for third", 0);
+                    waitForThirdRouteCompletedLatch.countDown();
+                } catch (Exception e) {
+                    LOG.error("Error in the thread controlling the third context", e);
+                    fail("Error in the thread controlling the third context: " + e.getMessage());
+                }
+
+
+            }
+        }.start();
+
+        first.sendMessageToEnforcedRoute("message for first", 1);
+        second.sendMessageToEnforcedRoute("message for second", 1);
+
+
+        waitForThirdRouteCompletedLatch.await();
+
+        LOG.info("Explicitly shutting down the first camel context.");
+        first.shutdown();
+
+
+        arr[0].sendMessageToEnforcedRoute("message for third", 1);
+        second.shutdown();
+        arr[0].shutdown();
+    }
+
+    void assertWeHaveMasters(CuratorMultiMasterLeaderRoutePolicy... routePolicies) throws
InterruptedException {
+        final int maxWait = 20;
+        boolean global = false;
+        for (int i = 0; i < maxWait; i++) {
+            boolean iteration = true;
+            for (CuratorMultiMasterLeaderRoutePolicy policy : routePolicies) {
+                log.info("Policy: {}, master: {}", policy, policy.getElection().isMaster());
+                iteration = iteration & policy.getElection().isMaster();
+            }
+            if (iteration) {
+                LOG.info("the number of required active routes is available");
+                global = true;
+                break;
+            } else {
+                Thread.sleep(2000);
+                LOG.info("waiting routes to become leader and be activated.");
+            }
+        }
+        if (!global) {
+            fail("The expected number of route never became master");
+        }
+    }
+
+
+    private class MultiMasterZookeeperPolicyEnforcedContext {
+        CamelContext controlledContext;
+        ProducerTemplate template;
+        MockEndpoint mock;
+        String routename;
+        String path;
+
+        MultiMasterZookeeperPolicyEnforcedContext(String name, int activeNodesDesired, String
path) throws Exception {
+            controlledContext = new DefaultCamelContext();
+            routename = name;
+            this.path = path;
+            template = controlledContext.createProducerTemplate();
+            mock = controlledContext.getEndpoint("mock:controlled", MockEndpoint.class);
+            controlledContext.addRoutes(new FailoverRoute(name, activeNodesDesired, path));
+            controlledContext.start();
+        }
+
+        public void sendMessageToEnforcedRoute(String message, int expected) throws InterruptedException
{
+            mock.expectedMessageCount(expected);
+            try {
+                LOG.info("Sending message to: {}", "vm:" + routename);
+                template.sendBody("vm:" + routename, ExchangePattern.InOut, message);
+            } catch (Exception e) {
+                if (expected > 0) {
+                    LOG.error(e.getMessage(), e);
+                    fail("Expected messages...");
+                }
+            }
+            mock.await(2, TimeUnit.SECONDS);
+            mock.assertIsSatisfied(2000);
+        }
+
+        public void shutdown() throws Exception {
+            LogFactory.getLog(getClass()).debug("stopping");
+            controlledContext.stop();
+            LogFactory.getLog(getClass()).debug("stopped");
+        }
+
+
+        public void startup() throws Exception {
+            LogFactory.getLog(getClass()).debug("starting");
+            controlledContext.start();
+            LogFactory.getLog(getClass()).debug("started");
+        }
+    }
+
+    private MultiMasterZookeeperPolicyEnforcedContext createEnforcedContext(String name,
int activeNodesDesired, String path) throws Exception, InterruptedException {
+        MultiMasterZookeeperPolicyEnforcedContext context = new MultiMasterZookeeperPolicyEnforcedContext(name,
activeNodesDesired, path);
+        delay(1000);
+        return context;
+    }
+
+    public class FailoverRoute extends RouteBuilder {
+
+        private String path;
+        private String routename;
+        private int activeNodesDesired;
+
+        public FailoverRoute(String routename, int activeNodesDesired, String path) {
+            // need names as if we use the same direct ep name in two contexts
+            // in the same vm shutting down one context shuts the endpoint for
+            // both.
+            this.routename = routename;
+            this.activeNodesDesired = activeNodesDesired;
+            this.path = path;
+        }
+
+        public void configure() throws Exception {
+            CuratorMultiMasterLeaderRoutePolicy policy = new CuratorMultiMasterLeaderRoutePolicy("zookeeper:localhost:"
+ getServerPort() + BASE_ZNODE + ZNODE + "/" + path, this.activeNodesDesired);
+            from("vm:" + routename).routePolicy(policy).id(routename).to("mock:controlled");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/34fdcb3c/components/camel-zookeeper/src/test/resources/log4j2.properties
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/test/resources/log4j2.properties b/components/camel-zookeeper/src/test/resources/log4j2.properties
index 81447cc..7536ead 100644
--- a/components/camel-zookeeper/src/test/resources/log4j2.properties
+++ b/components/camel-zookeeper/src/test/resources/log4j2.properties
@@ -23,18 +23,26 @@ appender.file.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n
 appender.out.type = Console
 appender.out.name = out
 appender.out.layout.type = PatternLayout
+# appender.out.layout.pattern = %highlight{%d [%t] %-5level: %msg%n%throwable}{FATAL=red,
ERROR=red, WARN=blue, INFO=black, DEBUG=grey, TRACE=blue}
 appender.out.layout.pattern = [%t] %c{1} %-5p %m%n
+
 logger.zookeeper.name = org.apache.zookeeper
 logger.zookeeper.level = INFO
 logger.camel-zookeeper.name = org.apache.camel.component.zookeeper
 logger.camel-zookeeper.level = INFO
+logger.camel-zookeeper-policy.name = org.apache.camel.component.zookeeper.policy
+logger.camel-zookeeper-policy.level = INFO
 logger.camel-support.name = org.apache.camel.support
 logger.camel-support.level = INFO
+logger.camel.name = org.apache.camel
+logger.camel.level = INFO
+
+
 
 logger.springframework.name = org.springframework
 logger.springframework.level = WARN
 rootLogger.level = INFO
-# rootLogger.appenderRef.stdout.ref = out
+#rootLogger.appenderRef.stdout.ref = out
 rootLogger.appenderRef.file.ref = file
 
 


Mime
View raw message