camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lburgazz...@apache.org
Subject [2/2] camel git commit: CAMEL-9888: Create a camel-consul component
Date Mon, 23 May 2016 15:22:42 GMT
CAMEL-9888: Create a camel-consul component


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/38d5374a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/38d5374a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/38d5374a

Branch: refs/heads/master
Commit: 38d5374aaf945ba587d7fcc06de8bac2ef4e0a48
Parents: dba22f9
Author: lburgazzoli <lburgazzoli@gmail.com>
Authored: Mon May 23 17:15:34 2016 +0200
Committer: lburgazzoli <lburgazzoli@gmail.com>
Committed: Mon May 23 17:22:25 2016 +0200

----------------------------------------------------------------------
 apache-camel/pom.xml                            |   4 +
 .../src/main/descriptors/common-bin.xml         |   1 +
 components/camel-consul/pom.xml                 | 120 ++++++++
 .../consul/AbstractConsulConsumer.java          | 110 +++++++
 .../consul/AbstractConsulEndpoint.java          |  94 ++++++
 .../consul/AbstractConsulProducer.java          | 129 ++++++++
 .../camel/component/consul/ConsulComponent.java |  65 ++++
 .../component/consul/ConsulConfiguration.java   | 241 +++++++++++++++
 .../camel/component/consul/ConsulConstants.java |  42 +++
 .../component/consul/ConsulEndpointFactory.java |  25 ++
 .../consul/enpoint/ConsulAgentActions.java      |  25 ++
 .../consul/enpoint/ConsulAgentEndpoint.java     |  43 +++
 .../consul/enpoint/ConsulAgentProducer.java     |  33 ++
 .../consul/enpoint/ConsulEventActions.java      |  23 ++
 .../consul/enpoint/ConsulEventConsumer.java     | 131 ++++++++
 .../consul/enpoint/ConsulEventEndpoint.java     |  44 +++
 .../consul/enpoint/ConsulEventProducer.java     |  56 ++++
 .../consul/enpoint/ConsulKeyValueActions.java   |  30 ++
 .../consul/enpoint/ConsulKeyValueConsumer.java  | 128 ++++++++
 .../consul/enpoint/ConsulKeyValueEndpoint.java  |  43 +++
 .../consul/enpoint/ConsulKeyValueProducer.java  | 126 ++++++++
 .../consul/policy/ConsulRoutePolicy.java        | 308 +++++++++++++++++++
 .../src/main/resources/META-INF/LICENSE.txt     | 203 ++++++++++++
 .../src/main/resources/META-INF/NOTICE.txt      |  11 +
 .../services/org/apache/camel/component/consul  |   1 +
 .../camel/component/consul/ConsulEventTest.java |  68 ++++
 .../component/consul/ConsulEventWatchTest.java  |  60 ++++
 .../component/consul/ConsulKeyValueTest.java    |  62 ++++
 .../consul/ConsulKeyValueWatchTest.java         |  66 ++++
 .../component/consul/ConsulTestSupport.java     |  64 ++++
 .../src/test/resources/log4j.properties         |  20 ++
 components/pom.xml                              |   1 +
 parent/pom.xml                                  |   7 +
 .../src/main/resources/bundles.properties       |   1 +
 .../features/src/main/resources/features.xml    |   6 +
 35 files changed, 2391 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/apache-camel/pom.xml
----------------------------------------------------------------------
diff --git a/apache-camel/pom.xml b/apache-camel/pom.xml
index 8f8c2f7..a556261 100644
--- a/apache-camel/pom.xml
+++ b/apache-camel/pom.xml
@@ -165,6 +165,10 @@
     </dependency>
     <dependency>
       <groupId>org.apache.camel</groupId>
+      <artifactId>camel-consul</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.camel</groupId>
       <artifactId>camel-context</artifactId>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/apache-camel/src/main/descriptors/common-bin.xml
----------------------------------------------------------------------
diff --git a/apache-camel/src/main/descriptors/common-bin.xml b/apache-camel/src/main/descriptors/common-bin.xml
index 3587254..5745702 100644
--- a/apache-camel/src/main/descriptors/common-bin.xml
+++ b/apache-camel/src/main/descriptors/common-bin.xml
@@ -54,6 +54,7 @@
         <include>org.apache.camel:camel-core</include>
         <include>org.apache.camel:camel-core-osgi</include>
         <include>org.apache.camel:camel-cometd</include>
+        <include>org.apache.camel:camel-consul</include>
         <include>org.apache.camel:camel-context</include>
         <include>org.apache.camel:camel-couchdb</include>
         <include>org.apache.camel:camel-crypto</include>

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-consul/pom.xml b/components/camel-consul/pom.xml
new file mode 100644
index 0000000..9b9dca9
--- /dev/null
+++ b/components/camel-consul/pom.xml
@@ -0,0 +1,120 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.camel</groupId>
+    <artifactId>components</artifactId>
+    <version>2.18-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>camel-consul</artifactId>
+  <packaging>jar</packaging>
+  <name>Camel :: Consul</name>
+  <description>Camel Consul support</description>
+
+  <properties>
+    <camel.osgi.export.pkg>
+      org.apache.camel.component.consul.*,
+    </camel.osgi.export.pkg>
+    <camel.osgi.export.service>
+      org.apache.camel.spi.ComponentResolver;component=consul
+    </camel.osgi.export.service>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.orbitz.consul</groupId>
+      <artifactId>consul-client</artifactId>
+      <version>${consul-client-version}</version>
+    </dependency>
+
+    <!-- testing -->
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-test</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>${mockito-version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>java-hamcrest</artifactId>
+      <version>${hamcrest-version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+
+  <profiles>
+    <profile>
+      <id>consul-skip-tests</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <configuration>
+              <skipTests>true</skipTests>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <profile>
+      <id>consul-tests</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <configuration>
+              <skipTests>false</skipTests>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+
+  </profiles>
+
+</project>

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulConsumer.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulConsumer.java
new file mode 100644
index 0000000..c3b6545
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulConsumer.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul;
+
+import java.math.BigInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+import com.orbitz.consul.Consul;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * @author lburgazzoli
+ */
+public abstract class AbstractConsulConsumer<C> extends DefaultConsumer {
+    protected final AbstractConsulEndpoint endpoint;
+    protected final ConsulConfiguration configuration;
+    protected final String key;
+    protected final AtomicReference<BigInteger> index;
+
+    private final Function<Consul, C> clientSupplier;
+    private Runnable watcher;
+
+    protected AbstractConsulConsumer(AbstractConsulEndpoint endpoint, ConsulConfiguration configuration, Processor processor, Function<Consul, C> clientSupplier) {
+        super(endpoint, processor);
+
+        this.endpoint = endpoint;
+        this.configuration = configuration;
+        this.key = ObjectHelper.notNull(configuration.getKey(), ConsulConstants.CONSUL_KEY);
+        this.index = new AtomicReference<>(BigInteger.valueOf(configuration.getFirstIndex()));
+        this.clientSupplier = clientSupplier;
+        this.watcher = null;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        watcher = createWatcher(clientSupplier.apply(endpoint.getConsul()));
+        watcher.run();
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        watcher = null;
+
+        super.doStop();
+    }
+
+    // *************************************************************************
+    //
+    // *************************************************************************
+
+    protected abstract Runnable createWatcher(C client) throws Exception;
+
+    // *************************************************************************
+    // Handlers
+    // *************************************************************************
+
+    protected abstract class AbstractWatcher implements Runnable {
+        private final C client;
+
+        public AbstractWatcher(C client) {
+            this.client = client;
+        }
+
+        protected void onError(Throwable throwable) {
+            if (isRunAllowed()) {
+                getExceptionHandler().handleException("Error watching for event " + key, throwable);
+            }
+        }
+
+        protected final void setIndex(BigInteger responseIndex) {
+            index.set(responseIndex);
+        }
+
+        @Override
+        public final void run() {
+            if (isRunAllowed()) {
+                watch(client);
+            }
+        }
+
+        protected final C client() {
+            return client;
+        }
+
+        protected final void watch() {
+            watch(client);
+        }
+
+        protected abstract void watch(C client);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulEndpoint.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulEndpoint.java
new file mode 100644
index 0000000..981c314
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulEndpoint.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul;
+
+import com.orbitz.consul.Consul;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriPath;
+import org.apache.camel.util.ObjectHelper;
+
+public abstract class AbstractConsulEndpoint extends DefaultEndpoint {
+
+    @UriPath(description = "The consul configuration")
+    @Metadata(required = "true")
+    private final ConsulConfiguration configuration;
+
+    @UriParam(description = "The API endpoint")
+    @Metadata(required = "true")
+    private final String apiEndpoint;
+
+    private Consul consul;
+
+    protected AbstractConsulEndpoint(String apiEndpoint, String uri, ConsulComponent component, ConsulConfiguration configuration) {
+        super(uri, component);
+
+        this.configuration = configuration;
+        this.apiEndpoint = apiEndpoint;
+    }
+
+    @Override
+    public boolean isSingleton() {
+        return true;
+    }
+
+    // *************************************************************************
+    //
+    // *************************************************************************
+
+    public ConsulConfiguration getConfiguration() {
+        return this.configuration;
+    }
+
+    public String getApiEndpoint() {
+        return this.apiEndpoint;
+    }
+
+    public synchronized Consul getConsul() throws Exception {
+        if (consul == null) {
+            Consul.Builder builder = Consul.builder();
+            builder.withPing(configuration.isPingInstance());
+
+            if (ObjectHelper.isNotEmpty(configuration.getUrl())) {
+                builder.withUrl(configuration.getUrl());
+            }
+            if (ObjectHelper.isNotEmpty(configuration.getSslContextParameters())) {
+                builder.withSslContext(configuration.getSslContextParameters().createSSLContext(getCamelContext()));
+            }
+            if (ObjectHelper.isNotEmpty(configuration.getAclToken())) {
+                builder.withAclToken(configuration.getAclToken());
+            }
+            if (configuration.requiresBasicAuthentication()) {
+                builder.withBasicAuth(configuration.getUserName(), configuration.getPassword());
+            }
+            if (ObjectHelper.isNotEmpty(configuration.getConnectTimeoutMillis())) {
+                builder.withConnectTimeoutMillis(configuration.getConnectTimeoutMillis());
+            }
+            if (ObjectHelper.isNotEmpty(configuration.getReadTimeoutMillis())) {
+                builder.withReadTimeoutMillis(configuration.getReadTimeoutMillis());
+            }
+            if (ObjectHelper.isNotEmpty(configuration.getWriteTimeoutMillis())) {
+                builder.withWriteTimeoutMillis(configuration.getWriteTimeoutMillis());
+            }
+
+            consul = builder.build();
+        }
+
+        return consul;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulProducer.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulProducer.java
new file mode 100644
index 0000000..2be260c
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/AbstractConsulProducer.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul;
+
+import java.util.function.Function;
+
+import com.orbitz.consul.Consul;
+import org.apache.camel.Message;
+import org.apache.camel.NoSuchHeaderException;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.HeaderSelectorProducer;
+
+
+public abstract class AbstractConsulProducer<C> extends HeaderSelectorProducer {
+    private final AbstractConsulEndpoint endpoint;
+    private final ConsulConfiguration configuration;
+    private final Function<Consul, C> clientSupplier;
+    private C client;
+
+    protected AbstractConsulProducer(AbstractConsulEndpoint endpoint, ConsulConfiguration configuration, Function<Consul, C> clientSupplier) {
+        super(endpoint, ConsulConstants.CONSUL_ACTION, configuration.getAction());
+
+        this.endpoint = endpoint;
+        this.configuration = configuration;
+        this.clientSupplier = clientSupplier;
+        this.client = null;
+    }
+
+    // *************************************************************************
+    //
+    // *************************************************************************
+
+    protected Consul getConsul() throws Exception {
+        return endpoint.getConsul();
+    }
+
+    protected C getClient() throws Exception {
+        if (client == null) {
+            client = clientSupplier.apply(getConsul());
+        }
+
+        return client;
+    }
+
+    protected ConsulConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    protected <D> D getHeader(Message message, String header, D defaultValue, Class<D> type) {
+        return message.getHeader(header, defaultValue, type);
+    }
+
+    protected <D> D getMandatoryHeader(Message message, String header, Class<D> type) throws Exception {
+        return getMandatoryHeader(message, header, null, type);
+    }
+
+    protected <D> D getMandatoryHeader(Message message, String header, D defaultValue, Class<D> type) throws Exception {
+        D value = getHeader(message, header, defaultValue, type);
+        if (value == null) {
+            throw new NoSuchHeaderException(message.getExchange(), header, type);
+        }
+
+        return value;
+    }
+
+    protected String getKey(Message message) {
+        return message.getHeader(
+            ConsulConstants.CONSUL_KEY,
+            configuration.getKey(),
+            String.class);
+    }
+
+    protected String getMandatoryKey(Message message) throws Exception {
+        return getMandatoryHeader(
+            message,
+            ConsulConstants.CONSUL_KEY,
+            configuration.getKey(),
+            String.class);
+    }
+
+    protected <T> T getOption(Message message, T defaultValue, Class<T> type) {
+        return message.getHeader(ConsulConstants.CONSUL_OPTIONS, defaultValue, type);
+    }
+
+    protected boolean isValueAsString(Message message) throws Exception {
+        return message.getHeader(
+            ConsulConstants.CONSUL_VALUE_AS_STRING,
+            configuration.isValueAsString(),
+            Boolean.class);
+    }
+
+    protected <T> T getBody(Message message, T defaultValue, Class<T> type) throws Exception {
+        T body = message.getBody(type);
+        if (body == null) {
+            body = defaultValue;
+        }
+
+        return  body;
+    }
+
+    protected void setBodyAndResult(Message message, Object body) throws Exception {
+        setBodyAndResult(message, body, body != null);
+    }
+
+    protected void setBodyAndResult(Message message, Object body, boolean result) throws Exception {
+        message.setHeader(ConsulConstants.CONSUL_RESULT, result);
+        if (body != null) {
+            message.setBody(body);
+        }
+    }
+
+    protected Processor wrap(Function<C, Object> supplier) {
+        return exchange -> setBodyAndResult(exchange.getIn(), supplier.apply(getClient()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulComponent.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulComponent.java
new file mode 100644
index 0000000..bbd6a1c
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulComponent.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul;
+
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.component.consul.enpoint.ConsulAgentEndpoint;
+import org.apache.camel.component.consul.enpoint.ConsulEventEndpoint;
+import org.apache.camel.component.consul.enpoint.ConsulKeyValueEndpoint;
+import org.apache.camel.impl.UriEndpointComponent;
+
+/**
+ * Represents the component that manages {@link AbstractConsulEndpoint}.
+ */
+public class ConsulComponent extends UriEndpointComponent {
+    
+    public ConsulComponent() {
+        super(AbstractConsulEndpoint.class);
+    }
+
+    public ConsulComponent(CamelContext context) {
+        super(context, AbstractConsulEndpoint.class);
+    }
+
+    @Override
+    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+        ConsulConfiguration configuration = new ConsulConfiguration();
+        setProperties(configuration, parameters);
+
+        return ConsulApiEndpoint.valueOf(remaining).create(uri, this, configuration);
+    }
+
+    private enum ConsulApiEndpoint implements ConsulEndpointFactory {
+        kv(ConsulKeyValueEndpoint::new),
+        event(ConsulEventEndpoint::new),
+        agent(ConsulAgentEndpoint::new);
+
+        private final ConsulEndpointFactory factory;
+
+        ConsulApiEndpoint(ConsulEndpointFactory factory) {
+            this.factory = factory;
+        }
+
+        @Override
+        public Endpoint create(String uri, ConsulComponent component, ConsulConfiguration configuration) throws Exception {
+            return factory.create(uri, component, configuration);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConfiguration.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConfiguration.java
new file mode 100644
index 0000000..2c0e576
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConfiguration.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul;
+
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriParams;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.jsse.SSLContextParameters;
+
+@UriParams
+public class ConsulConfiguration {
+    @UriParam
+    private String url;
+
+    @UriParam(label = "security")
+    private SSLContextParameters sslContextParameters;
+    @UriParam(label = "security")
+    private String aclToken;
+    @UriParam(label = "security")
+    private String userName;
+    @UriParam(label = "security")
+    private String password;
+
+    @UriParam
+    private Long connectTimeoutMillis;
+    @UriParam
+    private Long readTimeoutMillis;
+    @UriParam
+    private Long writeTimeoutMillis;
+    @UriParam(defaultValue = "true")
+    private boolean pingInstance = true;
+
+
+    @UriParam(label = "producer")
+    private String action;
+
+    @UriParam(label = "producer,kv", defaultValue = "false")
+    private boolean valueAsString;
+
+    @UriParam
+    private String key;
+
+    @UriParam(label = "consumer,watch", defaultValue = "10")
+    private Integer blockSeconds = 10;
+
+    @UriParam(label = "consumer,watch", defaultValue = "0")
+    private long firstIndex;
+
+    @UriParam(label = "consumer,watch", defaultValue = "false")
+    private boolean recursive;
+
+
+    public String getUrl() {
+        return url;
+    }
+
+    /**
+     * The Consul agent URL
+     */
+    public void setUrl(String url) {
+        this.url = url;
+    }
+
+    public SSLContextParameters getSslContextParameters() {
+        return sslContextParameters;
+    }
+
+    /**
+     * SSL configuration using an org.apache.camel.util.jsse.SSLContextParameters
+     * instance.
+     */
+    public void setSslContextParameters(SSLContextParameters sslContextParameters) {
+        this.sslContextParameters = sslContextParameters;
+    }
+
+    public String getAclToken() {
+        return aclToken;
+    }
+
+    /**
+     * Sets the ACL token to be used with Consul
+     */
+    public void setAclToken(String aclToken) {
+        this.aclToken = aclToken;
+    }
+
+    public String getAction() {
+        return action;
+    }
+
+    public String getUserName() {
+        return userName;
+    }
+
+    /**
+     * Sets the username to be used for basic authentication
+     */
+    public void setUserName(String userName) {
+        this.userName = userName;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    /**
+     * Sets the password to be used for basic authentication
+     */
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    public boolean requiresBasicAuthentication() {
+        return ObjectHelper.isNotEmpty(userName) && ObjectHelper.isNotEmpty(password);
+    }
+
+    public Long getConnectTimeoutMillis() {
+        return connectTimeoutMillis;
+    }
+
+    /**
+     * Connect timeout for OkHttpClient
+     */
+    public void setConnectTimeoutMillis(Long connectTimeoutMillis) {
+        this.connectTimeoutMillis = connectTimeoutMillis;
+    }
+
+    public Long getReadTimeoutMillis() {
+        return readTimeoutMillis;
+    }
+
+    /**
+     * Read timeout for OkHttpClient
+     */
+    public void setReadTimeoutMillis(Long readTimeoutMillis) {
+        this.readTimeoutMillis = readTimeoutMillis;
+    }
+
+    public Long getWriteTimeoutMillis() {
+        return writeTimeoutMillis;
+    }
+
+    /**
+     * Write timeout for OkHttpClient
+     */
+    public void setWritTeimeoutMillis(Long writeTimeoutMillis) {
+        this.writeTimeoutMillis = writeTimeoutMillis;
+    }
+
+    public void setWriteTimeoutMillis(Long writeTimeoutMillis) {
+        this.writeTimeoutMillis = writeTimeoutMillis;
+    }
+
+    public boolean isPingInstance() {
+        return pingInstance;
+    }
+
+    /**
+     * Configure if the AgentClient should attempt a ping before returning the Consul instance
+     */
+    public void setPingInstance(boolean pingInstance) {
+        this.pingInstance = pingInstance;
+    }
+
+    /**
+     * The default action. Can be overridden by CamelConsulAction
+     */
+    public void setAction(String action) {
+        this.action = action;
+    }
+
+    public boolean isValueAsString() {
+        return valueAsString;
+    }
+
+    /**
+     * Whether to transform values retrieved from Consul (i.e. on the KV
+     * endpoint) to strings by default.
+     */
+    public void setValueAsString(boolean valueAsString) {
+        this.valueAsString = valueAsString;
+    }
+
+    public String getKey() {
+        return key;
+    }
+
+    /**
+     * The default key. Can be overridden by CamelConsulKey
+     */
+    public void setKey(String key) {
+        this.key = key;
+    }
+
+    public Integer getBlockSeconds() {
+        return blockSeconds;
+    }
+
+    /**
+     * The number of seconds to wait for a watch event, default 10 seconds
+     */
+    public void setBlockSeconds(Integer blockSeconds) {
+        this.blockSeconds = blockSeconds;
+    }
+
+    public long getFirstIndex() {
+        return firstIndex;
+    }
+
+    /**
+     * The first index to watch for, default 0
+     */
+    public void setFirstIndex(long firstIndex) {
+        this.firstIndex = firstIndex;
+    }
+
+    public boolean isRecursive() {
+        return recursive;
+    }
+
+    /**
+     * Recursively watch, default false
+     */
+    public void setRecursive(boolean recursive) {
+        this.recursive = recursive;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConstants.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConstants.java
new file mode 100644
index 0000000..923e1d7
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConstants.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul;
+
+public interface ConsulConstants { // shared names: the "kv" endpoint id and the Camel message header keys
+    String CONSUL_ENDPOINT_KV = "kv";
+
+    String CONSUL_ACTION = "CamelConsulAction";
+    String CONSUL_KEY = "CamelConsulKey";
+    String CONSUL_EVENT_ID = "CamelConsulEventId";
+    String CONSUL_EVENT_NAME = "CamelConsulEventName";
+    String CONSUL_EVENT_LTIME = "CamelConsulEventLTime";
+    String CONSUL_NODE_FILTER = "CamelConsulNodeFilter";
+    String CONSUL_TAG_FILTER = "CamelConsulTagFilter";
+    String CONSUL_SERVICE_FILTER = "CamelConsulSessionFilter"; // NOTE(review): value says "Session", name says SERVICE - confirm which is intended
+    String CONSUL_VERSION = "CamelConsulVersion";
+    String CONSUL_VALUE = "CamelConsulValue";
+    String CONSUL_VALUES = "CamelConsulValues";
+    String CONSUL_FLAGS = "CamelConsulFlags";
+    String CONSUL_CREATE_INDEX = "CamelConsulCreateIndex";
+    String CONSUL_LOCK_INDEX = "CamelConsulLockIndex"; // must differ from CONSUL_CREATE_INDEX or one header overwrites the other
+    String CONSUL_MODIFY_INDEX = "CamelConsulModifyIndex";
+    String CONSUL_OPTIONS = "CamelConsulOptions";
+    String CONSUL_RESULT = "CamelConsulResult";
+    String CONSUL_SESSION = "CamelConsulSession";
+    String CONSUL_OPERATION = "CamelConsulOperation";
+    String CONSUL_VALUE_AS_STRING = "CamelConsulValueAsString";
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulEndpointFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulEndpointFactory.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulEndpointFactory.java
new file mode 100644
index 0000000..3401ff8
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulEndpointFactory.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul;
+
+
+import org.apache.camel.Endpoint;
+
+@FunctionalInterface
+public interface ConsulEndpointFactory { // single-method factory: builds an Endpoint from a uri, a component and a configuration
+    Endpoint create(String uri, ConsulComponent component, ConsulConfiguration configuration) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulAgentActions.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulAgentActions.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulAgentActions.java
new file mode 100644
index 0000000..aff15ab
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulAgentActions.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.consul.enpoint;
+
+public interface ConsulAgentActions { // action names that ConsulAgentProducer binds to AgentClient calls
+    String CHECKS = "CHECKS"; // bound to getChecks()
+    String SERVICES = "SERVICES"; // bound to getServices()
+    String MEMBERS = "MEMBERS"; // bound to getMembers()
+    String AGENT = "AGENT"; // bound to getAgent()
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulAgentEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulAgentEndpoint.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulAgentEndpoint.java
new file mode 100644
index 0000000..9d5742d
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulAgentEndpoint.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.consul.enpoint;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.component.consul.AbstractConsulEndpoint;
+import org.apache.camel.component.consul.ConsulComponent;
+import org.apache.camel.component.consul.ConsulConfiguration;
+import org.apache.camel.spi.UriEndpoint;
+
+@UriEndpoint(scheme = "consul", title = "Consul Agent", syntax = "consul://agent", producerOnly = true, label = "api,cloud")
+public class ConsulAgentEndpoint extends AbstractConsulEndpoint {
+    public ConsulAgentEndpoint(String uri, ConsulComponent component, ConsulConfiguration configuration) {
+        super("agent", uri, component, configuration);
+    }
+
+    @Override
+    public Producer createProducer() throws Exception {
+        return new ConsulAgentProducer(this, getConfiguration());
+    }
+
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        throw new IllegalArgumentException("Not implemented");
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulAgentProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulAgentProducer.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulAgentProducer.java
new file mode 100644
index 0000000..940095a
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulAgentProducer.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul.enpoint;
+
+import com.orbitz.consul.AgentClient;
+import org.apache.camel.component.consul.AbstractConsulEndpoint;
+import org.apache.camel.component.consul.AbstractConsulProducer;
+import org.apache.camel.component.consul.ConsulConfiguration;
+
+public class ConsulAgentProducer extends AbstractConsulProducer<AgentClient> { // producer that maps action names to AgentClient getters
+    ConsulAgentProducer(AbstractConsulEndpoint endpoint, ConsulConfiguration configuration) {
+        super(endpoint, configuration, c -> c.agentClient()); // third argument yields the AgentClient; presumably the one passed to the bindings below
+
+        bind(ConsulAgentActions.CHECKS, wrap(c -> c.getChecks())); // bind/wrap come from the base class (not shown here)
+        bind(ConsulAgentActions.SERVICES, wrap(c -> c.getServices()));
+        bind(ConsulAgentActions.MEMBERS, wrap(c -> c.getMembers()));
+        bind(ConsulAgentActions.AGENT, wrap(c -> c.getAgent()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventActions.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventActions.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventActions.java
new file mode 100644
index 0000000..76e63ec
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventActions.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.consul.enpoint;
+
+public interface ConsulEventActions { // action names used by ConsulEventProducer's @InvokeOnHeader methods
+    String FIRE = "FIRE"; // selects ConsulEventProducer.fire
+    String LIST = "LIST"; // selects ConsulEventProducer.list
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventConsumer.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventConsumer.java
new file mode 100644
index 0000000..f6f658c
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventConsumer.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul.enpoint;
+
+import java.math.BigInteger;
+import java.util.List;
+
+import com.orbitz.consul.EventClient;
+import com.orbitz.consul.async.EventResponseCallback;
+import com.orbitz.consul.model.EventResponse;
+import com.orbitz.consul.model.event.Event;
+import com.orbitz.consul.option.QueryOptions;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.component.consul.AbstractConsulConsumer;
+import org.apache.camel.component.consul.ConsulConfiguration;
+import org.apache.camel.component.consul.ConsulConstants;
+
+public class ConsulEventConsumer extends AbstractConsulConsumer<EventClient> { // consumer that watches Consul events through EventClient.listEvents
+    protected ConsulEventConsumer(ConsulEventEndpoint endpoint, ConsulConfiguration configuration, Processor processor) {
+        super(endpoint, configuration, processor, c -> c.eventClient());
+    }
+
+    @Override
+    protected Runnable createWatcher(EventClient client) throws Exception {
+        return new EventWatcher(client);
+    }
+
+    // *************************************************************************
+    // Watch
+    // *************************************************************************
+
+    private class EventWatcher extends AbstractWatcher implements EventResponseCallback { // the watcher is its own async callback
+        EventWatcher(EventClient client) {
+            super(client);
+        }
+
+        @Override
+        public void watch(EventClient client) {
+            client.listEvents(
+                key,
+                QueryOptions.blockSeconds(configuration.getBlockSeconds(), index.get()).build(), // blocking query from the current index
+                this
+            );
+        }
+
+        @Override
+        public void onComplete(EventResponse eventResponse) {
+            if (isRunAllowed()) {
+                List<Event> events = filterEvents(eventResponse.getEvents(), index.get()); // filter against the index seen before this response
+                events.forEach(this::onEvent);
+
+                setIndex(eventResponse.getIndex());
+
+                watch(); // no-arg watch() is inherited (not shown); presumably re-issues the query with the new index
+            }
+        }
+
+        @Override
+        public void onFailure(Throwable throwable) {
+            onError(throwable);
+        }
+
+        private void onEvent(Event event) { // turns one Consul event into an exchange and hands it to the processor
+            final Exchange exchange = endpoint.createExchange();
+            final Message message = exchange.getIn();
+
+            message.setHeader(ConsulConstants.CONSUL_KEY, key);
+            message.setHeader(ConsulConstants.CONSUL_RESULT, true);
+            message.setHeader(ConsulConstants.CONSUL_EVENT_ID, event.getId());
+            message.setHeader(ConsulConstants.CONSUL_EVENT_NAME, event.getName());
+            message.setHeader(ConsulConstants.CONSUL_EVENT_LTIME, event.getLTime());
+            message.setHeader(ConsulConstants.CONSUL_NODE_FILTER, event.getNodeFilter());
+            message.setHeader(ConsulConstants.CONSUL_SERVICE_FILTER, event.getServiceFilter());
+            message.setHeader(ConsulConstants.CONSUL_TAG_FILTER, event.getTagFilter());
+            message.setHeader(ConsulConstants.CONSUL_VERSION, event.getVersion());
+            message.setBody(event.getPayload().orNull()); // body is the event payload, or null when absent
+
+            try {
+                getProcessor().process(exchange);
+            } catch (Exception e) {
+                getExceptionHandler().handleException("Error processing exchange", exchange, e); // report and keep going with the next event
+            }
+        }
+
+        /**
+         * from spring-cloud-consul (https://github.com/spring-cloud/spring-cloud-consul):
+         *     spring-cloud-consul-bus/src/main/java/org/springframework/cloud/consul/bus/EventService.java
+         */
+        private List<Event> filterEvents(List<Event> toFilter, BigInteger lastIndex) { // keeps only events after the one matching lastIndex
+            List<Event> events = toFilter;
+            if (lastIndex != null) {
+                for (int i = 0; i < events.size(); i++) {
+                    Event event = events.get(i);
+                    BigInteger eventIndex = getEventIndexFromId(event);
+                    if (eventIndex.equals(lastIndex)) {
+                        events = events.subList(i + 1, events.size()); // everything up to and including the match is dropped
+                        break;
+                    }
+                }
+            }
+            return events; // unchanged list when lastIndex is null or no event matches
+        }
+
+        private BigInteger getEventIndexFromId(Event event) { // derives an index from the event id; assumes a 36-char UUID-formatted id
+            String eventId = event.getId();
+            String lower = eventId.substring(0, 8) + eventId.substring(9, 13) + eventId.substring(14, 18); // first 16 hex digits, separators at 8 and 13 skipped
+            String upper = eventId.substring(19, 23) + eventId.substring(24, 36); // last 16 hex digits, separators at 18 and 23 skipped
+
+            BigInteger lowVal = new BigInteger(lower, 16);
+            BigInteger highVal = new BigInteger(upper, 16);
+
+            return lowVal.xor(highVal); // XOR of the two halves is what filterEvents compares with lastIndex
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventEndpoint.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventEndpoint.java
new file mode 100644
index 0000000..df3254c
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventEndpoint.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.consul.enpoint;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.component.consul.AbstractConsulEndpoint;
+import org.apache.camel.component.consul.ConsulComponent;
+import org.apache.camel.component.consul.ConsulConfiguration;
+import org.apache.camel.spi.UriEndpoint;
+
+
+@UriEndpoint(scheme = "consul", title = "Consul Event", syntax = "consul://event", consumerClass = ConsulEventConsumer.class, label = "api,cloud")
+public class ConsulEventEndpoint extends AbstractConsulEndpoint { // endpoint for the Consul event API; creates both a producer and a consumer
+    public ConsulEventEndpoint(String uri, ConsulComponent component, ConsulConfiguration configuration) {
+        super("event", uri, component, configuration);
+    }
+
+    @Override
+    public Producer createProducer() throws Exception {
+        return new ConsulEventProducer(this, getConfiguration()); // producer shares this endpoint's configuration
+    }
+
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        return new ConsulEventConsumer(this, getConfiguration(), processor); // consumer delivers watched events to the given processor
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventProducer.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventProducer.java
new file mode 100644
index 0000000..26b6595
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulEventProducer.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.consul.enpoint;
+
+import com.orbitz.consul.EventClient;
+import com.orbitz.consul.option.EventOptions;
+import com.orbitz.consul.option.QueryOptions;
+import org.apache.camel.InvokeOnHeader;
+import org.apache.camel.Message;
+import org.apache.camel.component.consul.AbstractConsulEndpoint;
+import org.apache.camel.component.consul.AbstractConsulProducer;
+import org.apache.camel.component.consul.ConsulConfiguration;
+
+public class ConsulEventProducer extends AbstractConsulProducer<EventClient> { // producer exposing EventClient fireEvent/listEvents as actions
+    ConsulEventProducer(AbstractConsulEndpoint endpoint, ConsulConfiguration configuration) {
+        super(endpoint, configuration, c -> c.eventClient());
+    }
+
+    @InvokeOnHeader(ConsulEventActions.FIRE)
+    protected void fire(Message message) throws Exception { // fires an event using the mandatory key and the body converted to String
+        setBodyAndResult(
+            message,
+            getClient().fireEvent(
+                getMandatoryKey(message),
+                getOption(message, EventOptions.BLANK, EventOptions.class), // presumably falls back to BLANK when the message carries no options - helper not shown
+                message.getBody(String.class)
+            )
+        );
+    }
+
+    @InvokeOnHeader(ConsulEventActions.LIST)
+    protected void list(Message message) throws Exception { // lists events for the key; uses getKey rather than getMandatoryKey
+        setBodyAndResult(
+            message,
+            getClient().listEvents(
+                getKey(message),
+                getOption(message, QueryOptions.BLANK, QueryOptions.class)
+            )
+        );
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueActions.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueActions.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueActions.java
new file mode 100644
index 0000000..d014bbd
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueActions.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.consul.enpoint;
+
+public interface ConsulKeyValueActions { // action names for the kv endpoint; presumably dispatched by ConsulKeyValueProducer (not shown here)
+    String PUT = "PUT";
+    String GET_VALUE = "GET_VALUE";
+    String GET_VALUES = "GET_VALUES";
+    String GET_KEYS = "GET_KEYS";
+    String GET_SESSIONS = "GET_SESSIONS";
+    String DELETE_KEY = "DELETE_KEY";
+    String DELETE_KEYS = "DELETE_KEYS";
+    String LOCK = "LOCK";
+    String UNLOCK = "UNLOCK";
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueConsumer.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueConsumer.java
new file mode 100644
index 0000000..a90d8cb
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueConsumer.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul.enpoint;
+
+import java.util.List;
+
+import com.google.common.base.Optional;
+import com.orbitz.consul.KeyValueClient;
+import com.orbitz.consul.async.ConsulResponseCallback;
+import com.orbitz.consul.model.ConsulResponse;
+import com.orbitz.consul.model.kv.Value;
+import com.orbitz.consul.option.QueryOptions;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.component.consul.AbstractConsulConsumer;
+import org.apache.camel.component.consul.ConsulConfiguration;
+import org.apache.camel.component.consul.ConsulConstants;
+
+public class ConsulKeyValueConsumer extends AbstractConsulConsumer<KeyValueClient> { // consumer that watches Consul KV entries via KeyValueClient
+
+    protected ConsulKeyValueConsumer(ConsulKeyValueEndpoint endpoint, ConsulConfiguration configuration, Processor processor) {
+        super(endpoint, configuration, processor, c -> c.keyValueClient());
+    }
+
+    @Override
+    protected Runnable createWatcher(KeyValueClient client) throws Exception {
+        return configuration.isRecursive() ? new RecursivePathWatcher(client) : new PathWatcher(client); // recursive uses getValues, otherwise getValue
+    }
+
+    // *************************************************************************
+    // Watch
+    // *************************************************************************
+
+    private abstract class AbstractPathWatcher<T> extends AbstractWatcher implements ConsulResponseCallback<T> { // shared callback logic; T is the response payload type
+        protected AbstractPathWatcher(KeyValueClient client) {
+            super(client);
+        }
+
+        protected QueryOptions queryOptions() { // blocking-query options built from blockSeconds and the current index
+            return QueryOptions.blockSeconds(configuration.getBlockSeconds(), index.get()).build();
+        }
+
+        @Override
+        public void onComplete(ConsulResponse<T> consulResponse) {
+            if (isRunAllowed()) {
+                onResponse(consulResponse.getResponse()); // deliver the payload before advancing the index
+                setIndex(consulResponse.getIndex());
+                watch(); // no-arg watch() is inherited (not shown); presumably re-issues the query with the new index
+            }
+        }
+
+        @Override
+        public void onFailure(Throwable throwable) {
+            onError(throwable);
+        }
+
+        protected void onValue(Value value) { // turns one KV value into an exchange and hands it to the processor
+            final Exchange exchange = endpoint.createExchange();
+            final Message message = exchange.getIn();
+
+            message.setHeader(ConsulConstants.CONSUL_KEY, value.getKey());
+            message.setHeader(ConsulConstants.CONSUL_RESULT, true);
+            message.setHeader(ConsulConstants.CONSUL_FLAGS, value.getFlags());
+            message.setHeader(ConsulConstants.CONSUL_CREATE_INDEX, value.getCreateIndex());
+            message.setHeader(ConsulConstants.CONSUL_LOCK_INDEX, value.getLockIndex());
+            message.setHeader(ConsulConstants.CONSUL_MODIFY_INDEX, value.getModifyIndex());
+            message.setHeader(ConsulConstants.CONSUL_SESSION, value.getSession().orNull());
+            message.setBody(configuration.isValueAsString() ? value.getValueAsString().orNull() : value.getValue().orNull()); // getValueAsString() or getValue() per valueAsString; null if absent
+
+            try {
+                getProcessor().process(exchange);
+            } catch (Exception e) {
+                getExceptionHandler().handleException("Error processing exchange", exchange, e); // report and keep watching
+            }
+        }
+
+        protected abstract void onResponse(T consulResponse); // subclass hook: unpack the payload and call onValue
+    }
+
+    private class PathWatcher extends AbstractPathWatcher<Optional<Value>> { // watches a single key via getValue
+        PathWatcher(KeyValueClient client) {
+            super(client);
+        }
+
+        @Override
+        public void watch(KeyValueClient client) {
+            client.getValue(key, queryOptions(), this);
+        }
+
+        @Override
+        public void onResponse(Optional<Value> value) {
+            if (value.isPresent()) { // nothing is emitted when the response holds no value
+                onValue(value.get());
+            }
+        }
+    }
+
+    private class RecursivePathWatcher extends AbstractPathWatcher<List<Value>> { // watches via getValues and emits one exchange per returned value
+        RecursivePathWatcher(KeyValueClient client) {
+            super(client);
+        }
+
+        @Override
+        public void watch(KeyValueClient client) {
+            client.getValues(key, queryOptions(), this);
+        }
+
+        @Override
+        public void onResponse(List<Value> values) {
+            values.forEach(this::onValue);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueEndpoint.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueEndpoint.java
new file mode 100644
index 0000000..2910910
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueEndpoint.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.consul.enpoint;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.component.consul.AbstractConsulEndpoint;
+import org.apache.camel.component.consul.ConsulComponent;
+import org.apache.camel.component.consul.ConsulConfiguration;
+import org.apache.camel.spi.UriEndpoint;
+
+/**
+ * Endpoint for the Consul key/value API, registered with the parent endpoint
+ * under the API name {@code "kv"}.
+ */
+@UriEndpoint(scheme = "consul", title = "Consul KeyValue", syntax = "consul://kv", consumerClass = ConsulKeyValueConsumer.class, label = "api,cloud")
+public class ConsulKeyValueEndpoint extends AbstractConsulEndpoint {
+    public ConsulKeyValueEndpoint(String uri, ConsulComponent component, ConsulConfiguration configuration) {
+        super("kv", uri, component, configuration);
+    }
+
+    /**
+     * Creates a {@link ConsulKeyValueProducer} bound to this endpoint and its
+     * configuration.
+     */
+    @Override
+    public Producer createProducer() throws Exception {
+        final ConsulConfiguration configuration = getConfiguration();
+        return new ConsulKeyValueProducer(this, configuration);
+    }
+
+    /**
+     * Creates a {@link ConsulKeyValueConsumer} that hands its exchanges to the
+     * given processor.
+     */
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        final ConsulConfiguration configuration = getConfiguration();
+        return new ConsulKeyValueConsumer(this, configuration, processor);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueProducer.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueProducer.java
new file mode 100644
index 0000000..9596f4c
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/enpoint/ConsulKeyValueProducer.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul.enpoint;
+
+import com.orbitz.consul.KeyValueClient;
+import com.orbitz.consul.option.PutOptions;
+import com.orbitz.consul.option.QueryOptions;
+import org.apache.camel.InvokeOnHeader;
+import org.apache.camel.Message;
+import org.apache.camel.component.consul.AbstractConsulEndpoint;
+import org.apache.camel.component.consul.AbstractConsulProducer;
+import org.apache.camel.component.consul.ConsulConfiguration;
+import org.apache.camel.component.consul.ConsulConstants;
+
+/**
+ * Producer for the Consul key/value endpoint. Each handler below is bound to
+ * one {@link ConsulKeyValueActions} constant through {@link InvokeOnHeader}
+ * and delegates to the {@link KeyValueClient}.
+ */
+public class ConsulKeyValueProducer extends AbstractConsulProducer<KeyValueClient> {
+
+    ConsulKeyValueProducer(AbstractConsulEndpoint endpoint, ConsulConfiguration configuration) {
+        super(endpoint, configuration, consul -> consul.keyValueClient());
+    }
+
+    /**
+     * Stores the message body (converted to a String) under the mandatory
+     * key and writes the client's return value to the
+     * {@link ConsulConstants#CONSUL_RESULT} header. Flags come from the
+     * {@link ConsulConstants#CONSUL_FLAGS} header and default to {@code 0}.
+     */
+    @InvokeOnHeader(ConsulKeyValueActions.PUT)
+    protected void put(Message message) throws Exception {
+        final KeyValueClient client = getClient();
+        final String key = getMandatoryKey(message);
+        final String value = message.getBody(String.class);
+        final Long flags = message.getHeader(ConsulConstants.CONSUL_FLAGS, 0L, Long.class);
+        final PutOptions options = getOption(message, PutOptions.BLANK, PutOptions.class);
+
+        message.setHeader(ConsulConstants.CONSUL_RESULT, client.putValue(key, value, flags, options));
+    }
+
+    /**
+     * Reads the value stored under the mandatory key, as a String when
+     * {@code isValueAsString} says so and as the client's value object
+     * otherwise; the result is {@code null} when nothing is found.
+     */
+    @InvokeOnHeader(ConsulKeyValueActions.GET_VALUE)
+    protected void getValue(Message message) throws Exception {
+        final Object result = isValueAsString(message)
+            ? getClient().getValueAsString(getMandatoryKey(message)).orNull()
+            : getClient().getValue(getMandatoryKey(message), getOption(message, QueryOptions.BLANK, QueryOptions.class)).orNull();
+
+        setBodyAndResult(message, result);
+    }
+
+    /**
+     * Reads the values returned by the client for the mandatory key, as
+     * Strings when {@code isValueAsString} says so and as value objects
+     * otherwise.
+     */
+    @InvokeOnHeader(ConsulKeyValueActions.GET_VALUES)
+    protected void getValues(Message message) throws Exception {
+        final Object result = isValueAsString(message)
+            ? getClient().getValuesAsString(getMandatoryKey(message))
+            : getClient().getValues(getMandatoryKey(message), getOption(message, QueryOptions.BLANK, QueryOptions.class));
+
+        setBodyAndResult(message, result);
+    }
+
+    /**
+     * Passes the result of the client's {@code getKeys} call for the
+     * mandatory key to {@code setBodyAndResult}.
+     */
+    @InvokeOnHeader(ConsulKeyValueActions.GET_KEYS)
+    protected void getKeys(Message message) throws Exception {
+        final KeyValueClient client = getClient();
+        final String key = getMandatoryKey(message);
+
+        setBodyAndResult(message, client.getKeys(key));
+    }
+
+    /**
+     * Passes the result of the client's {@code getSession} call for the
+     * mandatory key to {@code setBodyAndResult}.
+     */
+    @InvokeOnHeader(ConsulKeyValueActions.GET_SESSIONS)
+    protected void getSessions(Message message) throws Exception {
+        final KeyValueClient client = getClient();
+        final String key = getMandatoryKey(message);
+
+        setBodyAndResult(message, client.getSession(key));
+    }
+
+    /**
+     * Deletes the mandatory key and sets the
+     * {@link ConsulConstants#CONSUL_RESULT} header to {@code true}.
+     */
+    @InvokeOnHeader(ConsulKeyValueActions.DELETE_KEY)
+    protected void deleteKey(Message message) throws Exception {
+        final KeyValueClient client = getClient();
+        final String key = getMandatoryKey(message);
+
+        client.deleteKey(key);
+        message.setHeader(ConsulConstants.CONSUL_RESULT, true);
+    }
+
+    /**
+     * Invokes the client's {@code deleteKeys} with the mandatory key and
+     * sets the {@link ConsulConstants#CONSUL_RESULT} header to {@code true}.
+     */
+    @InvokeOnHeader(ConsulKeyValueActions.DELETE_KEYS)
+    protected void deleteKeys(Message message) throws Exception {
+        final KeyValueClient client = getClient();
+        final String key = getMandatoryKey(message);
+
+        client.deleteKeys(key);
+        message.setHeader(ConsulConstants.CONSUL_RESULT, true);
+    }
+
+    /**
+     * Tries to acquire the lock on the mandatory key for the session named
+     * by the {@link ConsulConstants#CONSUL_SESSION} header (empty string
+     * when the header is missing) and writes the client's return value to
+     * the {@link ConsulConstants#CONSUL_RESULT} header.
+     */
+    @InvokeOnHeader(ConsulKeyValueActions.LOCK)
+    protected void lock(Message message) throws Exception {
+        final KeyValueClient client = getClient();
+        final String key = getMandatoryKey(message);
+        final String value = getBody(message, null, String.class);
+        final String session = message.getHeader(ConsulConstants.CONSUL_SESSION, "", String.class);
+
+        message.setHeader(ConsulConstants.CONSUL_RESULT, client.acquireLock(key, value, session));
+    }
+
+    /**
+     * Releases the lock on the mandatory key for the session named by the
+     * mandatory {@link ConsulConstants#CONSUL_SESSION} header and writes the
+     * client's return value to the {@link ConsulConstants#CONSUL_RESULT}
+     * header.
+     */
+    @InvokeOnHeader(ConsulKeyValueActions.UNLOCK)
+    protected void unlock(Message message) throws Exception {
+        final KeyValueClient client = getClient();
+        final String key = getMandatoryKey(message);
+        final String session = getMandatoryHeader(message, ConsulConstants.CONSUL_SESSION, String.class);
+
+        message.setHeader(ConsulConstants.CONSUL_RESULT, client.releaseLock(key, session));
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/38d5374a/components/camel-consul/src/main/java/org/apache/camel/component/consul/policy/ConsulRoutePolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/policy/ConsulRoutePolicy.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/policy/ConsulRoutePolicy.java
new file mode 100644
index 0000000..0b44e86
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/policy/ConsulRoutePolicy.java
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul.policy;
+
+import java.math.BigInteger;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.base.Optional;
+import com.orbitz.consul.Consul;
+import com.orbitz.consul.KeyValueClient;
+import com.orbitz.consul.SessionClient;
+import com.orbitz.consul.async.ConsulResponseCallback;
+import com.orbitz.consul.model.ConsulResponse;
+import com.orbitz.consul.model.kv.Value;
+import com.orbitz.consul.model.session.ImmutableSession;
+import com.orbitz.consul.option.QueryOptions;
+import org.apache.camel.Exchange;
+import org.apache.camel.NonManagedService;
+import org.apache.camel.Route;
+import org.apache.camel.support.RoutePolicySupport;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Route policy that elects a leader through a Consul session lock.
+ * <p>
+ * On start the policy creates a Consul session and tries to acquire a lock on
+ * the key {@code /service/<serviceName>/leader}. While this instance does not
+ * hold the lock, exchanges entering the route are failed with an
+ * {@link IllegalStateException} and, when {@code shouldStopConsumer} is set,
+ * the route consumer is stopped until leadership is taken.
+ */
+public class ConsulRoutePolicy extends RoutePolicySupport implements NonManagedService {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ConsulRoutePolicy.class);
+
+    // Guards every access to suspendedRoutes.
+    private final Object lock;
+    private final Consul consul;
+    private final SessionClient sessionClient;
+    private final KeyValueClient keyValueClient;
+    // True while this instance believes it holds the leader lock.
+    private final AtomicBoolean leader;
+    // Routes whose consumer was stopped by this policy and not yet restarted.
+    private final Set<Route> suspendedRoutes;
+    // Index of the last watch response, passed to the next blocking query.
+    private final AtomicReference<BigInteger> index;
+
+    private String serviceName;
+    private String servicePath;
+    private int ttl;
+    private int lockDelay;
+    private ExecutorService executorService;
+    private boolean shouldStopConsumer;
+
+    private String sessionId;
+
+    /**
+     * Creates a policy backed by a {@link Consul} client built with the
+     * builder defaults.
+     */
+    public ConsulRoutePolicy() {
+        this(Consul.builder().build());
+    }
+
+    /**
+     * Creates a policy backed by the given client. Defaults: ttl 60 seconds,
+     * lock delay 10 seconds, consumers are stopped while not leader.
+     *
+     * @param consul the Consul client used for session and key/value calls
+     */
+    public ConsulRoutePolicy(Consul consul) {
+        this.consul = consul;
+        this.sessionClient = consul.sessionClient();
+        this.keyValueClient = consul.keyValueClient();
+        this.suspendedRoutes =  new HashSet<>();
+        this.leader = new AtomicBoolean(false);
+        this.lock = new Object();
+        this.index = new AtomicReference<>(BigInteger.valueOf(0));
+        this.serviceName = null;
+        this.servicePath = null;
+        this.ttl = 60;
+        this.lockDelay = 10;
+        this.executorService = null;
+        this.shouldStopConsumer = true;
+        this.sessionId = null;
+    }
+
+    /**
+     * Lets the exchange through when this instance is leader (restarting the
+     * route consumer if this policy stopped it); otherwise fails the exchange
+     * and, when {@code shouldStopConsumer} is set, stops the route consumer.
+     */
+    @Override
+    public void onExchangeBegin(Route route, Exchange exchange)  {
+        if (leader.get()) {
+            if (shouldStopConsumer) {
+                startConsumer(route);
+            }
+        } else {
+            if (shouldStopConsumer) {
+                stopConsumer(route);
+            }
+
+            exchange.setException(new IllegalStateException(
+                "Consul based route policy prohibits processing exchanges, stopping route and failing the exchange")
+            );
+        }
+    }
+
+    /**
+     * Forgets the route so that it is not restarted when leadership is taken.
+     */
+    @Override
+    public void onStop(Route route) {
+        synchronized (lock) {
+            suspendedRoutes.remove(route);
+        }
+    }
+
+    /**
+     * Forgets the route so that it is not restarted when leadership is taken.
+     * Only {@code lock} is taken, as in {@link #onStop(Route)}; the method is
+     * deliberately not {@code synchronized} to avoid nesting two monitors.
+     */
+    @Override
+    public void onSuspend(Route route) {
+        synchronized (lock) {
+            suspendedRoutes.remove(route);
+        }
+    }
+
+    /**
+     * Creates the Consul session (once), makes a first attempt to acquire the
+     * leader lock and submits the {@link Watcher} to the executor. A
+     * single-thread executor is created when none was configured.
+     */
+    @Override
+    protected void doStart() throws Exception {
+        if (sessionId == null) {
+            sessionId = sessionClient.createSession(
+                ImmutableSession.builder()
+                    .name(serviceName)
+                    .ttl(ttl + "s")
+                    .lockDelay(lockDelay + "s")
+                    .build()
+                ).getId();
+
+            LOGGER.debug("SessionID = {}", sessionId);
+            if (executorService == null) {
+                executorService = Executors.newSingleThreadExecutor();
+            }
+
+            setLeader(keyValueClient.acquireLock(servicePath, sessionId));
+
+            executorService.submit(new Watcher());
+        }
+
+        super.doStart();
+    }
+
+    /**
+     * Destroys the Consul session, then shuts the executor down and waits up
+     * to {@code ttl / 3} seconds for it to terminate. The executor is shut
+     * down whether it was created here or supplied through
+     * {@link #setExecutorService(ExecutorService)}.
+     */
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+
+        if (sessionId != null) {
+            sessionClient.destroySession(sessionId);
+            sessionId = null;
+
+            if (executorService != null) {
+                executorService.shutdown();
+                executorService.awaitTermination(ttl / 3, TimeUnit.SECONDS);
+            }
+        }
+    }
+
+    // *************************************************************************
+    //
+    // *************************************************************************
+
+    /**
+     * Records the outcome of a lock attempt or a watch notification.
+     * <p>
+     * On a transition to leader all consumers stopped by this policy are
+     * restarted; on a transition away from leader the loss is logged.
+     * Repeating the current state is a no-op.
+     *
+     * @param isLeader whether this instance currently holds the leader lock
+     */
+    protected void setLeader(boolean isLeader) {
+        if (isLeader) {
+            if (leader.compareAndSet(false, true)) {
+                LOGGER.debug("Leadership taken ({}, {})", serviceName, sessionId);
+                startAllStoppedConsumers();
+            }
+        } else if (leader.getAndSet(false)) {
+            // Previous state was "leader": this is a genuine loss.
+            LOGGER.debug("Leadership lost ({}, {})", serviceName, sessionId);
+        }
+    }
+
+    /**
+     * Restarts the consumer of the route if this policy stopped it earlier.
+     */
+    private void startConsumer(Route route) {
+        synchronized (lock) {
+            try {
+                if (suspendedRoutes.contains(route)) {
+                    startConsumer(route.getConsumer());
+                    suspendedRoutes.remove(route);
+                }
+            } catch (Exception e) {
+                handleException(e);
+            }
+        }
+    }
+
+    /**
+     * Stops the consumer of the route, unless this policy already did, and
+     * remembers the route for a later restart.
+     */
+    private void stopConsumer(Route route) {
+        synchronized (lock) {
+            try {
+                if (!suspendedRoutes.contains(route)) {
+                    LOGGER.debug("Stopping consumer for {} ({})", route.getId(), route.getConsumer());
+                    stopConsumer(route.getConsumer());
+                    suspendedRoutes.add(route);
+                }
+            } catch (Exception e) {
+                handleException(e);
+            }
+        }
+    }
+
+    /**
+     * Restarts every consumer stopped by this policy and clears the list. If
+     * a restart throws, the exception is passed to {@code handleException}
+     * and the list is left untouched.
+     */
+    private void startAllStoppedConsumers() {
+        synchronized (lock) {
+            try {
+                for (Route route : suspendedRoutes) {
+                    LOGGER.debug("Starting consumer for {} ({})", route.getId(), route.getConsumer());
+                    startConsumer(route.getConsumer());
+                }
+
+                suspendedRoutes.clear();
+            } catch (Exception e) {
+                handleException(e);
+            }
+        }
+    }
+
+    // *************************************************************************
+    // Getter/Setters
+    // *************************************************************************
+
+    /** Returns the Consul client this policy was created with. */
+    public Consul getConsul() {
+        return consul;
+    }
+
+    /** Returns the service name used for the session and the leader key. */
+    public String getServiceName() {
+        return serviceName;
+    }
+
+    /**
+     * Sets the service name and derives the leader key
+     * {@code /service/<serviceName>/leader} from it.
+     */
+    public void setServiceName(String serviceName) {
+        this.serviceName = serviceName;
+        this.servicePath = String.format("/service/%s/leader", serviceName);
+    }
+
+    /** Returns the session time-to-live, in seconds. */
+    public int getTtl() {
+        return ttl;
+    }
+
+    /** Sets the session time-to-live in seconds; values below 10 are raised to 10. */
+    public void setTtl(int ttl) {
+        this.ttl = Math.max(ttl, 10);
+    }
+
+    /** Returns the session lock delay, in seconds. */
+    public int getLockDelay() {
+        return lockDelay;
+    }
+
+    /** Sets the session lock delay in seconds; values below 10 are raised to 10. */
+    public void setLockDelay(int lockDelay) {
+        this.lockDelay = Math.max(lockDelay, 10);
+    }
+
+    /** Returns the executor that runs the watcher, or {@code null} if none is set yet. */
+    public ExecutorService getExecutorService() {
+        return executorService;
+    }
+
+    /** Sets the executor that runs the watcher; it is shut down when the policy stops. */
+    public void setExecutorService(ExecutorService executorService) {
+        this.executorService = executorService;
+    }
+
+    /** Returns whether route consumers are stopped while this instance is not leader. */
+    public boolean isShouldStopConsumer() {
+        return shouldStopConsumer;
+    }
+
+    /** Sets whether route consumers are stopped while this instance is not leader. */
+    public void setShouldStopConsumer(boolean shouldStopConsumer) {
+        this.shouldStopConsumer = shouldStopConsumer;
+    }
+
+    // *************************************************************************
+    // Watch
+    // *************************************************************************
+
+    /**
+     * Watches the leader key. {@link #run()} renews the session and issues a
+     * query on the key with this object as callback; {@link #onComplete}
+     * evaluates the answer and calls {@link #run()} again. The cycle ends when
+     * {@code isRunAllowed()} turns false or the query fails.
+     */
+    private class Watcher implements Runnable, ConsulResponseCallback<Optional<Value>> {
+
+        @Override
+        public void onComplete(ConsulResponse<Optional<Value>> consulResponse) {
+            if (isRunAllowed()) {
+                Value response = consulResponse.getResponse().orNull();
+                if (response != null) {
+                    String sid = response.getSession().orNull();
+                    if (ObjectHelper.isEmpty(sid)) {
+                        // If the key is not held by any session, try acquire a
+                        // lock (become leader)
+                        LOGGER.debug("Try to take leadership ...");
+                        setLeader(keyValueClient.acquireLock(servicePath, sessionId));
+                    } else if (!sid.equals(sessionId) && leader.get()) {
+                        // Looks like I've lost leadership. The comparison
+                        // starts from sid (non-empty here) because doStop may
+                        // have cleared sessionId in the meantime.
+                        setLeader(false);
+                    }
+                }
+
+                // Remember the index for the next blocking query, then re-arm.
+                index.set(consulResponse.getIndex());
+                run();
+            }
+        }
+
+        @Override
+        public void onFailure(Throwable throwable) {
+            handleException(throwable);
+        }
+
+        @Override
+        public void run() {
+            if (isRunAllowed()) {
+                // Refresh session
+                sessionClient.renewSession(sessionId);
+
+                // Query bounded to ttl / 3 seconds, starting from the last seen index.
+                keyValueClient.getValue(
+                    servicePath,
+                    QueryOptions.blockSeconds(ttl / 3, index.get()).build(),
+                    this);
+            }
+        }
+    }
+}


Mime
View raw message