celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pnol...@apache.org
Subject celix git commit: CELIX-395: discovery_etcd using etcdlib now
Date Fri, 27 Jan 2017 10:52:09 GMT
Repository: celix
Updated Branches:
  refs/heads/develop 4e665476c -> d62731a3c


CELIX-395: discovery_etcd using etcdlib now


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/d62731a3
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/d62731a3
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/d62731a3

Branch: refs/heads/develop
Commit: d62731a3cfe482d6796fea3fbd386091756f3b2a
Parents: 4e66547
Author: Roy Lenferink <lenferinkroy@gmail.com>
Authored: Fri Jan 27 10:27:48 2017 +0100
Committer: Roy Lenferink <lenferinkroy@gmail.com>
Committed: Fri Jan 27 11:35:28 2017 +0100

----------------------------------------------------------------------
 remote_services/discovery_etcd/CMakeLists.txt   |   4 +-
 .../private/include/discovery_impl.h            |   4 +-
 .../discovery_etcd/private/include/etcd.h       |  61 ---
 .../discovery_etcd/private/src/etcd.c           | 397 -------------------
 .../discovery_etcd/private/src/etcd_watcher.c   | 168 ++++----
 remote_services/examples/CMakeLists.txt         |   4 +-
 6 files changed, 83 insertions(+), 555 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/d62731a3/remote_services/discovery_etcd/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/remote_services/discovery_etcd/CMakeLists.txt b/remote_services/discovery_etcd/CMakeLists.txt
index 31ba269..442d486 100644
--- a/remote_services/discovery_etcd/CMakeLists.txt
+++ b/remote_services/discovery_etcd/CMakeLists.txt
@@ -24,6 +24,7 @@ if (RSA_DISCOVERY_ETCD)
 	include_directories("${CURL_INCLUDE_DIR}")
 	include_directories("${JANSSON_INCLUDE_DIR}")
 	include_directories("${LIBXML2_INCLUDE_DIR}")
+	include_directories("${PROJECT_SOURCE_DIR}/etcdlib/public/include")
 	include_directories("${PROJECT_SOURCE_DIR}/utils/public/include")
 	include_directories("${PROJECT_SOURCE_DIR}/remote_services/utils/private/include")
 	include_directories("${PROJECT_SOURCE_DIR}/remote_services/utils/public/include")
@@ -40,7 +41,6 @@ if (RSA_DISCOVERY_ETCD)
         NAME "Apache Celix RSA Discovery ETCD"
         SOURCES
 		private/src/discovery_impl.c
-		private/src/etcd.c
 	    private/src/etcd_watcher.c
 		${PROJECT_SOURCE_DIR}/remote_services/discovery/private/src/discovery_activator.c
 		${PROJECT_SOURCE_DIR}/remote_services/discovery/private/src/discovery.c
@@ -56,6 +56,6 @@ if (RSA_DISCOVERY_ETCD)
 
 	install_bundle(discovery_etcd)
 		
-	target_link_libraries(discovery_etcd celix_framework ${CURL_LIBRARIES} ${LIBXML2_LIBRARIES} ${JANSSON_LIBRARIES})
+	target_link_libraries(discovery_etcd celix_framework etcdlib ${CURL_LIBRARIES} ${LIBXML2_LIBRARIES} ${JANSSON_LIBRARIES})
 
 endif (RSA_DISCOVERY_ETCD)

http://git-wip-us.apache.org/repos/asf/celix/blob/d62731a3/remote_services/discovery_etcd/private/include/discovery_impl.h
----------------------------------------------------------------------
diff --git a/remote_services/discovery_etcd/private/include/discovery_impl.h b/remote_services/discovery_etcd/private/include/discovery_impl.h
index e7e1071..a19b145 100644
--- a/remote_services/discovery_etcd/private/include/discovery_impl.h
+++ b/remote_services/discovery_etcd/private/include/discovery_impl.h
@@ -45,9 +45,7 @@
 
 #define DEFAULT_POLL_ENDPOINTS ""
 
-#define MAX_ROOTNODE_LENGTH		 64
-#define MAX_LOCALNODE_LENGTH	256
-
+#define FREE_MEM(ptr) if(ptr) {free(ptr); ptr = NULL;}
 
 struct discovery {
 	bundle_context_pt context;

http://git-wip-us.apache.org/repos/asf/celix/blob/d62731a3/remote_services/discovery_etcd/private/include/etcd.h
----------------------------------------------------------------------
diff --git a/remote_services/discovery_etcd/private/include/etcd.h b/remote_services/discovery_etcd/private/include/etcd.h
deleted file mode 100644
index 2ba09de..0000000
--- a/remote_services/discovery_etcd/private/include/etcd.h
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0 
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/*
- * etcd.h
- *
- *  \date       26 Jul 2014
- *  \author     <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright  Apache License, Version 2.0
- */
-
-
-#ifndef ETCD_H_
-#define ETCD_H_
-
-#include <stdbool.h>
-#include <celix_errno.h>
-
-#define MAX_NODES			256
-
-#define MAX_KEY_LENGTH		256
-#define MAX_VALUE_LENGTH	256
-#define MAX_ACTION_LENGTH	64
-
-#define MAX_URL_LENGTH		256
-#define MAX_CONTENT_LENGTH	1024
-
-#define ETCD_JSON_NODE			"node"
-#define ETCD_JSON_PREVNODE		"prevNode"
-#define ETCD_JSON_NODES			"nodes"
-#define ETCD_JSON_ACTION		"action"
-#define ETCD_JSON_KEY			"key"
-#define ETCD_JSON_VALUE			"value"
-#define ETCD_JSON_MODIFIEDINDEX	"modifiedIndex"
-#define ETCD_ERROR_INDICATION   "errorCode"
-#define ETCD_INDEX				"index"
-
-celix_status_t etcd_init(char* server, int port);
-bool etcd_get(char* key, char* value, char*action, int* modifiedIndex);
-bool etcd_getNodes(char* directory, char** nodeNames, int* size);
-bool etcd_set(char* key, char* value, int ttl, bool prevExist);
-bool etcd_del(char* key);
-bool etcd_watch(char* key, int index, char* action, char* prevValue, char* value, char* rkey, int *modifiedIndex, int *error);
-
-#endif /* ETCD_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/d62731a3/remote_services/discovery_etcd/private/src/etcd.c
----------------------------------------------------------------------
diff --git a/remote_services/discovery_etcd/private/src/etcd.c b/remote_services/discovery_etcd/private/src/etcd.c
deleted file mode 100644
index 1b74f66..0000000
--- a/remote_services/discovery_etcd/private/src/etcd.c
+++ /dev/null
@@ -1,397 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0 
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/*
- * etcd.c
- *
- *  \date       26 Jul 2014
- *  \author     <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright  Apache License, Version 2.0
- */
-
-
-#include <stdio.h>
-#include <stdbool.h>
-#include <string.h>
-
-#include <curl/curl.h>
-#include <jansson.h>
-
-#include "etcd.h"
-
-#define DEFAULT_CURL_TIMEOUT          10
-#define DEFAULT_CURL_CONECTTIMEOUT    10
-
-typedef enum {
-	GET, PUT, DELETE
-} request_t;
-
-static char* etcd_server = NULL;
-static int etcd_port = 0;
-
-struct MemoryStruct {
-	char *memory;
-	size_t size;
-};
-
-static size_t WriteMemoryCallback(void *contents, size_t size, size_t nmemb, void *userp) {
-	size_t realsize = size * nmemb;
-	struct MemoryStruct *mem = (struct MemoryStruct *) userp;
-
-	mem->memory = realloc(mem->memory, mem->size + realsize + 1);
-	if (mem->memory == NULL) {
-		/* out of memory! */
-		printf("not enough memory (realloc returned NULL)\n");
-		return 0;
-	}
-
-	memcpy(&(mem->memory[mem->size]), contents, realsize);
-	mem->size += realsize;
-	mem->memory[mem->size] = 0;
-
-	return realsize;
-}
-
-static int performRequest(char* url, request_t request, void* callback, void* reqData, void* repData) {
-	CURL *curl = NULL;
-	CURLcode res = 0;
-
-	curl = curl_easy_init();
-	curl_easy_setopt(curl, CURLOPT_TIMEOUT, DEFAULT_CURL_TIMEOUT);
-	curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, DEFAULT_CURL_CONECTTIMEOUT);
-	curl_easy_setopt(curl, CURLOPT_URL, url);
-	curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
-	curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, callback);
-	curl_easy_setopt(curl, CURLOPT_WRITEDATA, repData);
-	curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
-
-	if (request == PUT) {
-		curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "PUT");
-		curl_easy_setopt(curl, CURLOPT_POST, 1L);
-//		curl_easy_setopt(curl, CURLOPT_HTTPHEADER, "Content-type: application/json");
-		curl_easy_setopt(curl, CURLOPT_POSTFIELDS, reqData);
-	} else if (request == DELETE) {
-		curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "DELETE");
-	} else if (request == GET) {
-		curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "GET");
-	}
-
-	res = curl_easy_perform(curl);
-	curl_easy_cleanup(curl);
-
-	return res;
-}
-
-// open
-celix_status_t etcd_init(char* server, int port) {
-	celix_status_t status = CELIX_SUCCESS;
-
-	etcd_server = server;
-	etcd_port = port;
-
-	return status;
-}
-
-// get
-bool etcd_get(char* key, char* value, char* action, int* modifiedIndex) {
-	json_t* js_root = NULL;
-	json_t* js_node = NULL;
-	json_t* js_value = NULL;
-	json_t* js_modifiedIndex = NULL;
-	json_error_t error;
-	int res;
-	struct MemoryStruct reply;
-
-	reply.memory = malloc(1); /* will be grown as needed by the realloc above */
-	reply.size = 0; /* no data at this point */
-
-	bool retVal = false;
-	char url[MAX_URL_LENGTH];
-	snprintf(url, MAX_URL_LENGTH, "http://%s:%d/v2/keys/%s", etcd_server, etcd_port, key);
-
-	res = performRequest(url, GET, WriteMemoryCallback, NULL, (void*) &reply);
-
-	if (res == CURLE_OK) {
-		js_root = json_loads(reply.memory, 0, &error);
-
-		if (js_root != NULL) {
-			js_node = json_object_get(js_root, ETCD_JSON_NODE);
-		}
-		if (js_node != NULL) {
-			js_value = json_object_get(js_node, ETCD_JSON_VALUE);
-			js_modifiedIndex = json_object_get(js_node, ETCD_JSON_MODIFIEDINDEX);
-
-			if (js_modifiedIndex != NULL) {
-				*modifiedIndex = json_integer_value(js_modifiedIndex);
-			}
-
-			if (js_value != NULL) {
-				snprintf(value, MAX_VALUE_LENGTH, "%s", json_string_value(js_value));
-				retVal = true;
-			}
-		}
-		if (js_root != NULL) {
-			json_decref(js_root);
-		}
-	}
-
-	if (reply.memory) {
-		free(reply.memory);
-	}
-
-
-	return retVal;
-}
-
-// getNodes
-bool etcd_getNodes(char* directory, char** nodeNames, int* size) {
-	json_t* js_root = NULL;
-	json_t* js_node = NULL;
-	json_t* js_nodes = NULL;
-	json_error_t error;
-	int res;
-	struct MemoryStruct reply;
-
-	reply.memory = malloc(1); /* will be grown as needed by the realloc above */
-	reply.size = 0; /* no data at this point */
-
-	bool retVal = false;
-	char url[MAX_URL_LENGTH];
-	snprintf(url, MAX_URL_LENGTH, "http://%s:%d/v2/keys/%s", etcd_server, etcd_port, directory);
-
-	res = performRequest(url, GET, WriteMemoryCallback, NULL, (void*) &reply);
-
-	if (res == CURLE_OK) {
-		js_root = json_loads(reply.memory, 0, &error);
-
-		if (js_root != NULL) {
-			js_node = json_object_get(js_root, ETCD_JSON_NODE);
-		}
-		if (js_node != NULL) {
-			js_nodes = json_object_get(js_node, ETCD_JSON_NODES);
-		}
-
-		if (js_nodes != NULL && json_is_array(js_nodes)) {
-			int i = 0;
-			retVal = true;
-
-			for (i = 0; i < json_array_size(js_nodes) && i < MAX_NODES; i++) {
-				json_t* js_node = json_array_get(js_nodes, i);
-
-				if (!json_is_object(js_node)) {
-					retVal = false;
-				} else {
-					json_t* js_key = json_object_get(js_node, ETCD_JSON_KEY);
-					snprintf(nodeNames[i], MAX_KEY_LENGTH, "%s", json_string_value(js_key));
-				}
-			}
-			*size = i;
-		}
-		if (js_root != NULL) {
-			json_decref(js_root);
-		}
-	}
-
-	if (reply.memory) {
-		free(reply.memory);
-	}
-
-	return retVal;
-}
-
-
-
-bool etcd_set(char* key, char* value, int ttl, bool prevExist) {
-	json_error_t error;
-	json_t* js_root = NULL;
-	json_t* js_node = NULL;
-	json_t* js_value = NULL;
-	bool retVal = false;
-	char url[MAX_URL_LENGTH];
-	char request[MAX_CONTENT_LENGTH];
-	char* cur = request;
-	int res;
-	struct MemoryStruct reply;
-
-	reply.memory = malloc(1); /* will be grown as needed by the realloc above */
-	reply.size = 0; /* no data at this point */
-
-	snprintf(url, MAX_URL_LENGTH, "http://%s:%d/v2/keys/%s", etcd_server, etcd_port, key);
-	cur += snprintf(cur, MAX_CONTENT_LENGTH, "value=%s", value);
-
-	if (ttl > 0)
-	    cur += snprintf(cur, MAX_CONTENT_LENGTH, ";ttl=%d", ttl);
-
-	if (prevExist)
-	    cur += snprintf(cur, MAX_CONTENT_LENGTH, ";prevExist=true");
-
-	res = performRequest(url, PUT, WriteMemoryCallback, request, (void*) &reply);
-
-	if (res == CURLE_OK) {
-		js_root = json_loads(reply.memory, 0, &error);
-
-		if (js_root != NULL) {
-			js_node = json_object_get(js_root, ETCD_JSON_NODE);
-		}
-		if (js_node != NULL) {
-			js_value = json_object_get(js_node, ETCD_JSON_VALUE);
-		}
-		if (js_value != NULL && json_is_string(js_value)) {
-			retVal = (strcmp(json_string_value(js_value), value) == 0);
-		}
-		if (js_root != NULL) {
-			json_decref(js_root);
-		}
-	}
-
-	if (reply.memory) {
-		free(reply.memory);
-	}
-
-	return retVal;
-}
-
-
-
-//delete
-bool etcd_del(char* key) {
-	json_error_t error;
-	json_t* js_root = NULL;
-	json_t* js_node = NULL;
-	bool retVal = false;
-	char url[MAX_URL_LENGTH];
-	char request[MAX_CONTENT_LENGTH];
-	int res;
-	struct MemoryStruct reply;
-
-	reply.memory = malloc(1); /* will be grown as needed by the realloc above */
-	reply.size = 0; /* no data at this point */
-
-	snprintf(url, MAX_URL_LENGTH, "http://%s:%d/v2/keys/%s", etcd_server, etcd_port, key);
-	res = performRequest(url, DELETE, WriteMemoryCallback, request, (void*) &reply);
-
-	if (res == CURLE_OK) {
-		js_root = json_loads(reply.memory, 0, &error);
-
-		if (js_root != NULL) {
-			js_node = json_object_get(js_root, ETCD_JSON_NODE);
-		}
-
-		retVal = (js_node != NULL);
-
-		if (js_root != NULL) {
-			json_decref(js_root);
-		}
-	}
-
-	if (reply.memory) {
-		free(reply.memory);
-	}
-
-
-	return retVal;
-}
-
-///watch
-
-bool etcd_watch(char* key, int index, char* action, char* prevValue, char* value, char* rkey, int* modifiedIndex, int* errorCode) {
-    json_error_t error;
-    json_t* js_root = NULL;
-    json_t* js_node = NULL;
-    json_t* js_prevNode = NULL;
-    json_t* js_action = NULL;
-    json_t* js_value = NULL;
-    json_t* js_rkey = NULL;
-    json_t* js_prevValue = NULL;
-    json_t* js_modIndex = NULL;
-    json_t* js_error = NULL;	// used to indicate valid json response with ETCD error indication
-    bool retVal = false;
-    *errorCode = 0;
-    char url[MAX_URL_LENGTH];
-    int res;
-    struct MemoryStruct reply;
-
-    reply.memory = malloc(1); /* will be grown as needed by the realloc above */
-    reply.size = 0; /* no data at this point */
-
-    if (index != 0)
-        snprintf(url, MAX_URL_LENGTH, "http://%s:%d/v2/keys/%s?wait=true&recursive=true&waitIndex=%d", etcd_server, etcd_port, key, index);
-    else
-        snprintf(url, MAX_URL_LENGTH, "http://%s:%d/v2/keys/%s?wait=true&recursive=true", etcd_server, etcd_port, key);
-
-    res = performRequest(url, GET, WriteMemoryCallback, NULL, (void*) &reply);
-
-    if (res == CURLE_OK) {
-
-        js_root = json_loads(reply.memory, 0, &error);
-
-        if (js_root != NULL) {
-            js_action = json_object_get(js_root, ETCD_JSON_ACTION);
-            js_node = json_object_get(js_root, ETCD_JSON_NODE);
-            js_prevNode = json_object_get(js_root, ETCD_JSON_PREVNODE);
-            js_error = json_object_get(js_root, ETCD_ERROR_INDICATION);
-        }
-        if (js_prevNode != NULL) {
-            js_prevValue = json_object_get(js_prevNode, ETCD_JSON_VALUE);
-        }
-        if (js_node != NULL) {
-            js_rkey = json_object_get(js_node, ETCD_JSON_KEY);
-            js_value = json_object_get(js_node, ETCD_JSON_VALUE);
-            js_modIndex = json_object_get(js_node, ETCD_JSON_MODIFIEDINDEX);
-        }
-        if (js_prevNode != NULL) {
-            js_prevValue = json_object_get(js_prevNode, ETCD_JSON_VALUE);
-        }
-        if ((js_prevValue != NULL) && (json_is_string(js_prevValue))) {
-            strncpy(prevValue, json_string_value(js_prevValue), MAX_VALUE_LENGTH);
-        }
-        if ((js_value != NULL) && (json_is_string(js_value))) {
-            strncpy(value, json_string_value(js_value), MAX_VALUE_LENGTH);
-        }
-        if ((js_modIndex != NULL) && (json_is_integer(js_modIndex))) {
-            *modifiedIndex = json_integer_value(js_modIndex);
-        } else {
-            *modifiedIndex = index;
-        }
-
-        if ((js_rkey != NULL) && (js_action != NULL) && (json_is_string(js_rkey)) && (json_is_string(js_action))) {
-            strncpy(rkey, json_string_value(js_rkey), MAX_KEY_LENGTH);
-            strncpy(action, json_string_value(js_action), MAX_ACTION_LENGTH);
-
-            retVal = true;
-        }
-        if ((js_error != NULL) && (json_is_integer(js_error))) {
-		*errorCode = json_integer_value(js_error);
-		js_modIndex = json_object_get(js_root, ETCD_INDEX);
-		if ((js_modIndex != NULL) && (json_is_integer(js_modIndex))) {
-		    *modifiedIndex = json_integer_value(js_modIndex);
-		}
-        }
-        if (js_root != NULL) {
-            json_decref(js_root);
-        }
-
-    }
-
-    if (reply.memory) {
-        free(reply.memory);
-    }
-
-    return retVal;
-}

http://git-wip-us.apache.org/repos/asf/celix/blob/d62731a3/remote_services/discovery_etcd/private/src/etcd_watcher.c
----------------------------------------------------------------------
diff --git a/remote_services/discovery_etcd/private/src/etcd_watcher.c b/remote_services/discovery_etcd/private/src/etcd_watcher.c
index 7e1ce33..a09002a 100644
--- a/remote_services/discovery_etcd/private/src/etcd_watcher.c
+++ b/remote_services/discovery_etcd/private/src/etcd_watcher.c
@@ -26,6 +26,7 @@
 
 #include <stdbool.h>
 #include <stdlib.h>
+#include <unistd.h>
 #include <string.h>
 
 #include "log_helper.h"
@@ -35,6 +36,7 @@
 #include "discovery.h"
 #include "discovery_impl.h"
 
+#include <curl/curl.h>
 #include "etcd.h"
 #include "etcd_watcher.h"
 
@@ -51,18 +53,23 @@ struct etcd_watcher {
 	volatile bool running;
 };
 
-#define CFG_ETCD_ROOT_PATH		"DISCOVERY_ETCD_ROOT_PATH"
-#define DEFAULT_ETCD_ROOTPATH	"discovery"
 
-#define CFG_ETCD_SERVER_IP		"DISCOVERY_ETCD_SERVER_IP"
-#define DEFAULT_ETCD_SERVER_IP	"127.0.0.1"
+#define MAX_ROOTNODE_LENGTH			128
+#define MAX_LOCALNODE_LENGTH		4096
+#define MAX_VALUE_LENGTH			256
 
-#define CFG_ETCD_SERVER_PORT	"DISCOVERY_ETCD_SERVER_PORT"
-#define DEFAULT_ETCD_SERVER_PORT 2379
+#define CFG_ETCD_ROOT_PATH			"DISCOVERY_ETCD_ROOT_PATH"
+#define DEFAULT_ETCD_ROOTPATH		"discovery"
+
+#define CFG_ETCD_SERVER_IP			"DISCOVERY_ETCD_SERVER_IP"
+#define DEFAULT_ETCD_SERVER_IP		"127.0.0.1"
+
+#define CFG_ETCD_SERVER_PORT		"DISCOVERY_ETCD_SERVER_PORT"
+#define DEFAULT_ETCD_SERVER_PORT 	2379
 
 // be careful - this should be higher than the curl timeout
-#define CFG_ETCD_TTL   "DISCOVERY_ETCD_TTL"
-#define DEFAULT_ETCD_TTL 30
+#define CFG_ETCD_TTL   				"DISCOVERY_ETCD_TTL"
+#define DEFAULT_ETCD_TTL 			30
 
 
 // note that the rootNode shouldn't have a leading slash
@@ -71,36 +78,41 @@ static celix_status_t etcdWatcher_getRootPath(bundle_context_pt context, char* r
 	const char* rootPath = NULL;
 
 	if (((bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &rootPath)) != CELIX_SUCCESS) || (!rootPath)) {
-		strcpy(rootNode, DEFAULT_ETCD_ROOTPATH);
+		strncpy(rootNode, DEFAULT_ETCD_ROOTPATH, MAX_ROOTNODE_LENGTH);
 	}
 	else {
-		strcpy(rootNode, rootPath);
+		strncpy(rootNode, rootPath, MAX_ROOTNODE_LENGTH);
 	}
 
 	return status;
 }
 
-
 static celix_status_t etcdWatcher_getLocalNodePath(bundle_context_pt context, char* localNodePath) {
 	celix_status_t status = CELIX_SUCCESS;
 	char rootPath[MAX_ROOTNODE_LENGTH];
     const char* uuid = NULL;
 
-    if ((etcdWatcher_getRootPath(context, &rootPath[0]) != CELIX_SUCCESS)) {
+    if ((etcdWatcher_getRootPath(context, rootPath) != CELIX_SUCCESS)) {
 		status = CELIX_ILLEGAL_STATE;
     }
 	else if (((bundleContext_getProperty(context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &uuid)) != CELIX_SUCCESS) || (!uuid)) {
 		status = CELIX_ILLEGAL_STATE;
 	}
-	else if (rootPath[strlen(&rootPath[0]) - 1] == '/') {
-    	snprintf(localNodePath, MAX_LOCALNODE_LENGTH, "%s%s", &rootPath[0], uuid);
+	else if (rootPath[strlen(rootPath) - 1] == '/') {
+    	snprintf(localNodePath, MAX_LOCALNODE_LENGTH, "%s%s", rootPath, uuid);
     }
     else {
-    	snprintf(localNodePath, MAX_LOCALNODE_LENGTH, "%s/%s", &rootPath[0], uuid);
+    	snprintf(localNodePath, MAX_LOCALNODE_LENGTH, "%s/%s", rootPath, uuid);
     }
 
     return status;
 }
+
+static void add_node(const char *key, const char *value, void* arg) {
+	discovery_pt discovery = (discovery_pt) arg;
+	endpointDiscoveryPoller_addDiscoveryEndpoint(discovery->poller, (char *) value);
+}
+
 /*
  * retrieves all already existing discovery endpoints
  * from etcd and adds them to the poller.
@@ -108,44 +120,18 @@ static celix_status_t etcdWatcher_getLocalNodePath(bundle_context_pt context, ch
  * returns the modifiedIndex of the last modified
  * discovery endpoint (see etcd documentation).
  */
-static celix_status_t etcdWatcher_addAlreadyExistingWatchpoints(discovery_pt discovery, int* highestModified) {
+static celix_status_t etcdWatcher_addAlreadyExistingWatchpoints(discovery_pt discovery, long long* highestModified) {
 	celix_status_t status = CELIX_SUCCESS;
-	char** nodeArr = calloc(MAX_NODES, sizeof(*nodeArr));
-	char rootPath[MAX_ROOTNODE_LENGTH];
-	int i, size;
-
-	*highestModified = -1;
 
-	for (i = 0; i < MAX_NODES; i++) {
-		nodeArr[i] = calloc(MAX_KEY_LENGTH, sizeof(*nodeArr[i]));
-	}
+	char rootPath[MAX_ROOTNODE_LENGTH];
+	status = etcdWatcher_getRootPath(discovery->context, rootPath);
 
-	// we need to go though all nodes and get the highest modifiedIndex
-	if (((status = etcdWatcher_getRootPath(discovery->context, &rootPath[0])) == CELIX_SUCCESS) &&
-		 (etcd_getNodes(rootPath, nodeArr, &size) == true)) {
-		for (i = 0; i < size; i++) {
-			char* key = nodeArr[i];
-			char value[MAX_VALUE_LENGTH];
-			char action[MAX_VALUE_LENGTH];
-			int modIndex;
-
-			if (etcd_get(key, &value[0], &action[0], &modIndex) == true) {
-				// TODO: check that this is not equals to the local endpoint
-				endpointDiscoveryPoller_addDiscoveryEndpoint(discovery->poller, &value[0]);
-
-				if (modIndex > *highestModified) {
-					*highestModified = modIndex;
-				}
-			}
+	if (status == CELIX_SUCCESS) {
+		if(etcd_get_directory(rootPath, add_node, discovery, highestModified)) {
+			    status = CELIX_ILLEGAL_ARGUMENT;
 		}
 	}
 
-	for (i = 0; i < MAX_NODES; i++) {
-		free(nodeArr[i]);
-	}
-
-	free(nodeArr);
-
 	return status;
 }
 
@@ -154,8 +140,7 @@ static celix_status_t etcdWatcher_addOwnFramework(etcd_watcher_pt watcher)
 {
     celix_status_t status = CELIX_BUNDLE_EXCEPTION;
     char localNodePath[MAX_LOCALNODE_LENGTH];
-    char value[MAX_VALUE_LENGTH];
-    char action[MAX_VALUE_LENGTH];
+    char *value;
  	char url[MAX_VALUE_LENGTH];
     int modIndex;
     char* endpoints = NULL;
@@ -166,30 +151,30 @@ static celix_status_t etcdWatcher_addOwnFramework(etcd_watcher_pt watcher)
 	endpoint_discovery_server_pt server = watcher->discovery->server;
 
     // register own framework
-    if ((status = etcdWatcher_getLocalNodePath(context, &localNodePath[0])) != CELIX_SUCCESS) {
+    if ((status = etcdWatcher_getLocalNodePath(context, localNodePath)) != CELIX_SUCCESS) {
         return status;
     }
 
-	if (endpointDiscoveryServer_getUrl(server, &url[0]) != CELIX_SUCCESS) {
+	if (endpointDiscoveryServer_getUrl(server, url) != CELIX_SUCCESS) {
 		snprintf(url, MAX_VALUE_LENGTH, "http://%s:%s/%s", DEFAULT_SERVER_IP, DEFAULT_SERVER_PORT, DEFAULT_SERVER_PATH);
 	}
 
-	endpoints = &url[0];
+	endpoints = url;
 
     if ((bundleContext_getProperty(context, CFG_ETCD_TTL, &ttlStr) != CELIX_SUCCESS) || !ttlStr) {
         ttl = DEFAULT_ETCD_TTL;
     }
     else
     {
-        char* endptr = (char*)ttlStr;
+        char* endptr = (char *) ttlStr;
         errno = 0;
-        ttl =  strtol(ttlStr, &endptr, 10);
+        ttl = strtol(ttlStr, &endptr, 10);
         if (*endptr || errno != 0) {
             ttl = DEFAULT_ETCD_TTL;
         }
     }
 
-	if (etcd_get(localNodePath, &value[0], &action[0], &modIndex) != true) {
+	if (etcd_get(localNodePath, &value, &modIndex) != true) {
 		etcd_set(localNodePath, endpoints, ttl, false);
 	}
 	else if (etcd_set(localNodePath, endpoints, ttl, true) == false)  {
@@ -199,6 +184,8 @@ static celix_status_t etcdWatcher_addOwnFramework(etcd_watcher_pt watcher)
         status = CELIX_SUCCESS;
     }
 
+	FREE_MEM(value);
+
     return status;
 }
 
@@ -262,52 +249,47 @@ static celix_status_t etcdWatcher_removeEntry(etcd_watcher_pt watcher, char* key
 static void* etcdWatcher_run(void* data) {
 	etcd_watcher_pt watcher = (etcd_watcher_pt) data;
 	time_t timeBeforeWatch = time(NULL);
-	static char rootPath[MAX_ROOTNODE_LENGTH];
-	int highestModified = 0;
-	int errorCode=0;
+	char rootPath[MAX_ROOTNODE_LENGTH];
+	long long highestModified = 0;
 
 	bundle_context_pt context = watcher->discovery->context;
 
 	etcdWatcher_addAlreadyExistingWatchpoints(watcher->discovery, &highestModified);
-	etcdWatcher_getRootPath(context, &rootPath[0]);
+	etcdWatcher_getRootPath(context, rootPath);
 
 	while (watcher->running) {
 
-        char rkey[MAX_KEY_LENGTH];
-		char value[MAX_VALUE_LENGTH];
-		char preValue[MAX_VALUE_LENGTH];
-		char action[MAX_ACTION_LENGTH];
-        int modIndex=0;
-
-        if (etcd_watch(rootPath, highestModified + 1, &action[0], &preValue[0], &value[0], &rkey[0], &modIndex, &errorCode) == true) {
-		if (strcmp(action, "set") == 0) {
-			etcdWatcher_addEntry(watcher, &rkey[0], &value[0]);
-		} else if (strcmp(action, "delete") == 0) {
-			etcdWatcher_removeEntry(watcher, &rkey[0], &value[0]);
-		} else if (strcmp(action, "expire") == 0) {
-			etcdWatcher_removeEntry(watcher, &rkey[0], &value[0]);
-		} else if (strcmp(action, "update") == 0) {
-			etcdWatcher_addEntry(watcher, &rkey[0], &value[0]);
-		} else {
-			logHelper_log(*watcher->loghelper, OSGI_LOGSERVICE_INFO, "Unexpected action: %s", action);
-		}
+		char *rkey = NULL;
+		char *value = NULL;
+		char *preValue = NULL;
+		char *action = NULL;
+		long long modIndex;
+
+        if (etcd_watch(rootPath, highestModified + 1, &action, &preValue, &value, &rkey, &modIndex) == 0 && action != NULL) {
+			if (strcmp(action, "set") == 0) {
+				etcdWatcher_addEntry(watcher, rkey, value);
+			} else if (strcmp(action, "delete") == 0) {
+				etcdWatcher_removeEntry(watcher, rkey, value);
+			} else if (strcmp(action, "expire") == 0) {
+				etcdWatcher_removeEntry(watcher, rkey, value);
+			} else if (strcmp(action, "update") == 0) {
+				etcdWatcher_addEntry(watcher, rkey, value);
+			} else {
+				logHelper_log(*watcher->loghelper, OSGI_LOGSERVICE_INFO, "Unexpected action: %s", action);
+			}
 
-		highestModified = modIndex;
-        }
-        /* prevent busy waiting, in case etcd_watch returns false */
-        else {
-		switch (errorCode) {
-		case 401:
-			// Etcd can store at most 1000 events
 			highestModified = modIndex;
-			break;
-		default:
-			break;
-		}
+        } else if (time(NULL) - timeBeforeWatch <= (DEFAULT_ETCD_TTL / 4)) {
+			sleep(DEFAULT_ETCD_TTL / 4);
         }
 
+        FREE_MEM(action);
+        FREE_MEM(value);
+        FREE_MEM(preValue);
+        FREE_MEM(rkey);
+
 		// update own framework uuid
-		if (time(NULL) - timeBeforeWatch > (DEFAULT_ETCD_TTL/2)) {
+		if (time(NULL) - timeBeforeWatch > (DEFAULT_ETCD_TTL / 4)) {
 			etcdWatcher_addOwnFramework(watcher);
 			timeBeforeWatch = time(NULL);
 		}
@@ -361,7 +343,11 @@ celix_status_t etcdWatcher_create(discovery_pt discovery, bundle_context_pt cont
 		}
 	}
 
-    status = etcd_init((char*)etcd_server, etcd_port);
+	if (etcd_init((char*) etcd_server, etcd_port, CURL_GLOBAL_DEFAULT) != 0) {
+		status = CELIX_BUNDLE_EXCEPTION;
+	} else {
+		status = CELIX_SUCCESS;
+	}
 
     if (status == CELIX_SUCCESS) {
         etcdWatcher_addOwnFramework(*watcher);
@@ -391,7 +377,7 @@ celix_status_t etcdWatcher_destroy(etcd_watcher_pt watcher) {
 	celixThread_join(watcher->watcherThread, NULL);
 
 	// register own framework
-	status = etcdWatcher_getLocalNodePath(watcher->discovery->context, &localNodePath[0]);
+	status = etcdWatcher_getLocalNodePath(watcher->discovery->context, localNodePath);
 
 	if (status != CELIX_SUCCESS || etcd_del(localNodePath) == false)
 	{

http://git-wip-us.apache.org/repos/asf/celix/blob/d62731a3/remote_services/examples/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/remote_services/examples/CMakeLists.txt b/remote_services/examples/CMakeLists.txt
index f24fa6f..44b7733 100644
--- a/remote_services/examples/CMakeLists.txt
+++ b/remote_services/examples/CMakeLists.txt
@@ -83,7 +83,9 @@ if (RSA_EXAMPLES)
             BUNDLES discovery_etcd topology_manager remote_service_admin_http calculator shell shell_tui log_service log_writer
         )
         deploy_bundles_dir(remote-services-etcd DIR_NAME "endpoints"
-            BUNDLES org.apache.celix.calc.api.Calculator_endpoint
+            BUNDLES
+            	org.apache.celix.calc.api.Calculator_endpoint
+            	org.apache.celix.calc.api.Calculator2_endpoint
         )
 
         add_deploy("remote-services-etcd-client" 


Mime
View raw message