Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id B24C92009F9 for ; Mon, 23 May 2016 20:52:26 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id B0F13160A0E; Mon, 23 May 2016 18:52:26 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id D0C45160A05 for ; Mon, 23 May 2016 20:52:25 +0200 (CEST) Received: (qmail 10194 invoked by uid 500); 23 May 2016 18:52:25 -0000 Mailing-List: contact commits-help@celix.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@celix.apache.org Delivered-To: mailing list commits@celix.apache.org Received: (qmail 10185 invoked by uid 99); 23 May 2016 18:52:24 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 23 May 2016 18:52:24 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 26858DFBDE; Mon, 23 May 2016 18:52:24 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: pnoltes@apache.org To: commits@celix.apache.org Date: Mon, 23 May 2016 18:52:24 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/2] celix git commit: CELIX-361: Applies fix for bug in etcd watch concerning max 1000 transactions. archived-at: Mon, 23 May 2016 18:52:26 -0000 Repository: celix Updated Branches: refs/heads/develop f593c02e2 -> feea43ed7 CELIX-361: Applies fix for bug in etcd watch concerning max 1000 transactions. Project: http://git-wip-us.apache.org/repos/asf/celix/repo Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/c4060e66 Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/c4060e66 Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/c4060e66 Branch: refs/heads/develop Commit: c4060e660e5e69609ba255e8815153dfeda17003 Parents: f593c02 Author: Pepijn Noltes Authored: Mon May 23 20:51:42 2016 +0200 Committer: Pepijn Noltes Committed: Mon May 23 20:51:42 2016 +0200 ---------------------------------------------------------------------- .../discovery_etcd/private/include/etcd.h | 4 +- .../discovery_etcd/private/src/etcd.c | 12 +++++- .../discovery_etcd/private/src/etcd_watcher.c | 40 +++++++++++++------- 3 files changed, 40 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/celix/blob/c4060e66/remote_services/discovery_etcd/private/include/etcd.h ---------------------------------------------------------------------- diff --git a/remote_services/discovery_etcd/private/include/etcd.h b/remote_services/discovery_etcd/private/include/etcd.h index f5624d0..2ba09de 100644 --- a/remote_services/discovery_etcd/private/include/etcd.h +++ b/remote_services/discovery_etcd/private/include/etcd.h @@ -48,12 +48,14 @@ #define ETCD_JSON_KEY "key" #define ETCD_JSON_VALUE "value" #define ETCD_JSON_MODIFIEDINDEX "modifiedIndex" +#define ETCD_ERROR_INDICATION "errorCode" +#define ETCD_INDEX "index" celix_status_t etcd_init(char* server, int port); bool etcd_get(char* key, char* value, char*action, int* modifiedIndex); bool etcd_getNodes(char* directory, char** nodeNames, int* size); bool etcd_set(char* key, char* value, int ttl, bool prevExist); bool etcd_del(char* key); -bool etcd_watch(char* key, int index, char* action, char* prevValue, char* value, char* rkey, int *modifiedIndex); +bool etcd_watch(char* key, int index, char* action, char* prevValue, char* value, char* rkey, int *modifiedIndex, int *error); #endif /* ETCD_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/c4060e66/remote_services/discovery_etcd/private/src/etcd.c ---------------------------------------------------------------------- diff --git a/remote_services/discovery_etcd/private/src/etcd.c b/remote_services/discovery_etcd/private/src/etcd.c index ed06fb7..1b74f66 100644 --- a/remote_services/discovery_etcd/private/src/etcd.c +++ b/remote_services/discovery_etcd/private/src/etcd.c @@ -310,7 +310,7 @@ bool etcd_del(char* key) { ///watch -bool etcd_watch(char* key, int index, char* action, char* prevValue, char* value, char* rkey, int* modifiedIndex) { +bool etcd_watch(char* key, int index, char* action, char* prevValue, char* value, char* rkey, int* modifiedIndex, int* errorCode) { json_error_t error; json_t* js_root = NULL; json_t* js_node = NULL; @@ -320,7 +320,9 @@ bool etcd_watch(char* key, int index, char* action, char* prevValue, char* value json_t* js_rkey = NULL; json_t* js_prevValue = NULL; json_t* js_modIndex = NULL; + json_t* js_error = NULL; // used to indicate valid json response with ETCD error indication bool retVal = false; + *errorCode = 0; char url[MAX_URL_LENGTH]; int res; struct MemoryStruct reply; @@ -343,6 +345,7 @@ bool etcd_watch(char* key, int index, char* action, char* prevValue, char* value js_action = json_object_get(js_root, ETCD_JSON_ACTION); js_node = json_object_get(js_root, ETCD_JSON_NODE); js_prevNode = json_object_get(js_root, ETCD_JSON_PREVNODE); + js_error = json_object_get(js_root, ETCD_ERROR_INDICATION); } if (js_prevNode != NULL) { js_prevValue = json_object_get(js_prevNode, ETCD_JSON_VALUE); @@ -373,6 +376,13 @@ bool etcd_watch(char* key, int index, char* action, char* prevValue, char* value retVal = true; } + if ((js_error != NULL) && (json_is_integer(js_error))) { + *errorCode = json_integer_value(js_error); + js_modIndex = json_object_get(js_root, ETCD_INDEX); + if ((js_modIndex != NULL) && (json_is_integer(js_modIndex))) { + *modifiedIndex = json_integer_value(js_modIndex); + } + } if (js_root != NULL) { json_decref(js_root); } http://git-wip-us.apache.org/repos/asf/celix/blob/c4060e66/remote_services/discovery_etcd/private/src/etcd_watcher.c ---------------------------------------------------------------------- diff --git a/remote_services/discovery_etcd/private/src/etcd_watcher.c b/remote_services/discovery_etcd/private/src/etcd_watcher.c index 915f420..0675bc2 100644 --- a/remote_services/discovery_etcd/private/src/etcd_watcher.c +++ b/remote_services/discovery_etcd/private/src/etcd_watcher.c @@ -264,6 +264,7 @@ static void* etcdWatcher_run(void* data) { time_t timeBeforeWatch = time(NULL); static char rootPath[MAX_ROOTNODE_LENGTH]; int highestModified = 0; + int errorCode=0; bundle_context_pt context = watcher->discovery->context; @@ -276,23 +277,34 @@ static void* etcdWatcher_run(void* data) { char value[MAX_VALUE_LENGTH]; char preValue[MAX_VALUE_LENGTH]; char action[MAX_ACTION_LENGTH]; - int modIndex; - - if (etcd_watch(rootPath, highestModified + 1, &action[0], &preValue[0], &value[0], &rkey[0], &modIndex) == true) { - if (strcmp(action, "set") == 0) { - etcdWatcher_addEntry(watcher, &rkey[0], &value[0]); - } else if (strcmp(action, "delete") == 0) { - etcdWatcher_removeEntry(watcher, &rkey[0], &value[0]); - } else if (strcmp(action, "expire") == 0) { - etcdWatcher_removeEntry(watcher, &rkey[0], &value[0]); - } else if (strcmp(action, "update") == 0) { - etcdWatcher_addEntry(watcher, &rkey[0], &value[0]); - } else { - logHelper_log(*watcher->loghelper, OSGI_LOGSERVICE_INFO, "Unexpected action: %s", action); - } + int modIndex=0; + + if (etcd_watch(rootPath, highestModified + 1, &action[0], &preValue[0], &value[0], &rkey[0], &modIndex, &errorCode) == true) { + if (strcmp(action, "set") == 0) { + etcdWatcher_addEntry(watcher, &rkey[0], &value[0]); + } else if (strcmp(action, "delete") == 0) { + etcdWatcher_removeEntry(watcher, &rkey[0], &value[0]); + } else if (strcmp(action, "expire") == 0) { + etcdWatcher_removeEntry(watcher, &rkey[0], &value[0]); + } else if (strcmp(action, "update") == 0) { + etcdWatcher_addEntry(watcher, &rkey[0], &value[0]); + } else { + logHelper_log(*watcher->loghelper, OSGI_LOGSERVICE_INFO, "Unexpected action: %s", action); + } + highestModified = modIndex; + } + /* prevent busy waiting, in case etcd_watch returns false */ + else { + switch (errorCode) { + case 401: + // Etcd can store at most 1000 events highestModified = modIndex; + break; + default: + break; } + } // update own framework uuid if (time(NULL) - timeBeforeWatch > (DEFAULT_ETCD_TTL/2)) {