Return-Path: X-Original-To: apmail-camel-commits-archive@www.apache.org Delivered-To: apmail-camel-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C67A819E61 for ; Wed, 27 Apr 2016 10:44:03 +0000 (UTC) Received: (qmail 2105 invoked by uid 500); 27 Apr 2016 10:44:03 -0000 Delivered-To: apmail-camel-commits-archive@camel.apache.org Received: (qmail 2058 invoked by uid 500); 27 Apr 2016 10:44:03 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 2049 invoked by uid 99); 27 Apr 2016 10:44:03 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 27 Apr 2016 10:44:03 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 44DB9DFD5B; Wed, 27 Apr 2016 10:44:03 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: lburgazzoli@apache.org To: commits@camel.apache.org Message-Id: <3d9c792b9b6b4379b76826c41b276cd8@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: camel git commit: CAMEL-9834 : WatchConsumer does not properly set watchIndex Date: Wed, 27 Apr 2016 10:44:03 +0000 (UTC) Repository: camel Updated Branches: refs/heads/master 473935b53 -> 73bce6149 CAMEL-9834 : WatchConsumer does not properly set watchIndex Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/73bce614 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/73bce614 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/73bce614 Branch: refs/heads/master Commit: 73bce61493c76b3a549c55c28fd6504f14ef5af2 Parents: 473935b Author: lburgazzoli Authored: Thu Apr 7 14:14:27 2016 +0200 Committer: lburgazzoli Committed: Wed Apr 27 12:40:01 2016 +0200 ---------------------------------------------------------------------- components/camel-etcd/pom.xml | 37 +++++++ .../camel/component/etcd/EtcdComponent.java | 25 +---- .../camel/component/etcd/EtcdConfiguration.java | 12 +++ .../camel/component/etcd/EtcdWatchConsumer.java | 104 ++++++++++++++----- .../camel/component/etcd/EtcdKeysTest.java | 3 +- .../camel/component/etcd/EtcdStatsTest.java | 2 +- .../camel/component/etcd/EtcdWatchTest.java | 74 ++++++++----- parent/pom.xml | 2 +- 8 files changed, 181 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/73bce614/components/camel-etcd/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-etcd/pom.xml b/components/camel-etcd/pom.xml index 7045614..1868850 100644 --- a/components/camel-etcd/pom.xml +++ b/components/camel-etcd/pom.xml @@ -85,4 +85,41 @@ + + + + + etcd-skip-tests + + true + + + + + maven-surefire-plugin + + true + + + + + + + etcd-tests + + false + + + + + maven-surefire-plugin + + false + + + + + + + http://git-wip-us.apache.org/repos/asf/camel/blob/73bce614/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdComponent.java b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdComponent.java index eab5d35..3b81579 100644 --- a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdComponent.java +++ b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdComponent.java @@ -49,6 +49,7 @@ public class EtcdComponent extends UriEndpointComponent { } EtcdNamespace namespace = getCamelContext().getTypeConverter().mandatoryConvertTo(EtcdNamespace.class, ns); + EtcdConfiguration configuration = loadConfiguration(new EtcdConfiguration(), parameters); if (namespace != null) { // path must start with leading slash @@ -58,29 +59,11 @@ public class EtcdComponent extends UriEndpointComponent { switch (namespace) { case stats: - return new EtcdStatsEndpoint( - uri, - this, - loadConfiguration(new EtcdConfiguration(), parameters), - namespace, - path - ); + return new EtcdStatsEndpoint(uri, this, configuration, namespace, path); case watch: - return new EtcdWatchEndpoint( - uri, - this, - loadConfiguration(new EtcdConfiguration(), parameters), - namespace, - path - ); + return new EtcdWatchEndpoint(uri, this, configuration, namespace, path); case keys: - return new EtcdKeysEndpoint( - uri, - this, - loadConfiguration(new EtcdConfiguration(), parameters), - namespace, - path - ); + return new EtcdKeysEndpoint(uri, this, configuration, namespace, path); default: throw new IllegalStateException("No endpoint for " + remaining); } http://git-wip-us.apache.org/repos/asf/camel/blob/73bce614/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdConfiguration.java b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdConfiguration.java index 9fe36cd..45d50f7 100644 --- a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdConfiguration.java +++ b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdConfiguration.java @@ -39,6 +39,8 @@ public class EtcdConfiguration { private Integer timeToLive; @UriParam private Long timeout; + @UriParam(label = "consumer,advance", defaultValue = "0") + private Long fromIndex = 0L; public String getUris() { return uris; @@ -128,4 +130,14 @@ public class EtcdConfiguration { this.timeout = timeout; } + public Long getFromIndex() { + return fromIndex; + } + + /** + * The index to watch from + */ + public void setFromIndex(Long fromIndex) { + this.fromIndex = fromIndex; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/73bce614/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdWatchConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdWatchConsumer.java b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdWatchConsumer.java index 4c3e6fa..9f85a0a 100644 --- a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdWatchConsumer.java +++ b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdWatchConsumer.java @@ -16,12 +16,14 @@ */ package org.apache.camel.component.etcd; -import java.io.IOException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; import mousio.client.promises.ResponsePromise; import mousio.etcd4j.requests.EtcdKeyGetRequest; +import mousio.etcd4j.responses.EtcdErrorCode; +import mousio.etcd4j.responses.EtcdException; import mousio.etcd4j.responses.EtcdKeysResponse; import org.apache.camel.Exchange; import org.apache.camel.Processor; @@ -30,15 +32,18 @@ import org.slf4j.LoggerFactory; public class EtcdWatchConsumer extends AbstractEtcdConsumer implements ResponsePromise.IsSimplePromiseResponseHandler { private static final Logger LOGGER = LoggerFactory.getLogger(EtcdWatchConsumer.class); + private static final String OUTDATED_EVENT_MSG = "requested index is outdated and cleared"; private final EtcdWatchEndpoint endpoint; private final EtcdConfiguration configuration; + private final AtomicLong index; public EtcdWatchConsumer(EtcdWatchEndpoint endpoint, Processor processor, EtcdConfiguration configuration, EtcdNamespace namespace, String path) { super(endpoint, processor, configuration, namespace, path); this.endpoint = endpoint; this.configuration = configuration; + this.index = new AtomicLong(configuration.getFromIndex()); } @Override @@ -59,35 +64,78 @@ public class EtcdWatchConsumer extends AbstractEtcdConsumer implements ResponseP return; } - try { - EtcdKeysResponse response = promise.get(); - - Exchange exchange = endpoint.createExchange(); - exchange.getIn().setHeader(EtcdConstants.ETCD_NAMESPACE, getNamespace()); - exchange.getIn().setHeader(EtcdConstants.ETCD_PATH, response.node.key); - exchange.getIn().setBody(response); - - getProcessor().process(exchange); - - watch(); - } catch (TimeoutException e) { - LOGGER.debug("Timeout watching for {}", getPath()); - - if (configuration.isSendEmptyExchangeOnTimeout()) { - Exchange exchange = endpoint.createExchange(); - try { + Exchange exchange = null; + Throwable throwable = promise.getException(); + + if (throwable != null && throwable instanceof EtcdException) { + EtcdException exception = (EtcdException) throwable; + // Etcd only keeps the responses of the most recent 1000 events + // across all etcd keys so if we wait for a cleared index, we + // get "index is outdated response" like: + // + // { + // "errorCode" : 401, + // "message" : "The event in requested index is outdated and cleared", + // "cause" : "the requested history has been cleared [1008/8]", + // "index" : 2007 + // } + // + // So we set the index to the one returned by the exception + 1 + if (isOutdatedIndexException(exception)) { + LOGGER.debug("Outdated index, key: {}, cause={}", getPath(), exception.etcdCause); + + // We set the index to the one returned by the exception + 1. + index.set(exception.index + 1); + + // Clean-up the exception so it is not rethrown/handled + throwable = null; + } + } else { + try { + EtcdKeysResponse response = promise.get(); + + exchange = endpoint.createExchange(); + exchange.getIn().setHeader(EtcdConstants.ETCD_NAMESPACE, getNamespace()); + exchange.getIn().setHeader(EtcdConstants.ETCD_PATH, response.node.key); + exchange.getIn().setBody(response); + + // Watch from the modifiedIndex + 1 of the node we got for ensuring + // no events are missed between watch commands + index.set(response.node.modifiedIndex + 1); + } catch (TimeoutException e) { + LOGGER.debug("Timeout watching for {}", getPath()); + + if (configuration.isSendEmptyExchangeOnTimeout()) { + exchange = endpoint.createExchange(); exchange.getIn().setHeader(EtcdConstants.ETCD_NAMESPACE, getNamespace()); exchange.getIn().setHeader(EtcdConstants.ETCD_TIMEOUT, true); exchange.getIn().setHeader(EtcdConstants.ETCD_PATH, getPath()); exchange.getIn().setBody(null); + } + + throwable = null; + } catch (Exception e1) { + throwable = e1; + } + if (exchange != null) { + try { + throwable = null; getProcessor().process(exchange); - } catch (Exception e1) { - getExceptionHandler().handleException("Error processing exchange", exchange, e1); + } catch (Exception e) { + getExceptionHandler().handleException("Error processing exchange", exchange, e); } } + } + + if (throwable != null){ + handleException("Error processing etcd response", throwable); + } + + try { + watch(); } catch (Exception e) { - throw new IllegalArgumentException(e); + handleException("Error watching key " + getPath(), e); } } @@ -96,7 +144,7 @@ public class EtcdWatchConsumer extends AbstractEtcdConsumer implements ResponseP return; } - EtcdKeyGetRequest request = getClient().get(getPath()).waitForChange(); + EtcdKeyGetRequest request = getClient().get(getPath()).waitForChange(index.get()); if (configuration.isRecursive()) { request.recursive(); } @@ -104,10 +152,14 @@ public class EtcdWatchConsumer extends AbstractEtcdConsumer implements ResponseP request.timeout(configuration.getTimeout(), TimeUnit.MILLISECONDS); } - try { - request.send().addListener(this); - } catch (IOException e) { - throw new IllegalArgumentException(e); + request.send().addListener(this); + } + + private boolean isOutdatedIndexException(EtcdException exception) { + if (exception.isErrorCode(EtcdErrorCode.EventIndexCleared) && exception.etcdMessage != null) { + return exception.etcdMessage.toLowerCase().contains(OUTDATED_EVENT_MSG); } + + return false; } } http://git-wip-us.apache.org/repos/asf/camel/blob/73bce614/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdKeysTest.java ---------------------------------------------------------------------- diff --git a/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdKeysTest.java b/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdKeysTest.java index eca2339..cf16017 100644 --- a/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdKeysTest.java +++ b/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdKeysTest.java @@ -27,10 +27,9 @@ import org.apache.camel.Exchange; import org.apache.camel.Predicate; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; -import org.junit.Ignore; import org.junit.Test; -@Ignore("Etcd must be started manually") +//@Ignore("Etcd must be started manually") public class EtcdKeysTest extends EtcdTest { @Test(expected = EtcdException.class) http://git-wip-us.apache.org/repos/asf/camel/blob/73bce614/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdStatsTest.java ---------------------------------------------------------------------- diff --git a/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdStatsTest.java b/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdStatsTest.java index b142355..762be01 100644 --- a/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdStatsTest.java +++ b/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdStatsTest.java @@ -26,7 +26,7 @@ import org.apache.camel.component.mock.MockEndpoint; import org.junit.Ignore; import org.junit.Test; -@Ignore("Etcd must be started manually") +//@Ignore("Etcd must be started manually") public class EtcdStatsTest extends EtcdTest { @Test http://git-wip-us.apache.org/repos/asf/camel/blob/73bce614/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdWatchTest.java ---------------------------------------------------------------------- diff --git a/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdWatchTest.java b/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdWatchTest.java index 0bc7ab9..60cea00 100644 --- a/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdWatchTest.java +++ b/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdWatchTest.java @@ -17,29 +17,52 @@ package org.apache.camel.component.etcd; import mousio.etcd4j.EtcdClient; -import org.apache.camel.Exchange; -import org.apache.camel.Predicate; +import mousio.etcd4j.responses.EtcdErrorCode; +import mousio.etcd4j.responses.EtcdException; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; -import org.junit.Ignore; import org.junit.Test; -@Ignore("Etcd must be started manually") +//@Ignore("Etcd must be started manually") public class EtcdWatchTest extends EtcdTest { @Test public void testWatchWithPath() throws Exception { - testWatch("mock:watch-with-path", "/myKey1", true); + testWatch("mock:watch-with-path", "/myKey1", 10); } @Test public void testWatchWithConfigPath() throws Exception { - testWatch("mock:watch-with-config-path", "/myKey2", true); + testWatch("mock:watch-with-config-path", "/myKey2", 10); } @Test public void testWatchRecursive() throws Exception { - testWatch("mock:watch-recursive", "/recursive/myKey1", true); + testWatch("mock:watch-recursive", "/recursive/myKey1", 10); + } + + @Test + public void testWatchRecovery() throws Exception { + final String key = "/myKeyRecovery"; + final EtcdClient client = getClient(); + + try { + // Delete the key if present + client.delete(key).send().get(); + } catch (EtcdException e) { + if (!e.isErrorCode(EtcdErrorCode.KeyNotFound)) { + throw e; + } + } + + // Fill the vent backlog ( > 1000) + for (int i = 0; i < 2000; i++) { + client.put(key, "v" + i).send().get(); + } + + context().startRoute("watchRecovery"); + + testWatch("mock:watch-recovery", key, 10); } @Test @@ -49,33 +72,25 @@ public class EtcdWatchTest extends EtcdTest { mock.expectedHeaderReceived(EtcdConstants.ETCD_NAMESPACE, EtcdNamespace.watch.name()); mock.expectedHeaderReceived(EtcdConstants.ETCD_PATH, "/timeoutKey"); mock.expectedHeaderReceived(EtcdConstants.ETCD_TIMEOUT, true); - mock.expectedMessagesMatches(new Predicate() { - @Override - public boolean matches(Exchange exchange) { - return exchange.getIn().getBody() == null; - } - }); - + mock.allMessages().body().isNull(); mock.assertIsSatisfied(); } - private void testWatch(String mockEndpoint, final String key, boolean updateKey) throws Exception { + private void testWatch(String mockEndpoint, final String key, int updates) throws Exception { + final String[] values = new String[updates]; + for (int i = 0; i< updates; i++) { + values[i] = key + "=myValue-" + i; + } + MockEndpoint mock = getMockEndpoint(mockEndpoint); mock.expectedMessageCount(2); mock.expectedHeaderReceived(EtcdConstants.ETCD_NAMESPACE, EtcdNamespace.watch.name()); mock.expectedHeaderReceived(EtcdConstants.ETCD_PATH, key); - mock.expectedMessagesMatches(new Predicate() { - @Override - public boolean matches(Exchange exchange) { - return exchange.getIn().getBody(String.class).startsWith(key + "=myValue-"); - } - }); + mock.expectedBodiesReceived(values); - if (updateKey) { - EtcdClient client = getClient(); - client.put(key, "myValue-1").send().get(); - Thread.sleep(250); - client.put(key, "myValue-2").send().get(); + final EtcdClient client = getClient(); + for (int i = 0; i< updates; i++) { + client.put(key, "myValue-" + i).send().get(); } mock.assertIsSatisfied(); @@ -88,8 +103,13 @@ public class EtcdWatchTest extends EtcdTest { from("etcd:watch/myKey1") .process(NODE_TO_VALUE_IN) .to("mock:watch-with-path"); - from("etcd:watch/recursive?recursive=true") + fromF("etcd:watch/myKeyRecovery?timeout=%d&fromIndex=%d", 1000 * 60 * 5, 1) + .id("watchRecovery") + .autoStartup(false) .process(NODE_TO_VALUE_IN) + .to("mock:watch-recovery"); + from("etcd:watch/recursive?recursive=true") + .process(NODE_TO_VALUE_IN) .to("log:org.apache.camel.component.etcd?level=INFO") .to("mock:watch-recursive"); from("etcd:watch/myKey2") http://git-wip-us.apache.org/repos/asf/camel/blob/73bce614/parent/pom.xml ---------------------------------------------------------------------- diff --git a/parent/pom.xml b/parent/pom.xml index 0b31710..9b3eaba 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -161,7 +161,7 @@ 1.0.1 1.50.1 - 2.10.0 + 2.11.0 1.4.0 1.0.6_1 2.4.3