camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nferr...@apache.org
Subject [06/14] camel git commit: CAMEL-11331: Lease based implementation of Kubernetes lock
Date Tue, 08 Aug 2017 14:43:24 GMT
CAMEL-11331: Lease based implementation of Kubernetes lock


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e4cab329
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e4cab329
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e4cab329

Branch: refs/heads/master
Commit: e4cab32911c2d825568d1de93fbf42ccbc341c92
Parents: 4548126
Author: Nicola Ferraro <ni.ferraro@gmail.com>
Authored: Fri Jul 7 17:05:31 2017 +0200
Committer: Nicola Ferraro <ni.ferraro@gmail.com>
Committed: Tue Aug 8 16:39:43 2017 +0200

----------------------------------------------------------------------
 .../kubernetes/ha/KubernetesClusterService.java | 117 +++++-
 .../kubernetes/ha/KubernetesClusterView.java    |   6 +-
 .../kubernetes/ha/lock/ConfigMapLockUtils.java  | 106 ++++++
 .../ha/lock/KubernetesLeaderMonitor.java        | 256 -------------
 .../ha/lock/KubernetesLeadershipController.java | 211 -----------
 ...ubernetesLeaseBasedLeadershipController.java | 374 +++++++++++++++++++
 .../ha/lock/KubernetesLockConfiguration.java    |  99 ++++-
 .../ha/lock/KubernetesMembersMonitor.java       |   4 +-
 .../kubernetes/ha/lock/LeaderInfo.java          |  90 +++++
 9 files changed, 767 insertions(+), 496 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e4cab329/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterService.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterService.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterService.java
index 6d87d48..a868d16 100644
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterService.java
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterService.java
@@ -19,6 +19,8 @@ package org.apache.camel.component.kubernetes.ha;
 import java.net.InetAddress;
 import java.util.Map;
 
+import io.fabric8.kubernetes.client.KubernetesClient;
+
 import org.apache.camel.CamelContext;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.component.kubernetes.KubernetesConfiguration;
@@ -31,8 +33,6 @@ import org.apache.camel.util.ObjectHelper;
  */
 public class KubernetesClusterService extends AbstractCamelClusterService<KubernetesClusterView> {
 
-    public static final String DEFAULT_CONFIGMAP_NAME = "leaders";
-
     private KubernetesConfiguration configuration;
 
     private KubernetesLockConfiguration lockConfiguration;
@@ -64,10 +64,7 @@ public class KubernetesClusterService extends AbstractCamelClusterService<Kubern
 
         config.setGroupName(ObjectHelper.notNull(groupName, "groupName"));
 
-        // Check defaults (Namespace and podName can be null)
-        if (config.getConfigMapName() == null) {
-            config.setConfigMapName(DEFAULT_CONFIGMAP_NAME);
-        }
+        // Determine the pod name if not provided
         if (config.getPodName() == null) {
             config.setPodName(System.getenv("HOSTNAME"));
             if (config.getPodName() == null) {
@@ -79,6 +76,33 @@ public class KubernetesClusterService extends AbstractCamelClusterService<Kubern
             }
         }
 
+        ObjectHelper.notNull(config.getConfigMapName(), "configMapName");
+        ObjectHelper.notNull(config.getClusterLabels(), "clusterLabels");
+
+        if (config.getJitterFactor() < 1) {
+            throw new IllegalStateException("jitterFactor must be >= 1 (found: " + config.getJitterFactor() + ")");
+        }
+        if (config.getRetryOnErrorIntervalSeconds() <= 0) {
+            throw new IllegalStateException("retryOnErrorIntervalSeconds must be > 0 (found: " + config.getRetryOnErrorIntervalSeconds() + ")");
+        }
+        if (config.getRetryPeriodSeconds() <= 0) {
+            throw new IllegalStateException("retryPeriodSeconds must be > 0 (found: " + config.getRetryPeriodSeconds() + ")");
+        }
+        if (config.getRenewDeadlineSeconds() <= 0) {
+            throw new IllegalStateException("renewDeadlineSeconds must be > 0 (found: " + config.getRenewDeadlineSeconds() + ")");
+        }
+        if (config.getLeaseDurationSeconds() <= 0) {
+            throw new IllegalStateException("leaseDurationSeconds must be > 0 (found: " + config.getLeaseDurationSeconds() + ")");
+        }
+        if (config.getLeaseDurationSeconds() <= config.getRenewDeadlineSeconds()) {
+            throw new IllegalStateException("leaseDurationSeconds must be greater than renewDeadlineSeconds "
+                    + "(" + config.getLeaseDurationSeconds() + " is not greater than " + config.getRenewDeadlineSeconds() + ")");
+        }
+        if (config.getRenewDeadlineSeconds() <= config.getJitterFactor() * config.getRetryPeriodSeconds()) {
+            throw new IllegalStateException("renewDeadlineSeconds must be greater than jitterFactor*retryPeriodSeconds "
+                    + "(" + config.getRenewDeadlineSeconds() + " is not greater than " + config.getJitterFactor() + "*" + config.getRetryPeriodSeconds() + ")");
+        }
+
         return config;
     }
 
@@ -137,15 +161,88 @@ public class KubernetesClusterService extends AbstractCamelClusterService<Kubern
         lockConfiguration.setClusterLabels(clusterLabels);
     }
 
-    public Long getWatchRefreshIntervalSeconds() {
-        return lockConfiguration.getWatchRefreshIntervalSeconds();
+    public void addToClusterLabels(String key, String value) {
+        lockConfiguration.addToClusterLabels(key, value);
+    }
+
+    public String getKubernetesResourcesNamespace() {
+        return lockConfiguration.getKubernetesResourcesNamespace();
+    }
+
+    /**
+     * Kubernetes namespace containing the pods and the ConfigMap used for locking.
+     */
+    public void setKubernetesResourcesNamespace(String kubernetesResourcesNamespace) {
+        lockConfiguration.setKubernetesResourcesNamespace(kubernetesResourcesNamespace);
+    }
+
+    public long getRetryOnErrorIntervalSeconds() {
+        return lockConfiguration.getRetryOnErrorIntervalSeconds();
     }
 
     /**
-     * Indicates the maximum amount of time a Kubernetes watch should be kept active, before being recreated.
-     * Watch recreation can be disabled by putting a negative value (the default will be used in case of null).
+     * The time to wait before retrying an operation that failed with an error.
+     * It must be greater than 0.
+     */
+    public void setRetryOnErrorIntervalSeconds(long retryOnErrorIntervalSeconds) {
+        lockConfiguration.setRetryOnErrorIntervalSeconds(retryOnErrorIntervalSeconds);
+    }
+
+    public double getJitterFactor() {
+        return lockConfiguration.getJitterFactor();
+    }
+
+    /**
+     * A jitter factor to apply in order to prevent all pods from trying to become leaders at the same instant.
      */
-    public void setWatchRefreshIntervalSeconds(Long watchRefreshIntervalSeconds) {
+    public void setJitterFactor(double jitterFactor) {
+        lockConfiguration.setJitterFactor(jitterFactor);
+    }
+
+    public long getLeaseDurationSeconds() {
+        return lockConfiguration.getLeaseDurationSeconds();
+    }
+
+    /**
+     * The default duration of the lease for the current leader.
+     */
+    public void setLeaseDurationSeconds(long leaseDurationSeconds) {
+        lockConfiguration.setLeaseDurationSeconds(leaseDurationSeconds);
+    }
+
+    public long getRenewDeadlineSeconds() {
+        return lockConfiguration.getRenewDeadlineSeconds();
+    }
+
+    /**
+     * The deadline after which the leader must stop trying to renew its leadership (and yield it).
+     */
+    public void setRenewDeadlineSeconds(long renewDeadlineSeconds) {
+        lockConfiguration.setRenewDeadlineSeconds(renewDeadlineSeconds);
+    }
+
+    public long getRetryPeriodSeconds() {
+        return lockConfiguration.getRetryPeriodSeconds();
+    }
+
+    /**
+     * The time between two subsequent attempts to acquire/renew the leadership (or after the lease expiration).
+     * It is randomized using the jitter factor in case of new leader election (not renewal).
+     */
+    public void setRetryPeriodSeconds(long retryPeriodSeconds) {
+        lockConfiguration.setRetryPeriodSeconds(retryPeriodSeconds);
+    }
+
+    public long getWatchRefreshIntervalSeconds() {
+        return lockConfiguration.getWatchRefreshIntervalSeconds();
+    }
+
+    /**
+     * Set this to a positive value in order to recreate watchers after a certain amount of time,
+     * to avoid having stale watchers.
+     */
+    public void setWatchRefreshIntervalSeconds(long watchRefreshIntervalSeconds) {
         lockConfiguration.setWatchRefreshIntervalSeconds(watchRefreshIntervalSeconds);
     }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/e4cab329/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java
index 9ac6a86..e324b3f 100644
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java
@@ -30,7 +30,7 @@ import io.fabric8.kubernetes.client.KubernetesClient;
 import org.apache.camel.component.kubernetes.KubernetesConfiguration;
 import org.apache.camel.component.kubernetes.KubernetesHelper;
 import org.apache.camel.component.kubernetes.ha.lock.KubernetesClusterEvent;
-import org.apache.camel.component.kubernetes.ha.lock.KubernetesLeadershipController;
+import org.apache.camel.component.kubernetes.ha.lock.KubernetesLeaseBasedLeadershipController;
 import org.apache.camel.component.kubernetes.ha.lock.KubernetesLockConfiguration;
 import org.apache.camel.ha.CamelClusterMember;
 import org.apache.camel.impl.ha.AbstractCamelClusterView;
@@ -56,7 +56,7 @@ public class KubernetesClusterView extends AbstractCamelClusterView {
 
     private volatile List<CamelClusterMember> currentMembers = Collections.emptyList();
 
-    private KubernetesLeadershipController controller;
+    private KubernetesLeaseBasedLeadershipController controller;
 
     public KubernetesClusterView(KubernetesClusterService cluster, KubernetesConfiguration configuration, KubernetesLockConfiguration lockConfiguration) {
         super(cluster, lockConfiguration.getGroupName());
@@ -86,7 +86,7 @@ public class KubernetesClusterView extends AbstractCamelClusterView {
         if (controller == null) {
             this.kubernetesClient = KubernetesHelper.getKubernetesClient(configuration);
 
-            controller = new KubernetesLeadershipController(kubernetesClient, this.lockConfiguration, event -> {
+            controller = new KubernetesLeaseBasedLeadershipController(kubernetesClient, this.lockConfiguration, event -> {
                 if (event instanceof KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent) {
                     // New leader
                     Optional<String> leader = KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent.class.cast(event).getData();

http://git-wip-us.apache.org/repos/asf/camel/blob/e4cab329/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/ConfigMapLockUtils.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/ConfigMapLockUtils.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/ConfigMapLockUtils.java
new file mode 100644
index 0000000..84718f3
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/ConfigMapLockUtils.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.ha.lock;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
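+ * Helper methods to write and read the leader pod and leadership timestamp of each group stored in the lock ConfigMap.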
+ */
+public final class ConfigMapLockUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ConfigMapLockUtils.class);
+
+    private static final String DATE_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ssX";
+
+    private static final String LEADER_PREFIX = "leader.pod.";
+
+    private static final String TIMESTAMP_PREFIX = "leader.timestamp.";
+
+    private ConfigMapLockUtils() {
+    }
+
+    public static ConfigMap createNewConfigMap(String configMapName, LeaderInfo leaderInfo) {
+        return new ConfigMapBuilder().
+                withNewMetadata()
+                    .withName(configMapName)
+                    .addToLabels("provider", "camel")
+                    .addToLabels("kind", "locks").
+                endMetadata()
+                .addToData(LEADER_PREFIX + leaderInfo.getGroupName(), leaderInfo.getLeader())
+                .addToData(TIMESTAMP_PREFIX + leaderInfo.getGroupName(), formatDate(leaderInfo.getTimestamp()))
+                .build();
+    }
+
+    public static ConfigMap getConfigMapWithNewLeader(ConfigMap configMap, LeaderInfo leaderInfo) {
+        return new ConfigMapBuilder(configMap)
+                .addToData(LEADER_PREFIX + leaderInfo.getGroupName(), leaderInfo.getLeader())
+                .addToData(TIMESTAMP_PREFIX + leaderInfo.getGroupName(), formatDate(leaderInfo.getTimestamp()))
+                .build();
+    }
+
+    public static LeaderInfo getLeaderInfo(ConfigMap configMap, String group) {
+        return new LeaderInfo(group, getLeader(configMap, group), getTimestamp(configMap, group));
+    }
+
+    private static String getLeader(ConfigMap configMap, String group) {
+        return getConfigMapValue(configMap, LEADER_PREFIX + group);
+    }
+
+    private static String formatDate(Date date) {
+        if (date == null) {
+            return null;
+        }
+        try {
+            return new SimpleDateFormat(DATE_TIME_FORMAT).format(date);
+        } catch (Exception e) {
+            LOG.warn("Unable to format date '" + date + "' using format " + DATE_TIME_FORMAT, e);
+        }
+
+        return null;
+    }
+
+    private static Date getTimestamp(ConfigMap configMap, String group) {
+        String timestamp = getConfigMapValue(configMap, TIMESTAMP_PREFIX + group);
+        if (timestamp == null) {
+            return null;
+        }
+
+        try {
+            return new SimpleDateFormat(DATE_TIME_FORMAT).parse(timestamp);
+        } catch (Exception e) {
+            LOG.warn("Unable to parse time string '" + timestamp + "' using format " + DATE_TIME_FORMAT, e);
+        }
+
+        return null;
+    }
+
+    private static String getConfigMapValue(ConfigMap configMap, String key) {
+        if (configMap == null || configMap.getData() == null) {
+            return null;
+        }
+        return configMap.getData().get(key);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e4cab329/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaderMonitor.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaderMonitor.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaderMonitor.java
deleted file mode 100644
index 5555fe1..0000000
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaderMonitor.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.kubernetes.ha.lock;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import io.fabric8.kubernetes.api.model.ConfigMap;
-import io.fabric8.kubernetes.client.KubernetesClient;
-import io.fabric8.kubernetes.client.KubernetesClientException;
-import io.fabric8.kubernetes.client.Watch;
-import io.fabric8.kubernetes.client.Watcher;
-
-import org.apache.camel.Service;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Monitors continuously the configmap to detect changes in leadership.
- * It calls the callback eventHandlers only when the leader changes w.r.t. the previous invocation.
- */
-class KubernetesLeaderMonitor implements Service {
-
-    private static final Logger LOG = LoggerFactory.getLogger(KubernetesLeaderMonitor.class);
-
-    private ScheduledExecutorService serializedExecutor;
-
-    private KubernetesClient kubernetesClient;
-
-    private KubernetesLockConfiguration lockConfiguration;
-
-    private List<KubernetesClusterEventHandler> eventHandlers;
-
-    private Watch watch;
-
-    private boolean terminated;
-
-    private boolean refreshing;
-
-    private ConfigMap latestConfigMap;
-
-    public KubernetesLeaderMonitor(ScheduledExecutorService serializedExecutor, KubernetesClient kubernetesClient, KubernetesLockConfiguration lockConfiguration) {
-        this.serializedExecutor = serializedExecutor;
-        this.kubernetesClient = kubernetesClient;
-        this.lockConfiguration = lockConfiguration;
-        this.eventHandlers = new LinkedList<>();
-    }
-
-    public void addClusterEventHandler(KubernetesClusterEventHandler leaderEventHandler) {
-        this.eventHandlers.add(leaderEventHandler);
-    }
-
-    @Override
-    public void start() throws Exception {
-        this.terminated = false;
-        serializedExecutor.execute(this::startWatch);
-        serializedExecutor.execute(() -> doPoll(true));
-
-        long recreationDelay = lockConfiguration.getWatchRefreshIntervalSecondsOrDefault();
-        if (recreationDelay > 0) {
-            serializedExecutor.scheduleWithFixedDelay(this::refresh, recreationDelay, recreationDelay, TimeUnit.SECONDS);
-        }
-    }
-
-    @Override
-    public void stop() throws Exception {
-        this.terminated = true;
-        Watch watch = this.watch;
-        if (watch != null) {
-            watch.close();
-        }
-    }
-
-    public void refresh() {
-        serializedExecutor.execute(() -> {
-            if (!terminated) {
-                refreshing = true;
-                try {
-                    doPoll(false);
-
-                    Watch w = this.watch;
-                    if (w != null) {
-                        // It will be recreated
-                        w.close();
-                    }
-                } finally {
-                    refreshing = false;
-                }
-            }
-        });
-    }
-
-    private void startWatch() {
-        try {
-            LOG.debug("Starting ConfigMap watcher for monitoring the current leader");
-            this.watch = kubernetesClient.configMaps()
-                    .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient))
-                    .withName(this.lockConfiguration.getConfigMapName())
-                    .watch(new Watcher<ConfigMap>() {
-
-                        @Override
-                        public void eventReceived(Action action, ConfigMap configMap) {
-                            switch (action) {
-                            case MODIFIED:
-                            case DELETED:
-                            case ADDED:
-                                LOG.debug("Received update from watch on ConfigMap {}", configMap);
-                                serializedExecutor.execute(() -> checkAndNotify(configMap));
-                                break;
-                            default:
-                            }
-                        }
-
-                        @Override
-                        public void onClose(KubernetesClientException e) {
-                            if (!terminated) {
-                                KubernetesLeaderMonitor.this.watch = null;
-                                if (refreshing) {
-                                    LOG.info("Refreshing ConfigMap watcher...");
-                                    serializedExecutor.execute(KubernetesLeaderMonitor.this::startWatch);
-                                } else {
-                                    LOG.warn("ConfigMap watcher has been closed unexpectedly. Recreating it in 1 second...", e);
-                                    serializedExecutor.schedule(KubernetesLeaderMonitor.this::startWatch, 1, TimeUnit.SECONDS);
-                                }
-                            }
-                        }
-                    });
-        } catch (Exception ex) {
-            LOG.warn("Unable to watch for configmap changes. Retrying in 5 seconds...");
-            LOG.debug("Error while trying to watch the configmap", ex);
-
-            this.serializedExecutor.schedule(this::startWatch, 5, TimeUnit.SECONDS);
-        }
-    }
-
-    private void doPoll(boolean retry) {
-        LOG.debug("Starting poll to get configmap {}", this.lockConfiguration.getConfigMapName());
-        ConfigMap configMap;
-        try {
-            configMap = pollConfigMap();
-        } catch (Exception ex) {
-            if (retry) {
-                LOG.warn("ConfigMap poll failed. Retrying in 5 seconds...", ex);
-                this.serializedExecutor.schedule(() -> doPoll(true), 5, TimeUnit.SECONDS);
-            } else {
-                LOG.warn("ConfigMap poll failed", ex);
-            }
-            return;
-        }
-
-        checkAndNotify(configMap);
-    }
-
-    private void checkAndNotify(ConfigMap candidateConfigMap) {
-        LOG.debug("Checking configMap {}", candidateConfigMap);
-        ConfigMap newConfigMap = newest(this.latestConfigMap, candidateConfigMap);
-        Optional<String> leader = extractLeader(newConfigMap);
-        Optional<String> oldLeader = extractLeader(this.latestConfigMap);
-
-        this.latestConfigMap = newConfigMap;
-
-        LOG.debug("The new leader is {}. Old leader was {}", leader, oldLeader);
-        if (!leader.equals(oldLeader)) {
-            LOG.debug("Notifying the new leader to all eventHandlers");
-            for (KubernetesClusterEventHandler eventHandler : eventHandlers) {
-                eventHandler.onKubernetesClusterEvent((KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent) () -> leader);
-            }
-        } else {
-            LOG.debug("Leader has not changed");
-        }
-    }
-
-    private ConfigMap pollConfigMap() {
-        return kubernetesClient.configMaps()
-                .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient))
-                .withName(this.lockConfiguration.getConfigMapName())
-                .get();
-    }
-
-    private Optional<String> extractLeader(ConfigMap configMap) {
-        Optional<String> leader = Optional.empty();
-        if (configMap != null && configMap.getData() != null) {
-            leader = Optional.ofNullable(configMap.getData().get(this.lockConfiguration.getGroupName()));
-        }
-        return leader;
-    }
-
-    private ConfigMap newest(ConfigMap configMap1, ConfigMap configMap2) {
-        ConfigMap newest = null;
-
-        if (configMap1 != null && configMap2 == null) {
-            newest = configMap1;
-        } else if (configMap1 == null && configMap2 != null) {
-            newest = configMap2;
-        }
-
-        if (newest == null) {
-            String rv1 = extractResourceVersion(configMap1);
-            String rv2 = extractResourceVersion(configMap2);
-            newest = newest(configMap1, configMap2, rv1, rv2);
-        }
-
-        if (newest == null) {
-            String ct1 = extractCreationTimestamp(configMap1);
-            String ct2 = extractCreationTimestamp(configMap2);
-            // timestamps are string-comparable
-            newest = newest(configMap1, configMap2, ct1, ct2);
-        }
-
-        return newest;
-    }
-
-    private <T extends Comparable<T>> ConfigMap newest(ConfigMap configMap1, ConfigMap configMap2, T cmp1, T cmp2) {
-        if (cmp1 != null && cmp2 != null) {
-            int comp = cmp1.compareTo(cmp2);
-            if (comp > 0) {
-                return configMap1;
-            } else {
-                return configMap2;
-            }
-        }
-        return null;
-    }
-
-    private String extractResourceVersion(ConfigMap configMap) {
-        if (configMap != null && configMap.getMetadata() != null) {
-            return configMap.getMetadata().getResourceVersion();
-        }
-        return null;
-    }
-
-    private String extractCreationTimestamp(ConfigMap configMap) {
-        if (configMap != null && configMap.getMetadata() != null) {
-            return configMap.getMetadata().getCreationTimestamp();
-        }
-        return null;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/e4cab329/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeadershipController.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeadershipController.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeadershipController.java
deleted file mode 100644
index ad2f9bc..0000000
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeadershipController.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.kubernetes.ha.lock;
-
-import java.util.Collections;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import io.fabric8.kubernetes.api.model.ConfigMap;
-import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
-import io.fabric8.kubernetes.client.KubernetesClient;
-
-import org.apache.camel.Service;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Start the monitors and participate to leader election when no active leaders are present.
- * It communicates changes in leadership and cluster members to the given event handler.
- * <p>
- * Leader-change and member-change reactions, as well as election attempts, are submitted to a
- * single-threaded scheduled executor created in start().
- */
-public class KubernetesLeadershipController implements Service {
-
-    private static final Logger LOG = LoggerFactory.getLogger(KubernetesLeadershipController.class);
-
-    private KubernetesClient kubernetesClient;
-
-    private KubernetesLockConfiguration lockConfiguration;
-
-    // Single-threaded executor shared with both monitors; null while the controller is stopped
-    private ScheduledExecutorService executor;
-
-    private KubernetesLeaderMonitor leaderMonitor;
-
-    private KubernetesMembersMonitor membersMonitor;
-
-    // Last leader observed through a leader-changed event
-    private Optional<String> currentLeader;
-
-    // Last member list observed through a member-list-changed event
-    private Set<String> currentMembers;
-
-    private KubernetesClusterEventHandler eventHandler;
-
-    public KubernetesLeadershipController(KubernetesClient kubernetesClient, KubernetesLockConfiguration lockConfiguration, KubernetesClusterEventHandler eventHandler) {
-
-        this.kubernetesClient = kubernetesClient;
-        this.lockConfiguration = lockConfiguration;
-        this.eventHandler = eventHandler;
-
-        this.currentLeader = Optional.empty();
-        this.currentMembers = Collections.emptySet();
-    }
-
-    /**
-     * Creates the executor and both monitors, registers the internal reactions and the optional external
-     * handler on each monitor, starts the monitors and schedules a first (retrying) election.
-     * Does nothing if the executor already exists.
-     */
-    @Override
-    public void start() throws Exception {
-
-        if (executor == null) {
-            executor = Executors.newSingleThreadScheduledExecutor(); // No concurrency
-            leaderMonitor = new KubernetesLeaderMonitor(this.executor, this.kubernetesClient, this.lockConfiguration);
-            membersMonitor = new KubernetesMembersMonitor(this.executor, this.kubernetesClient, this.lockConfiguration);
-
-            leaderMonitor.addClusterEventHandler(e -> executor.execute(() -> onLeaderChanged(e)));
-            if (eventHandler != null) {
-                leaderMonitor.addClusterEventHandler(eventHandler);
-            }
-
-            membersMonitor.addClusterEventHandler(e -> executor.execute(() -> onMembersChanged(e)));
-            if (eventHandler != null) {
-                membersMonitor.addClusterEventHandler(eventHandler);
-            }
-
-            // Start all services
-            leaderMonitor.start();
-            membersMonitor.start();
-
-            // Fire a new election if possible
-            executor.execute(this::runLeaderElection);
-        }
-
-    }
-
-    /**
-     * Stops both monitors and the executor, then clears the references so start() can run again.
-     * NOTE(review): shutdown() is immediately followed by shutdownNow() without awaiting termination,
-     * so pending tasks are effectively cancelled.
-     */
-    @Override
-    public void stop() throws Exception {
-        if (executor != null) {
-            membersMonitor.stop();
-            leaderMonitor.stop();
-            executor.shutdown();
-            executor.shutdownNow();
-
-            membersMonitor = null;
-            leaderMonitor = null;
-            executor = null;
-        }
-    }
-
-    /**
-     * Records the new leader; when no leader is present, schedules one election attempt.
-     * NOTE(review): this schedules tryLeaderElection (no retry) rather than runLeaderElection - confirm intent.
-     */
-    private void onLeaderChanged(KubernetesClusterEvent e) {
-        Optional<String> newLeader = KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent.class.cast(e).getData();
-        if (!newLeader.isPresent()) {
-            executor.execute(this::tryLeaderElection);
-        }
-        this.currentLeader = newLeader;
-    }
-
-    /**
-     * Records the new member list; schedules a retrying election when the current leader was a member
-     * and is no longer in the list.
-     */
-    private void onMembersChanged(KubernetesClusterEvent e) {
-        Set<String> newMembers = KubernetesClusterEvent.KubernetesClusterMemberListChangedEvent.class.cast(e).getData();
-        if (currentLeader.isPresent()) {
-            // Check if the current leader is still present in the list
-            if (!newMembers.contains(currentLeader.get()) && currentMembers.contains(currentLeader.get())) {
-                executor.execute(this::runLeaderElection);
-            }
-        }
-        this.currentMembers = newMembers;
-    }
-
-    /**
-     * Runs one election attempt and reschedules itself after 1 second until tryLeaderElection returns true.
-     * Exceptions from the attempt are logged and treated as "not finished".
-     */
-    private void runLeaderElection() {
-        boolean finished = false;
-        try {
-            finished = tryLeaderElection();
-        } catch (Exception ex) {
-            LOG.warn("Exception while trying to acquire the leadership", ex);
-        }
-
-        if (!finished) {
-            executor.schedule(this::runLeaderElection, 1, TimeUnit.SECONDS);
-        }
-    }
-
-    /**
-     * Performs one election round on the lock ConfigMap.
-     *
-     * @return true when the round is complete (ConfigMap created, lock written, a live leader already
-     *         present, or this pod already leader); false when it should be retried (this pod not yet a
-     *         member, or ConfigMap creation failed). Exceptions from the replace() call propagate.
-     */
-    private boolean tryLeaderElection() {
-        LOG.debug("Starting leader election");
-        if (!currentMembers.contains(this.lockConfiguration.getPodName())) {
-            LOG.debug("The current pod ({}) is not in the list of participating pods {}. Cannot participate to the election", this.lockConfiguration.getPodName(), currentMembers);
-            return false;
-        }
-
-        ConfigMap configMap = kubernetesClient.configMaps()
-                .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient))
-                .withName(this.lockConfiguration.getConfigMapName())
-                .get();
-
-        if (configMap == null) {
-            // No configmap created so far
-            LOG.info("Lock configmap is not present in the Kubernetes namespace. A new ConfigMap will be created");
-
-            ConfigMap newConfigMap = new ConfigMapBuilder().
-                    withNewMetadata()
-                    .withName(this.lockConfiguration.getConfigMapName())
-                    .addToLabels("provider", "camel")
-                    .addToLabels("kind", "locks").
-                            endMetadata()
-                    .addToData(this.lockConfiguration.getGroupName(), this.lockConfiguration.getPodName())
-                    .build();
-
-            try {
-                kubernetesClient.configMaps()
-                        .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient))
-                        .create(newConfigMap);
-            } catch (Exception ex) {
-                // Suppress exception
-                LOG.warn("Unable to create the ConfigMap, it may have been created by other cluster members concurrently. If the problem persists, check if the service account has the right "
-                        + "permissions to create it");
-                LOG.debug("Exception while trying to create the ConfigMap", ex);
-                return false;
-            }
-            return true;
-        } else {
-            LOG.info("Lock configmap already present in the Kubernetes namespace. Checking...");
-            Map<String, String> leaders = configMap.getData();
-            Optional<String> oldLeader = leaders != null ? Optional.ofNullable(leaders.get(this.lockConfiguration.getGroupName())) : Optional.empty();
-
-            // A recorded leader that is no longer a member counts as "no leader"
-            boolean noLeaderPresent = !oldLeader.isPresent() || !currentMembers.contains(oldLeader.get());
-            boolean alreadyLeader = oldLeader.isPresent() && oldLeader.get().equals(this.lockConfiguration.getPodName());
-
-            if (noLeaderPresent && !alreadyLeader) {
-                LOG.info("Trying to acquire the lock in configmap={}, key={}", this.lockConfiguration.getConfigMapName(), this.lockConfiguration.getGroupName());
-                ConfigMap newConfigMap = new ConfigMapBuilder(configMap)
-                        .addToData(this.lockConfiguration.getGroupName(), this.lockConfiguration.getPodName())
-                        .build();
-
-                // Optimistic locking: the replace is conditioned on the resourceVersion read above
-                kubernetesClient.configMaps()
-                        .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient))
-                        .withName(this.lockConfiguration.getConfigMapName())
-                        .lockResourceVersion(configMap.getMetadata().getResourceVersion())
-                        .replace(newConfigMap);
-
-                LOG.info("Lock acquired for configmap={}, key={}", this.lockConfiguration.getConfigMapName(), this.lockConfiguration.getGroupName());
-            } else if (!noLeaderPresent) {
-                LOG.info("A leader is already present for configmap={}, key={}: {}", this.lockConfiguration.getConfigMapName(), this.lockConfiguration.getGroupName(), oldLeader);
-            } else {
-                LOG.info("This pod ({}) is already the leader for configmap={}, key={}", this.lockConfiguration.getPodName(), this.lockConfiguration.getConfigMapName(), this.lockConfiguration
-                        .getGroupName());
-            }
-            return true;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/e4cab329/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaseBasedLeadershipController.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaseBasedLeadershipController.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaseBasedLeadershipController.java
new file mode 100644
index 0000000..b385925
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaseBasedLeadershipController.java
@@ -0,0 +1,374 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.ha.lock;
+
+import java.util.Date;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.client.KubernetesClient;
+
+import org.apache.camel.Service;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Monitors the current status of the lock ConfigMap and participates in leader election when no active leader is present.
+ * It communicates changes in leadership and cluster members to the given event handler.
+ * <p>
+ * The controller alternates between two states, both driven by a single-threaded scheduled executor:
+ * <ul>
+ * <li>acquire-leadership: wait for the current lease to expire, then try to record the local pod as leader;</li>
+ * <li>keep-leadership: periodically renew the lease, yielding it if it cannot be renewed before the renew deadline.</li>
+ * </ul>
+ * Leader-change notifications are delivered on a separate single-threaded executor.
+ */
+public class KubernetesLeaseBasedLeadershipController implements Service {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KubernetesLeaseBasedLeadershipController.class);
+
+    /**
+     * Milliseconds added to every computed delay before scheduling the next cycle
+     * (presumably so that the cycle runs just after, rather than exactly at, the computed instant).
+     */
+    private static final long FIXED_ADDITIONAL_DELAY = 100;
+
+    private KubernetesClient kubernetesClient;
+
+    private KubernetesLockConfiguration lockConfiguration;
+
+    /**
+     * Receives member-list and leader-change events. May be null, in which case events are not forwarded.
+     */
+    private KubernetesClusterEventHandler eventHandler;
+
+    /**
+     * Single thread running the state machine (shared with the members monitor): transitions never run concurrently.
+     */
+    private ScheduledExecutorService serializedExecutor;
+
+    /**
+     * Single thread delivering leader-change notifications.
+     */
+    private ScheduledExecutorService eventDispatcherExecutor;
+
+    private KubernetesMembersMonitor membersMonitor;
+
+    /**
+     * Last leader notified to the event handler (only touched by tasks running on the event dispatcher thread).
+     */
+    private Optional<String> currentLeader = Optional.empty();
+
+    /**
+     * Most recent leadership info read from, or written to, the lock ConfigMap.
+     */
+    private volatile LeaderInfo latestLeaderInfo;
+
+    public KubernetesLeaseBasedLeadershipController(KubernetesClient kubernetesClient, KubernetesLockConfiguration lockConfiguration, KubernetesClusterEventHandler eventHandler) {
+        this.kubernetesClient = kubernetesClient;
+        this.lockConfiguration = lockConfiguration;
+        this.eventHandler = eventHandler;
+    }
+
+    /**
+     * Creates the executors, starts the members monitor and schedules the initial read of the ConfigMap.
+     * Has no effect if the controller is already started.
+     */
+    @Override
+    public void start() throws Exception {
+        if (serializedExecutor == null) {
+            LOG.debug("Starting leadership controller...");
+            serializedExecutor = Executors.newSingleThreadScheduledExecutor();
+
+            eventDispatcherExecutor = Executors.newSingleThreadScheduledExecutor();
+
+            membersMonitor = new KubernetesMembersMonitor(this.serializedExecutor, this.kubernetesClient, this.lockConfiguration);
+            if (eventHandler != null) {
+                membersMonitor.addClusterEventHandler(eventHandler);
+            }
+
+            membersMonitor.start();
+            serializedExecutor.execute(this::initialization);
+        }
+    }
+
+    /**
+     * Interrupts the state machine immediately, gives pending notifications up to 2 seconds to be delivered,
+     * then stops the members monitor and clears all references so that start() can be called again.
+     */
+    @Override
+    public void stop() throws Exception {
+        LOG.debug("Stopping leadership controller...");
+        if (serializedExecutor != null) {
+            serializedExecutor.shutdownNow();
+        }
+        if (eventDispatcherExecutor != null) {
+            eventDispatcherExecutor.shutdown();
+            eventDispatcherExecutor.awaitTermination(2, TimeUnit.SECONDS);
+            eventDispatcherExecutor.shutdownNow();
+        }
+        if (membersMonitor != null) {
+            membersMonitor.stop();
+        }
+
+        membersMonitor = null;
+        eventDispatcherExecutor = null;
+        serializedExecutor = null;
+    }
+
+    /**
+     * Get the first ConfigMap (retrying until it can be read) and move to the initial state.
+     */
+    private void initialization() {
+        LOG.debug("Reading (with retry) the configmap {} to detect the current leader", this.lockConfiguration.getConfigMapName());
+        refreshConfigMapFromCluster(true);
+
+        if (isCurrentPodTheActiveLeader()) {
+            serializedExecutor.execute(this::onLeadershipAcquired);
+        } else {
+            LOG.info("The current pod ({}) is not the leader of the group '{}' in ConfigMap '{}' at this time", this.lockConfiguration.getPodName(), this.lockConfiguration
+                    .getGroupName(), this.lockConfiguration.getConfigMapName());
+            serializedExecutor.execute(this::acquireLeadershipCycle);
+        }
+    }
+
+    /**
+     * Signals the acquisition of the leadership and moves to the keep-leadership state.
+     */
+    private void onLeadershipAcquired() {
+        LOG.info("The current pod ({}) is now the leader of the group '{}' in ConfigMap '{}'", this.lockConfiguration.getPodName(), this.lockConfiguration
+                .getGroupName(), this.lockConfiguration.getConfigMapName());
+
+        this.eventDispatcherExecutor.execute(this::checkAndNotifyNewLeader);
+
+        long nextDelay = computeNextRenewWaitTime(this.latestLeaderInfo.getTimestamp(), this.latestLeaderInfo.getTimestamp());
+        serializedExecutor.schedule(this::keepLeadershipCycle, nextDelay + FIXED_ADDITIONAL_DELAY, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * While in the keep-leadership state, the controller periodically renews the lease.
+     * If the renew deadline passes without a successful renewal, or another pod is recorded as leader, the leadership is lost.
+     */
+    private void keepLeadershipCycle() {
+        // renew lease periodically
+        refreshConfigMapFromCluster(false); // if possible, update
+
+        if (this.latestLeaderInfo.isTimeElapsedSeconds(lockConfiguration.getRenewDeadlineSeconds()) || !this.latestLeaderInfo.isLeader(this.lockConfiguration.getPodName())) {
+            // Time over for renewal or leadership lost
+            LOG.debug("The current pod ({}) has lost the leadership", this.lockConfiguration.getPodName());
+            serializedExecutor.execute(this::onLeadershipLost);
+            return;
+        }
+
+        boolean success = tryAcquireOrRenewLeadership();
+        LOG.debug("Attempted to renew the lease. Success={}", success);
+
+        long nextDelay = computeNextRenewWaitTime(this.latestLeaderInfo.getTimestamp(), new Date());
+        serializedExecutor.schedule(this::keepLeadershipCycle, nextDelay + FIXED_ADDITIONAL_DELAY, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Computes the delay (in milliseconds) until the next event while in the keep-leadership state.
+     * <p>
+     * The next event is the first retry slot {@code lastRenewal + k * retryPeriod} (k >= 1) that is not before
+     * {@code lastRenewalAttempt}, capped by the renew deadline {@code lastRenewal + renewDeadline}.
+     *
+     * @param lastRenewal time of the last successful renewal recorded in the ConfigMap
+     * @param lastRenewalAttempt time of the last renewal attempt
+     * @return a non-negative delay in milliseconds from now
+     */
+    private long computeNextRenewWaitTime(Date lastRenewal, Date lastRenewalAttempt) {
+        long timeDeadline = lastRenewal.getTime() + this.lockConfiguration.getRenewDeadlineSeconds() * 1000;
+        long retryPeriodMillis = this.lockConfiguration.getRetryPeriodSeconds() * 1000;
+        long timeRetry;
+        if (retryPeriodMillis <= 0) {
+            // A non-positive period never advances the slot search below (it would loop forever on the
+            // controller thread): retry as soon as possible instead.
+            timeRetry = Math.max(lastRenewal.getTime(), lastRenewalAttempt.getTime());
+        } else {
+            long counter = 0;
+            do {
+                counter++;
+                timeRetry = lastRenewal.getTime() + counter * retryPeriodMillis;
+            } while (timeRetry < lastRenewalAttempt.getTime() && timeRetry < timeDeadline);
+        }
+
+        long time = Math.min(timeRetry, timeDeadline);
+        long delay = Math.max(0, time - System.currentTimeMillis());
+        LOG.debug("Next renewal timeout event will be fired in {} seconds", delay / 1000);
+        return delay;
+    }
+
+
+    /**
+     * Signals the loss of leadership and moves to the acquire-leadership state.
+     */
+    private void onLeadershipLost() {
+        LOG.info("The local pod ({}) is no longer the leader of the group '{}' in ConfigMap '{}'", this.lockConfiguration.getPodName(), this.lockConfiguration.getGroupName(),
+                this.lockConfiguration.getConfigMapName());
+
+        this.eventDispatcherExecutor.execute(this::checkAndNotifyNewLeader);
+        serializedExecutor.execute(this::acquireLeadershipCycle);
+    }
+
+    /**
+     * While in the acquire-leadership state, the controller waits for the current lease to expire before trying to acquire the leadership.
+     */
+    private void acquireLeadershipCycle() {
+        // wait for the current lease to finish then fire an election
+        refreshConfigMapFromCluster(false); // if possible, update
+
+        // Notify about changes in current leader if any
+        this.eventDispatcherExecutor.execute(this::checkAndNotifyNewLeader);
+
+        if (!this.latestLeaderInfo.isTimeElapsedSeconds(lockConfiguration.getLeaseDurationSeconds())) {
+            // Wait for the lease to finish before trying leader election
+            long nextDelay = computeNextElectionWaitTime(this.latestLeaderInfo.getTimestamp());
+            serializedExecutor.schedule(this::acquireLeadershipCycle, nextDelay + FIXED_ADDITIONAL_DELAY, TimeUnit.MILLISECONDS);
+            return;
+        }
+
+        boolean acquired = tryAcquireOrRenewLeadership();
+        if (acquired) {
+            LOG.debug("Leadership acquired for ConfigMap {}. Notification in progress...", this.lockConfiguration.getConfigMapName());
+            serializedExecutor.execute(this::onLeadershipAcquired);
+            return;
+        }
+
+        // Notify about changes in current leader if any (the failed attempt may have refreshed the leader info)
+        this.eventDispatcherExecutor.execute(this::checkAndNotifyNewLeader);
+
+        LOG.debug("Cannot acquire the leadership for ConfigMap {}", this.lockConfiguration.getConfigMapName());
+        long nextDelay = computeNextElectionWaitTime(this.latestLeaderInfo.getTimestamp());
+        serializedExecutor.schedule(this::acquireLeadershipCycle, nextDelay + FIXED_ADDITIONAL_DELAY, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Computes the delay (in milliseconds) until the next election attempt: the end of the current lease plus
+     * a jittered retry period.
+     *
+     * @param lastRenewal time of the last renewal recorded in the ConfigMap, or null when it is unknown
+     * @return a non-negative delay in milliseconds from now
+     */
+    private long computeNextElectionWaitTime(Date lastRenewal) {
+        if (lastRenewal == null) {
+            LOG.debug("Error detected while getting leadership info, next election timeout event will be fired in {} seconds", this.lockConfiguration.getRetryOnErrorIntervalSeconds());
+            // Callers schedule the returned value in milliseconds
+            return this.lockConfiguration.getRetryOnErrorIntervalSeconds() * 1000;
+        }
+        long time = lastRenewal.getTime() + this.lockConfiguration.getLeaseDurationSeconds() * 1000
+                + jitter(this.lockConfiguration.getRetryPeriodSeconds() * 1000, this.lockConfiguration.getJitterFactor());
+
+        long delay = Math.max(0, time - System.currentTimeMillis());
+        LOG.debug("Next election timeout event will be fired in {} seconds", delay / 1000);
+        return delay;
+    }
+
+    /**
+     * Scales {@code num} by a random multiplier uniformly drawn between 1 and {@code factor}.
+     */
+    private long jitter(long num, double factor) {
+        return (long) (num * (1 + Math.random() * (factor - 1)));
+    }
+
+    /**
+     * Tries to create the lock ConfigMap, renew the local lease, or take over an expired lease.
+     * Updates are conditioned on the resourceVersion read just before, so concurrent writers cannot both succeed.
+     *
+     * @return true if the local pod holds the leadership after the attempt
+     */
+    private boolean tryAcquireOrRenewLeadership() {
+        LOG.debug("Trying to acquire or renew the leadership...");
+
+        ConfigMap configMap;
+        try {
+            configMap = pullConfigMap();
+        } catch (Exception e) {
+            LOG.warn("Unable to retrieve the current ConfigMap " + this.lockConfiguration.getConfigMapName() + " from Kubernetes", e);
+            return false;
+        }
+
+        // Info to set in the configmap to become leaders
+        LeaderInfo newLeaderInfo = new LeaderInfo(this.lockConfiguration.getGroupName(), this.lockConfiguration.getPodName(), new Date());
+
+        if (configMap == null) {
+            // No configmap created so far
+            LOG.debug("Lock configmap is not present in the Kubernetes namespace. A new ConfigMap will be created");
+            ConfigMap newConfigMap = ConfigMapLockUtils.createNewConfigMap(this.lockConfiguration.getConfigMapName(), newLeaderInfo);
+
+            try {
+                kubernetesClient.configMaps()
+                        .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient))
+                        .create(newConfigMap);
+            } catch (Exception ex) {
+                // Suppress exception
+                LOG.warn("Unable to create the ConfigMap, it may have been created by other cluster members concurrently. If the problem persists, check if the service account has the right "
+                        + "permissions to create it");
+                LOG.debug("Exception while trying to create the ConfigMap", ex);
+
+                // Try to get the configMap and return the current leader
+                refreshConfigMapFromCluster(false);
+                return isCurrentPodTheActiveLeader();
+            }
+
+            LOG.debug("ConfigMap {} successfully created and local pod is leader", this.lockConfiguration.getConfigMapName());
+            updateLatestLeaderInfo(newConfigMap);
+            return true;
+        } else {
+            LOG.debug("Lock configmap already present in the Kubernetes namespace. Checking...");
+            LeaderInfo leaderInfo = ConfigMapLockUtils.getLeaderInfo(configMap, this.lockConfiguration.getGroupName());
+
+            boolean weWereLeader = leaderInfo.isLeader(this.lockConfiguration.getPodName());
+            boolean leaseExpired = leaderInfo.isTimeElapsedSeconds(this.lockConfiguration.getLeaseDurationSeconds());
+
+            if (weWereLeader || leaseExpired) {
+                // Renew the lease or set the new leader
+                try {
+                    ConfigMap updatedConfigMap = ConfigMapLockUtils.getConfigMapWithNewLeader(configMap, newLeaderInfo);
+                    kubernetesClient.configMaps()
+                            .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient))
+                            .withName(this.lockConfiguration.getConfigMapName())
+                            .lockResourceVersion(configMap.getMetadata().getResourceVersion())
+                            .replace(updatedConfigMap);
+
+                    LOG.debug("ConfigMap {} successfully updated and local pod is leader", this.lockConfiguration.getConfigMapName());
+                    updateLatestLeaderInfo(updatedConfigMap);
+                    return true;
+                } catch (Exception ex) {
+                    LOG.warn("An attempt to become leader has failed. It's possible that the leadership has been taken by another pod");
+                    LOG.debug("Error received during configmap lock replace", ex);
+
+                    // Try to get the configMap and return the current leader
+                    refreshConfigMapFromCluster(false);
+                    return isCurrentPodTheActiveLeader();
+                }
+            } else {
+                // Another pod is the leader and lease is not expired
+                LOG.debug("Another pod is the current leader and lease has not expired yet");
+                updateLatestLeaderInfo(configMap);
+                return false;
+            }
+        }
+    }
+
+
+    /**
+     * Reads the lock ConfigMap and updates {@link #latestLeaderInfo}.
+     *
+     * @param retry when true, keeps retrying (sleeping the configured retry-on-error interval between attempts)
+     *              until the read succeeds; when false, a failure is only logged
+     * @throws RuntimeException if the thread is interrupted while waiting to retry
+     */
+    private void refreshConfigMapFromCluster(boolean retry) {
+        // Iterative retry: a long API-server outage must not grow the stack
+        while (true) {
+            LOG.debug("Retrieving configmap {}", this.lockConfiguration.getConfigMapName());
+            try {
+                updateLatestLeaderInfo(pullConfigMap());
+                return;
+            } catch (Exception ex) {
+                if (!retry) {
+                    LOG.warn("Cannot retrieve the ConfigMap: pull failed", ex);
+                    return;
+                }
+                LOG.warn("ConfigMap pull failed. Retrying in " + this.lockConfiguration.getRetryOnErrorIntervalSeconds() + " seconds...", ex);
+                try {
+                    Thread.sleep(this.lockConfiguration.getRetryOnErrorIntervalSeconds() * 1000);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException("Controller Thread interrupted, shutdown in progress", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Returns true if the latest known leader info names the local pod and its renew deadline has not passed.
+     */
+    private boolean isCurrentPodTheActiveLeader() {
+        return latestLeaderInfo != null
+                && latestLeaderInfo.isLeader(this.lockConfiguration.getPodName())
+                && !latestLeaderInfo.isTimeElapsedSeconds(this.lockConfiguration.getRenewDeadlineSeconds());
+    }
+
+    /**
+     * Fetches the lock ConfigMap from the configured namespace (null if it does not exist).
+     */
+    private ConfigMap pullConfigMap() {
+        return kubernetesClient.configMaps()
+                .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient))
+                .withName(this.lockConfiguration.getConfigMapName())
+                .get();
+    }
+
+
+    private void updateLatestLeaderInfo(ConfigMap configMap) {
+        LOG.debug("Updating internal status about the current leader");
+        this.latestLeaderInfo = ConfigMapLockUtils.getLeaderInfo(configMap, this.lockConfiguration.getGroupName());
+    }
+
+    /**
+     * Derives the current leader from the latest leader info and notifies the event handler (if any) when it
+     * differs from the last notified one. A leader whose lease (or, for the local pod, renew deadline) has
+     * elapsed is reported as absent.
+     */
+    private void checkAndNotifyNewLeader() {
+        LOG.debug("Checking if the current leader has changed to notify the event handler...");
+        LeaderInfo newLeaderInfo = this.latestLeaderInfo;
+        if (newLeaderInfo == null) {
+            return;
+        }
+
+        long leaderInfoDurationSeconds = newLeaderInfo.isLeader(this.lockConfiguration.getPodName())
+                ? this.lockConfiguration.getRenewDeadlineSeconds()
+                : this.lockConfiguration.getLeaseDurationSeconds();
+
+        Optional<String> newLeader;
+        if (newLeaderInfo.getLeader() != null && !newLeaderInfo.isTimeElapsedSeconds(leaderInfoDurationSeconds)) {
+            newLeader = Optional.of(newLeaderInfo.getLeader());
+        } else {
+            newLeader = Optional.empty();
+        }
+
+        // Sending notifications in case of leader change
+        if (!newLeader.equals(this.currentLeader)) {
+            LOG.debug("Current leader has changed from {} to {}. Sending notifications...", this.currentLeader, newLeader);
+            this.currentLeader = newLeader;
+            // start() accepts a null handler, so guard against it here as well
+            if (eventHandler != null) {
+                eventHandler.onKubernetesClusterEvent((KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent) () -> newLeader);
+            }
+        } else {
+            LOG.debug("Current leader unchanged: {}", this.currentLeader);
+        }
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/e4cab329/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java
index f203c0a..37e0251 100644
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java
@@ -26,7 +26,16 @@ import io.fabric8.kubernetes.client.KubernetesClient;
  */
 public class KubernetesLockConfiguration implements Cloneable {
 
-    private static final long DEFAULT_WATCHER_REFRESH_INTERVAL_SECONDS = 1800;
+    /** Default name of the ConfigMap holding the locks. */
+    public static final String DEFAULT_CONFIGMAP_NAME = "leaders";
+
+    // Leader election timings, in seconds
+    public static final long DEFAULT_LEASE_DURATION_SECONDS = 20;
+    public static final long DEFAULT_RENEW_DEADLINE_SECONDS = 15;
+    public static final long DEFAULT_RETRY_PERIOD_SECONDS = 6;
+
+    // Multiplier bound used to randomize the election retry period (dimensionless)
+    public static final double DEFAULT_JITTER_FACTOR = 1.2;
+
+    // Error recovery and watch maintenance, in seconds
+    public static final long DEFAULT_RETRY_ON_ERROR_INTERVAL_SECONDS = 5;
+    public static final long DEFAULT_WATCH_REFRESH_INTERVAL_SECONDS = 1800;
 
     /**
      * Kubernetes namespace containing the pods and the ConfigMap used for locking.
@@ -36,7 +45,7 @@ public class KubernetesLockConfiguration implements Cloneable {
     /**
      * Name of the ConfigMap used for locking.
      */
-    private String configMapName;
+    private String configMapName = DEFAULT_CONFIGMAP_NAME; // "leaders" unless overridden
 
     /**
      * Name of the lock group (or namespace according to the Camel cluster convention) within the chosen ConfgMap.
@@ -55,9 +64,36 @@ public class KubernetesLockConfiguration implements Cloneable {
 
     /**
      * Indicates the maximum amount of time a Kubernetes watch should be kept active, before being recreated.
-     * Watch recreation can be disabled by putting a negative value (the default will be used in case of null).
+     * Watch recreation can be disabled by putting value <= 0.
+     */
+    private long retryOnErrorIntervalSeconds = DEFAULT_RETRY_ON_ERROR_INTERVAL_SECONDS;
+
+    /**
+     * A jitter factor to apply in order to prevent all pods to try to become leaders in the same instant.
+     */
+    private double jitterFactor = DEFAULT_JITTER_FACTOR;
+
+    /**
+     * The default duration of the lease for the current leader.
+     */
+    private long leaseDurationSeconds = DEFAULT_LEASE_DURATION_SECONDS;
+
+    /**
+     * The deadline after which the leader must stop trying to renew its leadership (and yield it).
+     */
+    private long renewDeadlineSeconds = DEFAULT_RENEW_DEADLINE_SECONDS;
+
+    /**
+     * The time between two subsequent attempts to acquire/renew the leadership (or after the lease expiration).
+     * It is randomized using the jitter factor in case of new leader election (not renewal).
      */
-    private Long watchRefreshIntervalSeconds;
+    private long retryPeriodSeconds = DEFAULT_RETRY_PERIOD_SECONDS;
+
+    /**
+     * Set this to a positive value in order to recreate watchers after a certain amount of time
+     * (to prevent them becoming stale).
+     */
+    private long watchRefreshIntervalSeconds = DEFAULT_WATCH_REFRESH_INTERVAL_SECONDS;
 
     public KubernetesLockConfiguration() {
     }
@@ -113,19 +149,51 @@ public class KubernetesLockConfiguration implements Cloneable {
         this.clusterLabels = clusterLabels;
     }
 
-    public Long getWatchRefreshIntervalSeconds() {
-        return watchRefreshIntervalSeconds;
+    public long getRetryOnErrorIntervalSeconds() {
+        return retryOnErrorIntervalSeconds;
     }
 
-    public long getWatchRefreshIntervalSecondsOrDefault() {
-        Long interval = watchRefreshIntervalSeconds;
-        if (interval == null) {
-            interval = DEFAULT_WATCHER_REFRESH_INTERVAL_SECONDS;
-        }
-        return interval;
+    public void setRetryOnErrorIntervalSeconds(long retryOnErrorIntervalSeconds) {
+        this.retryOnErrorIntervalSeconds = retryOnErrorIntervalSeconds;
+    }
+
+    public double getJitterFactor() {
+        return jitterFactor;
+    }
+
+    public void setJitterFactor(double jitterFactor) {
+        this.jitterFactor = jitterFactor;
+    }
+
+    public long getLeaseDurationSeconds() {
+        return leaseDurationSeconds;
+    }
+
+    public void setLeaseDurationSeconds(long leaseDurationSeconds) {
+        this.leaseDurationSeconds = leaseDurationSeconds;
+    }
+
+    public long getRenewDeadlineSeconds() {
+        return renewDeadlineSeconds;
+    }
+
+    public void setRenewDeadlineSeconds(long renewDeadlineSeconds) {
+        this.renewDeadlineSeconds = renewDeadlineSeconds;
+    }
+
+    public long getRetryPeriodSeconds() {
+        return retryPeriodSeconds;
+    }
+
+    public void setRetryPeriodSeconds(long retryPeriodSeconds) {
+        this.retryPeriodSeconds = retryPeriodSeconds;
+    }
+
+    public long getWatchRefreshIntervalSeconds() {
+        return watchRefreshIntervalSeconds;
     }
 
-    public void setWatchRefreshIntervalSeconds(Long watchRefreshIntervalSeconds) {
+    public void setWatchRefreshIntervalSeconds(long watchRefreshIntervalSeconds) {
         this.watchRefreshIntervalSeconds = watchRefreshIntervalSeconds;
     }
 
@@ -146,6 +214,11 @@ public class KubernetesLockConfiguration implements Cloneable {
         sb.append(", groupName='").append(groupName).append('\'');
         sb.append(", podName='").append(podName).append('\'');
         sb.append(", clusterLabels=").append(clusterLabels);
+        sb.append(", retryOnErrorIntervalSeconds=").append(retryOnErrorIntervalSeconds);
+        sb.append(", jitterFactor=").append(jitterFactor);
+        sb.append(", leaseDurationSeconds=").append(leaseDurationSeconds);
+        sb.append(", renewDeadlineSeconds=").append(renewDeadlineSeconds);
+        sb.append(", retryPeriodSeconds=").append(retryPeriodSeconds);
         sb.append(", watchRefreshIntervalSeconds=").append(watchRefreshIntervalSeconds);
         sb.append('}');
         return sb.toString();

http://git-wip-us.apache.org/repos/asf/camel/blob/e4cab329/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesMembersMonitor.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesMembersMonitor.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesMembersMonitor.java
index d9173b2..586a11f 100644
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesMembersMonitor.java
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesMembersMonitor.java
@@ -41,8 +41,6 @@ import org.slf4j.LoggerFactory;
  */
 class KubernetesMembersMonitor implements Service {
 
-    private static final long DEFAULT_WATCHER_REFRESH_INTERVAL_SECONDS = 1800;
-
     private static final Logger LOG = LoggerFactory.getLogger(KubernetesMembersMonitor.class);
 
     private ScheduledExecutorService serializedExecutor;
@@ -81,7 +79,7 @@ class KubernetesMembersMonitor implements Service {
         serializedExecutor.execute(() -> doPoll(true));
         serializedExecutor.execute(this::createWatch);
 
-        long recreationDelay = lockConfiguration.getWatchRefreshIntervalSecondsOrDefault();
+        long recreationDelay = lockConfiguration.getWatchRefreshIntervalSeconds();
         if (recreationDelay > 0) {
             serializedExecutor.scheduleWithFixedDelay(this::refresh, recreationDelay, recreationDelay, TimeUnit.SECONDS);
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/e4cab329/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/LeaderInfo.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/LeaderInfo.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/LeaderInfo.java
new file mode 100644
index 0000000..50d1603
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/LeaderInfo.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.ha.lock;
+
+import java.util.Date;
+
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * Overview of a leadership status: the lock group, the pod recorded as leader and the timestamp
+ * associated with that information.
+ * <p>
+ * The mutable {@link Date} is copied when it enters or leaves this object, so callers cannot
+ * change the stored timestamp through a shared reference.
+ */
+public class LeaderInfo {
+
+    private String groupName;
+
+    private String leader;
+
+    private Date timestamp;
+
+    public LeaderInfo() {
+    }
+
+    /**
+     * @param groupName the name of the lock group
+     * @param leader the name of the pod recorded as leader
+     * @param timestamp the timestamp associated with the leadership status (a copy is stored)
+     */
+    public LeaderInfo(String groupName, String leader, Date timestamp) {
+        this.groupName = groupName;
+        this.leader = leader;
+        this.timestamp = copyOf(timestamp);
+    }
+
+    /**
+     * Checks whether at least {@code timeSeconds} seconds have passed since the stored timestamp.
+     * <p>
+     * The elapsed milliseconds are divided down instead of multiplying {@code timeSeconds} up, so
+     * very large values cannot overflow and be wrongly reported as elapsed.
+     *
+     * @param timeSeconds the amount of time to check, in seconds
+     * @return {@code true} if no timestamp is set or if the given time has elapsed since it
+     */
+    public boolean isTimeElapsedSeconds(long timeSeconds) {
+        if (timestamp == null) {
+            return true;
+        }
+        long elapsedMillis = System.currentTimeMillis() - timestamp.getTime();
+        // floorDiv (not /) keeps this exactly "elapsedMillis >= timeSeconds * 1000", negative values included
+        return Math.floorDiv(elapsedMillis, 1000L) >= timeSeconds;
+    }
+
+    /**
+     * Checks whether the given pod is the recorded leader.
+     *
+     * @param pod the name of the pod to check; rejected by {@code ObjectHelper.notNull} if null
+     * @return {@code true} if {@code pod} is the leader, {@code false} otherwise (also when no leader is set)
+     */
+    public boolean isLeader(String pod) {
+        ObjectHelper.notNull(pod, "pod");
+        return pod.equals(leader);
+    }
+
+    public String getGroupName() {
+        return groupName;
+    }
+
+    public void setGroupName(String groupName) {
+        this.groupName = groupName;
+    }
+
+    public String getLeader() {
+        return leader;
+    }
+
+    public void setLeader(String leader) {
+        this.leader = leader;
+    }
+
+    /**
+     * @return a copy of the timestamp, or {@code null} if none is set
+     */
+    public Date getTimestamp() {
+        return copyOf(timestamp);
+    }
+
+    /**
+     * @param timestamp the new timestamp; a copy is stored
+     */
+    public void setTimestamp(Date timestamp) {
+        this.timestamp = copyOf(timestamp);
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder sb = new StringBuilder("LeaderInfo{");
+        sb.append("groupName='").append(groupName).append('\'');
+        sb.append(", leader='").append(leader).append('\'');
+        sb.append(", timestamp=").append(timestamp);
+        sb.append('}');
+        return sb.toString();
+    }
+
+    /**
+     * Returns a defensive copy of the given date, or {@code null} if it is null.
+     */
+    private static Date copyOf(Date date) {
+        return date == null ? null : new Date(date.getTime());
+    }
+
+}


Mime
View raw message