Return-Path: X-Original-To: apmail-celix-commits-archive@www.apache.org Delivered-To: apmail-celix-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8BDE810D57 for ; Wed, 2 Sep 2015 11:44:15 +0000 (UTC) Received: (qmail 91143 invoked by uid 500); 2 Sep 2015 11:44:15 -0000 Delivered-To: apmail-celix-commits-archive@celix.apache.org Received: (qmail 91088 invoked by uid 500); 2 Sep 2015 11:44:15 -0000 Mailing-List: contact commits-help@celix.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@celix.apache.org Delivered-To: mailing list commits@celix.apache.org Received: (qmail 90952 invoked by uid 99); 2 Sep 2015 11:44:15 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 02 Sep 2015 11:44:15 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0B182DFC90; Wed, 2 Sep 2015 11:44:15 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: pnoltes@apache.org To: commits@celix.apache.org Date: Wed, 02 Sep 2015 11:44:16 -0000 Message-Id: <2a5457ad3e0c4f6e9f899ab43967f49d@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [02/11] celix git commit: CELIX-252: added separate hashmap to double-check whether expired key belongs to imported services CELIX-252: added separate hashmap to double-check whether expired key belongs to imported services Project: http://git-wip-us.apache.org/repos/asf/celix/repo Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/c396aeed Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/c396aeed Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/c396aeed Branch: refs/heads/feature/CELIX-237_rsa-ffi Commit: c396aeed17b7618d5b2109a7a83f0c4213ce70f3 Parents: f32a233 Author: Bjoern Petri Authored: Sun Aug 23 21:42:50 2015 +0200 Committer: Bjoern Petri Committed: Sun Aug 23 21:42:50 2015 +0200 ---------------------------------------------------------------------- .../discovery_etcd/private/include/etcd.h | 2 +- .../discovery_etcd/private/src/etcd.c | 135 ++++++++++--------- .../discovery_etcd/private/src/etcd_watcher.c | 69 ++++++++-- 3 files changed, 136 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/celix/blob/c396aeed/remote_services/discovery_etcd/private/include/etcd.h ---------------------------------------------------------------------- diff --git a/remote_services/discovery_etcd/private/include/etcd.h b/remote_services/discovery_etcd/private/include/etcd.h index e36fccb..f5624d0 100644 --- a/remote_services/discovery_etcd/private/include/etcd.h +++ b/remote_services/discovery_etcd/private/include/etcd.h @@ -54,6 +54,6 @@ bool etcd_get(char* key, char* value, char*action, int* modifiedIndex); bool etcd_getNodes(char* directory, char** nodeNames, int* size); bool etcd_set(char* key, char* value, int ttl, bool prevExist); bool etcd_del(char* key); -bool etcd_watch(char* key, int index, char* action, char* prevValue, char* value); +bool etcd_watch(char* key, int index, char* action, char* prevValue, char* value, char* rkey, int *modifiedIndex); #endif /* ETCD_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/c396aeed/remote_services/discovery_etcd/private/src/etcd.c ---------------------------------------------------------------------- diff --git a/remote_services/discovery_etcd/private/src/etcd.c b/remote_services/discovery_etcd/private/src/etcd.c index 2c74856..d38f6bd 100644 --- a/remote_services/discovery_etcd/private/src/etcd.c +++ b/remote_services/discovery_etcd/private/src/etcd.c @@ -304,66 +304,79 @@ bool etcd_del(char* key) { } ///watch -bool etcd_watch(char* key, int index, char* action, char* prevValue, char* value) { - json_error_t error; - json_t* js_root = NULL; - json_t* js_node = NULL; - json_t* js_prevNode = NULL; - json_t* js_action = NULL; - json_t* js_value = NULL; - json_t* js_prevValue = NULL; - bool retVal = false; - char url[MAX_URL_LENGTH]; - int res; - struct MemoryStruct reply; - - reply.memory = malloc(1); /* will be grown as needed by the realloc above */ - reply.size = 0; /* no data at this point */ - if (index != 0) - snprintf(url, MAX_URL_LENGTH, "http://%s:%d/v2/keys/%s?wait=true&recursive=true&waitIndex=%d", etcd_server, etcd_port, key, - index); - else - snprintf(url, MAX_URL_LENGTH, "http://%s:%d/v2/keys/%s?wait=true&recursive=true", etcd_server, etcd_port, key); - - res = performRequest(url, GET, WriteMemoryCallback, NULL, (void*) &reply); - - if (res == CURLE_OK) { - js_root = json_loads(reply.memory, 0, &error); - - if (js_root != NULL) { - js_action = json_object_get(js_root, ETCD_JSON_ACTION); - js_node = json_object_get(js_root, ETCD_JSON_NODE); - js_prevNode = json_object_get(js_root, ETCD_JSON_PREVNODE); - } - if (js_prevNode != NULL) { - js_prevValue = json_object_get(js_prevNode, ETCD_JSON_VALUE); - } - if (js_node != NULL) { - js_value = json_object_get(js_node, ETCD_JSON_VALUE); - } - if (js_prevNode != NULL) { - js_prevValue = json_object_get(js_prevNode, ETCD_JSON_VALUE); - } - if ((js_prevValue != NULL) && (json_is_string(js_prevValue))) { - strncpy(prevValue, json_string_value(js_prevValue), MAX_VALUE_LENGTH); - } - if ((js_value != NULL) && (json_is_string(js_value))) { - strncpy(value, json_string_value(js_value), MAX_VALUE_LENGTH); - } - if ((js_action != NULL) && (json_is_string(js_action))) { - strncpy(action, json_string_value(js_action), MAX_ACTION_LENGTH); - - retVal = true; - } - if (js_root != NULL) { - json_decref(js_root); - } - } - - if (reply.memory) { - free(reply.memory); - } - - return retVal; +bool etcd_watch(char* key, int index, char* action, char* prevValue, char* value, char* rkey, int* modifiedIndex) { + json_error_t error; + json_t* js_root = NULL; + json_t* js_node = NULL; + json_t* js_prevNode = NULL; + json_t* js_action = NULL; + json_t* js_value = NULL; + json_t* js_rkey = NULL; + json_t* js_prevValue = NULL; + json_t* js_modIndex = NULL; + bool retVal = false; + char url[MAX_URL_LENGTH]; + int res; + struct MemoryStruct reply; + + reply.memory = malloc(1); /* will be grown as needed by the realloc above */ + reply.size = 0; /* no data at this point */ + + if (index != 0) + snprintf(url, MAX_URL_LENGTH, "http://%s:%d/v2/keys/%s?wait=true&recursive=true&waitIndex=%d", etcd_server, etcd_port, key, index); + else + snprintf(url, MAX_URL_LENGTH, "http://%s:%d/v2/keys/%s?wait=true&recursive=true", etcd_server, etcd_port, key); + + res = performRequest(url, GET, WriteMemoryCallback, NULL, (void*) &reply); + + if (res == CURLE_OK) { + + js_root = json_loads(reply.memory, 0, &error); + + if (js_root != NULL) { + js_action = json_object_get(js_root, ETCD_JSON_ACTION); + js_node = json_object_get(js_root, ETCD_JSON_NODE); + js_prevNode = json_object_get(js_root, ETCD_JSON_PREVNODE); + } + if (js_prevNode != NULL) { + js_prevValue = json_object_get(js_prevNode, ETCD_JSON_VALUE); + } + if (js_node != NULL) { + js_rkey = json_object_get(js_node, ETCD_JSON_KEY); + js_value = json_object_get(js_node, ETCD_JSON_VALUE); + js_modIndex = json_object_get(js_node, ETCD_JSON_MODIFIEDINDEX); + } + if (js_prevNode != NULL) { + js_prevValue = json_object_get(js_prevNode, ETCD_JSON_VALUE); + } + if ((js_prevValue != NULL) && (json_is_string(js_prevValue))) { + strncpy(prevValue, json_string_value(js_prevValue), MAX_VALUE_LENGTH); + } + if ((js_value != NULL) && (json_is_string(js_value))) { + strncpy(value, json_string_value(js_value), MAX_VALUE_LENGTH); + } + if ((js_modIndex != NULL) && (json_is_integer(js_modIndex))) { + *modifiedIndex = json_integer_value(js_modIndex); + } else { + *modifiedIndex = index; + } + + if ((js_rkey != NULL) && (js_action != NULL) && (json_is_string(js_rkey)) && (json_is_string(js_action))) { + strncpy(rkey, json_string_value(js_rkey), MAX_KEY_LENGTH); + strncpy(action, json_string_value(js_action), MAX_ACTION_LENGTH); + + retVal = true; + } + if (js_root != NULL) { + json_decref(js_root); + } + + } + + if (reply.memory) { + free(reply.memory); + } + + return retVal; } http://git-wip-us.apache.org/repos/asf/celix/blob/c396aeed/remote_services/discovery_etcd/private/src/etcd_watcher.c ---------------------------------------------------------------------- diff --git a/remote_services/discovery_etcd/private/src/etcd_watcher.c b/remote_services/discovery_etcd/private/src/etcd_watcher.c index eefd28f..89be84e 100644 --- a/remote_services/discovery_etcd/private/src/etcd_watcher.c +++ b/remote_services/discovery_etcd/private/src/etcd_watcher.c @@ -31,6 +31,7 @@ #include "log_helper.h" #include "log_service.h" #include "constants.h" +#include "utils.h" #include "discovery.h" #include "discovery_impl.h" @@ -42,6 +43,7 @@ struct etcd_watcher { discovery_pt discovery; log_helper_pt* loghelper; + hash_map_pt entries; celix_thread_mutex_t watcherLock; celix_thread_t watcherThread; @@ -200,6 +202,52 @@ static celix_status_t etcdWatcher_addOwnFramework(etcd_watcher_pt watcher) return status; } + + + +static celix_status_t etcdWatcher_addEntry(etcd_watcher_pt watcher, char* key, char* value) { + celix_status_t status = CELIX_BUNDLE_EXCEPTION; + endpoint_discovery_poller_pt poller = watcher->discovery->poller; + + if (!hashMap_containsKey(watcher->entries, key)) { + status = endpointDiscoveryPoller_addDiscoveryEndpoint(poller, value); + + if (status == CELIX_SUCCESS) { + hashMap_put(watcher->entries, key, value); + } + } + + return status; +} + + +static celix_status_t etcdWatcher_removeEntry(etcd_watcher_pt watcher, char* key, char* value) { + celix_status_t status = CELIX_BUNDLE_EXCEPTION; + endpoint_discovery_poller_pt poller = watcher->discovery->poller; + + if (hashMap_containsKey(watcher->entries, key)) { + + hashMap_remove(watcher->entries, key); + + // check if there is another entry with the same value + hash_map_iterator_pt iter = hashMapIterator_create(watcher->entries); + unsigned int valueFound = 0; + + while (hashMapIterator_hasNext(iter) && valueFound <= 1) { + if (strcmp(value, hashMapIterator_nextValue(iter)) == 0) + valueFound++; + } + + if (valueFound == 0) + status = endpointDiscoveryPoller_removeDiscoveryEndpoint(poller, value); + + } + + return status; + +} + + /* * performs (blocking) etcd_watch calls to check for * changing discovery endpoint information within etcd. @@ -211,29 +259,32 @@ static void* etcdWatcher_run(void* data) { int highestModified = 0; bundle_context_pt context = watcher->discovery->context; - endpoint_discovery_poller_pt poller = watcher->discovery->poller; etcdWatcher_addAlreadyExistingWatchpoints(watcher->discovery, &highestModified); etcdWatcher_getRootPath(context, &rootPath[0]); while (watcher->running) { + + char rkey[MAX_KEY_LENGTH]; char value[MAX_VALUE_LENGTH]; char preValue[MAX_VALUE_LENGTH]; char action[MAX_ACTION_LENGTH]; + int modIndex; - if (etcd_watch(rootPath, highestModified+1, &action[0], &preValue[0], &value[0]) == true) { + if (etcd_watch(rootPath, highestModified + 1, &action[0], &preValue[0], &value[0], &rkey[0], &modIndex) == true) { if (strcmp(action, "set") == 0) { - endpointDiscoveryPoller_addDiscoveryEndpoint(poller, strdup(&value[0])); + etcdWatcher_addEntry(watcher, &rkey[0], &value[0]); } else if (strcmp(action, "delete") == 0) { - endpointDiscoveryPoller_removeDiscoveryEndpoint(poller, &preValue[0]); + etcdWatcher_removeEntry(watcher, &rkey[0], &value[0]); } else if (strcmp(action, "expire") == 0) { - endpointDiscoveryPoller_removeDiscoveryEndpoint(poller, &preValue[0]); + etcdWatcher_removeEntry(watcher, &rkey[0], &value[0]); } else if (strcmp(action, "update") == 0) { - // TODO + etcdWatcher_addEntry(watcher, &rkey[0], &value[0]); } else { logHelper_log(*watcher->loghelper, OSGI_LOGSERVICE_INFO, "Unexpected action: %s", action); } - highestModified++; + + highestModified = modIndex; } // update own framework uuid @@ -263,7 +314,6 @@ celix_status_t etcdWatcher_create(discovery_pt discovery, bundle_context_pt cont return CELIX_BUNDLE_EXCEPTION; } - (*watcher) = calloc(1, sizeof(struct etcd_watcher)); if (!*watcher) { return CELIX_ENOMEM; @@ -272,6 +322,7 @@ celix_status_t etcdWatcher_create(discovery_pt discovery, bundle_context_pt cont { (*watcher)->discovery = discovery; (*watcher)->loghelper = &discovery->loghelper; + (*watcher)->entries = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); } if ((bundleContext_getProperty(context, CFG_ETCD_SERVER_IP, &etcd_server) != CELIX_SUCCESS) || !etcd_server) { @@ -338,6 +389,8 @@ celix_status_t etcdWatcher_destroy(etcd_watcher_pt watcher) { watcher->loghelper = NULL; + hashMap_destroy(watcher->entries, true, true); + free(watcher); return status;