celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bpe...@apache.org
Subject svn commit: r1632567 - in /celix/trunk/remote_services: discovery/private/include/ discovery/private/src/ discovery_configured/private/include/ discovery_etcd/private/include/ discovery_etcd/private/src/
Date Fri, 17 Oct 2014 13:05:20 GMT
Author: bpetri
Date: Fri Oct 17 13:05:20 2014
New Revision: 1632567

URL: http://svn.apache.org/r1632567
Log:
CELIX-169: Add port-collision auto-correction to discovery

Modified:
    celix/trunk/remote_services/discovery/private/include/discovery.h
    celix/trunk/remote_services/discovery/private/include/endpoint_discovery_server.h
    celix/trunk/remote_services/discovery/private/src/endpoint_discovery_server.c
    celix/trunk/remote_services/discovery_configured/private/include/discovery_impl.h
    celix/trunk/remote_services/discovery_etcd/private/include/discovery_impl.h
    celix/trunk/remote_services/discovery_etcd/private/include/etcd_watcher.h
    celix/trunk/remote_services/discovery_etcd/private/src/etcd.c
    celix/trunk/remote_services/discovery_etcd/private/src/etcd_watcher.c

Modified: celix/trunk/remote_services/discovery/private/include/discovery.h
URL: http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery/private/include/discovery.h?rev=1632567&r1=1632566&r2=1632567&view=diff
==============================================================================
--- celix/trunk/remote_services/discovery/private/include/discovery.h (original)
+++ celix/trunk/remote_services/discovery/private/include/discovery.h Fri Oct 17 13:05:20
2014
@@ -33,6 +33,8 @@
 #include "endpoint_description.h"
 #include "endpoint_listener.h"
 
+#define DISCOVERY_SERVER_INTERFACE	"DISCOVERY_CFG_SERVER_INTERFACE"
+#define DISCOVERY_SERVER_IP 		"DISCOVERY_CFG_SERVER_IP"
 #define DISCOVERY_SERVER_PORT 		"DISCOVERY_CFG_SERVER_PORT"
 #define DISCOVERY_SERVER_PATH 		"DISCOVERY_CFG_SERVER_PATH"
 #define DISCOVERY_POLL_ENDPOINTS 	"DISCOVERY_CFG_POLL_ENDPOINTS"

Modified: celix/trunk/remote_services/discovery/private/include/endpoint_discovery_server.h
URL: http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery/private/include/endpoint_discovery_server.h?rev=1632567&r1=1632566&r2=1632567&view=diff
==============================================================================
--- celix/trunk/remote_services/discovery/private/include/endpoint_discovery_server.h (original)
+++ celix/trunk/remote_services/discovery/private/include/endpoint_discovery_server.h Fri
Oct 17 13:05:20 2014
@@ -68,4 +68,14 @@ celix_status_t endpointDiscoveryServer_a
  */
 celix_status_t endpointDiscoveryServer_removeEndpoint( endpoint_discovery_server_pt server,
endpoint_description_pt endpoint);
 
+/**
+ * Retrieves the url, which is used by the discovery server to announce the endpoints
+ *
+ * @param server [in] the endpoint discovery server to retrieve the url from
+ * @param url [out] url which is used to announce the endpoints.
+ * @return CELIX_SUCCESS when successful.
+ */
+celix_status_t endpointDiscoveryServer_getUrl(endpoint_discovery_server_pt server, char*
url);
+
+
 #endif /* ENDPOINT_DISCOVERY_SERVER_H_ */

Modified: celix/trunk/remote_services/discovery/private/src/endpoint_discovery_server.c
URL: http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery/private/src/endpoint_discovery_server.c?rev=1632567&r1=1632566&r2=1632567&view=diff
==============================================================================
--- celix/trunk/remote_services/discovery/private/src/endpoint_discovery_server.c (original)
+++ celix/trunk/remote_services/discovery/private/src/endpoint_discovery_server.c Fri Oct
17 13:05:20 2014
@@ -25,7 +25,10 @@
  */
 #include <stdlib.h>
 #include <stdint.h>
-
+#include <arpa/inet.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <ifaddrs.h>
 #include "civetweb.h"
 #include "celix_errno.h"
 #include "utils.h"
@@ -36,7 +39,8 @@
 #include "endpoint_descriptor_writer.h"
 #include "endpoint_discovery_server.h"
 
-
+// defines how often the webserver is restarted (with an increased port number)
+#define MAX_NUMBER_OF_RESTARTS 	5
 #define DEFAULT_SERVER_THREADS "1"
 
 #define CIVETWEB_REQUEST_NOT_HANDLED 0
@@ -54,16 +58,23 @@ struct endpoint_discovery_server {
     celix_thread_mutex_t serverLock;
 
     const char* path;
+    const char *port;
+    const char* ip;
     struct mg_context* ctx;
 };
 
 // Forward declarations...
 static int endpointDiscoveryServer_callback(struct mg_connection *conn);
 static char* format_path(char* path);
+static celix_status_t endpointDiscoveryServer_getIpAdress(char* interface, char** ip);
 
 celix_status_t endpointDiscoveryServer_create(discovery_pt discovery, bundle_context_pt context,
endpoint_discovery_server_pt *server) {
 	celix_status_t status = CELIX_SUCCESS;
 
+	char *port = 0;
+	char *ip = NULL;
+	char *path = NULL;
+
 	*server = malloc(sizeof(struct endpoint_discovery_server));
 	if (!*server) {
 		return CELIX_ENOMEM;
@@ -79,13 +90,34 @@ celix_status_t endpointDiscoveryServer_c
 		return CELIX_BUNDLE_EXCEPTION;
 	}
 
-	char *port = NULL;
+	bundleContext_getProperty(context, DISCOVERY_SERVER_IP, &ip);
+	if (ip == NULL) {
+		char *interface = NULL;
+
+		bundleContext_getProperty(context, DISCOVERY_SERVER_INTERFACE, &interface);
+		if ((interface != NULL) && (endpointDiscoveryServer_getIpAdress(interface, &ip)
!= CELIX_SUCCESS)) {
+			fw_log(logger, OSGI_FRAMEWORK_LOG_WARNING, "Could not retrieve IP adress for interface
%s", interface);
+		}
+
+		if (ip == NULL) {
+			endpointDiscoveryServer_getIpAdress(NULL, &ip);
+		}
+	}
+
+	if (ip != NULL) {
+		fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "Using %s for service annunciation", ip);
+		(*server)->ip = strdup(ip);
+	}
+	else {
+		fw_log(logger, OSGI_FRAMEWORK_LOG_WARNING, "No IP address for service annunciation set.
Using %s", DEFAULT_SERVER_IP);
+		(*server)->ip = (char*) DEFAULT_SERVER_IP;
+	}
+
 	bundleContext_getProperty(context, DISCOVERY_SERVER_PORT, &port);
 	if (port == NULL) {
 		port = DEFAULT_SERVER_PORT;
 	}
 
-	char *path = NULL;
 	bundleContext_getProperty(context, DISCOVERY_SERVER_PATH, &path);
 	if (path == NULL) {
 		path = DEFAULT_SERVER_PATH;
@@ -93,19 +125,56 @@ celix_status_t endpointDiscoveryServer_c
 
 	(*server)->path = format_path(path);
 
-	const char *options[] = {
-		"listening_ports", port,
-		"num_threads", DEFAULT_SERVER_THREADS,
-		NULL
-	};
-
 	const struct mg_callbacks callbacks = {
 		.begin_request = endpointDiscoveryServer_callback,
 	};
 
-	(*server)->ctx = mg_start(&callbacks, (*server), options);
+	unsigned int port_counter = 0;
+
+	do {
+		const char *options[] = {
+			"listening_ports", port,
+			"num_threads", DEFAULT_SERVER_THREADS,
+			NULL
+		};
+
+		(*server)->ctx = mg_start(&callbacks, (*server), options);
+
+		if ((*server)->ctx != NULL)
+		{
+			fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "Starting discovery server on port %s...", port);
+			(*server)->port = port;
+		}
+		else {
+			errno = 0;
+			char* newPort = calloc(10, sizeof(*newPort));
+	        char* endptr = port;
+	        int currentPort = strtol(port, &endptr, 10);
+
+	        if (*endptr || errno != 0) {
+	            currentPort = strtol(DEFAULT_SERVER_PORT, NULL, 10);
+	        }
+
+	        port_counter++;
+			snprintf(newPort, 6,  "%d", (currentPort+1));
 
-	fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "Starting discovery server on port %s...", port);
+			fw_log(logger, OSGI_FRAMEWORK_LOG_ERROR, "Error while starting discovery server on port
%s - retrying on port %s...", port, newPort);
+			port = newPort;
+		}
+
+	} while(((*server)->ctx == NULL) && (port_counter < MAX_NUMBER_OF_RESTARTS));
+
+	return status;
+}
+
+celix_status_t endpointDiscoveryServer_getUrl(endpoint_discovery_server_pt server, char*
url)
+{
+	celix_status_t status = CELIX_BUNDLE_EXCEPTION;
+
+	if (server->ip && server->port && server->path) {
+		sprintf(url, "http://%s:%s/%s", server->ip, server->port, server->path);
+		status = CELIX_SUCCESS;
+	}
 
 	return status;
 }
@@ -127,6 +196,9 @@ celix_status_t endpointDiscoveryServer_d
 	status = celixThreadMutex_destroy(&server->serverLock);
 
 	free((void*) server->path);
+	free((void*) server->port);
+	free((void*) server->ip);
+
 	free(server);
 
 	return status;
@@ -308,3 +380,34 @@ static int endpointDiscoveryServer_callb
 
 	return status;
 }
+
+static celix_status_t endpointDiscoveryServer_getIpAdress(char* interface, char** ip) {
+	celix_status_t status = CELIX_BUNDLE_EXCEPTION;
+
+	struct ifaddrs *ifaddr, *ifa;
+    char host[NI_MAXHOST];
+
+    if (getifaddrs(&ifaddr) != -1)
+    {
+		for (ifa = ifaddr; ifa != NULL && status != CELIX_SUCCESS; ifa = ifa->ifa_next)
+		{
+			if (ifa->ifa_addr == NULL)
+				continue;
+
+			if ((getnameinfo(ifa->ifa_addr,sizeof(struct sockaddr_in), host, NI_MAXHOST, NULL,
0, NI_NUMERICHOST) == 0) && (ifa->ifa_addr->sa_family == AF_INET)) {
+				if (interface == NULL) {
+					*ip = strdup(host);
+					status = CELIX_SUCCESS;
+				}
+				else if (strcmp(ifa->ifa_name, interface) == 0) {
+					*ip = strdup(host);
+					status = CELIX_SUCCESS;
+				}
+			}
+		}
+
+		freeifaddrs(ifaddr);
+    }
+
+    return status;
+}

Modified: celix/trunk/remote_services/discovery_configured/private/include/discovery_impl.h
URL: http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery_configured/private/include/discovery_impl.h?rev=1632567&r1=1632566&r2=1632567&view=diff
==============================================================================
--- celix/trunk/remote_services/discovery_configured/private/include/discovery_impl.h (original)
+++ celix/trunk/remote_services/discovery_configured/private/include/discovery_impl.h Fri
Oct 17 13:05:20 2014
@@ -37,7 +37,7 @@
 #include "endpoint_discovery_server.h"
 
 
-
+#define DEFAULT_SERVER_IP 	"127.0.0.1"
 #define DEFAULT_SERVER_PORT "9999"
 #define DEFAULT_SERVER_PATH "/org.apache.celix.discovery.configured"
 #define DEFAULT_POLL_ENDPOINTS "http://localhost:9999/org.apache.celix.discovery.configured"

Modified: celix/trunk/remote_services/discovery_etcd/private/include/discovery_impl.h
URL: http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery_etcd/private/include/discovery_impl.h?rev=1632567&r1=1632566&r2=1632567&view=diff
==============================================================================
--- celix/trunk/remote_services/discovery_etcd/private/include/discovery_impl.h (original)
+++ celix/trunk/remote_services/discovery_etcd/private/include/discovery_impl.h Fri Oct 17
13:05:20 2014
@@ -38,9 +38,11 @@
 #include "etcd_watcher.h"
 
 
+#define DEFAULT_SERVER_IP 	"127.0.0.1"
 #define DEFAULT_SERVER_PORT "9999"
 #define DEFAULT_SERVER_PATH "/org.apache.celix.discovery.etcd"
-#define DEFAULT_POLL_ENDPOINTS "http://localhost:9999/org.apache.celix.discovery.etcd"
+
+#define DEFAULT_POLL_ENDPOINTS ""
 
 #define MAX_ROOTNODE_LENGTH		 64
 #define MAX_LOCALNODE_LENGTH	256

Modified: celix/trunk/remote_services/discovery_etcd/private/include/etcd_watcher.h
URL: http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery_etcd/private/include/etcd_watcher.h?rev=1632567&r1=1632566&r2=1632567&view=diff
==============================================================================
--- celix/trunk/remote_services/discovery_etcd/private/include/etcd_watcher.h (original)
+++ celix/trunk/remote_services/discovery_etcd/private/include/etcd_watcher.h Fri Oct 17 13:05:20
2014
@@ -33,7 +33,7 @@
 
 typedef struct etcd_watcher *etcd_watcher_pt;
 
-celix_status_t etcdWatcher_create(endpoint_discovery_poller_pt poller, bundle_context_pt
context, etcd_watcher_pt *watcher);
+celix_status_t etcdWatcher_create(discovery_pt discovery,  bundle_context_pt context, etcd_watcher_pt
*watcher);
 celix_status_t etcdWatcher_destroy(etcd_watcher_pt watcher);
 
 

Modified: celix/trunk/remote_services/discovery_etcd/private/src/etcd.c
URL: http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery_etcd/private/src/etcd.c?rev=1632567&r1=1632566&r2=1632567&view=diff
==============================================================================
--- celix/trunk/remote_services/discovery_etcd/private/src/etcd.c (original)
+++ celix/trunk/remote_services/discovery_etcd/private/src/etcd.c Fri Oct 17 13:05:20 2014
@@ -103,7 +103,7 @@ bool etcd_get(char* key, char* value, ch
 	} else if ((js_root = json_loads(reply.memory, 0, &error)) == NULL) {
 		printf("error while parsing json data\n");
 	} else if ((js_node = json_object_get(js_root, ETCD_JSON_NODE)) == NULL) {
-		printf("error while retrieving expected node object\n");
+		printf("error while retrieving expected node object %s\n", json_dumps(js_root, 0));
 	} else if (((js_value = json_object_get(js_node, ETCD_JSON_VALUE)) == NULL) || ((js_value
= json_object_get(js_node, ETCD_JSON_VALUE)) == NULL) || ((js_modifiedIndex = json_object_get(js_node,
ETCD_JSON_MODIFIEDINDEX)) == NULL)) {
 		printf("error while retrieving expected objects\n");
 	}
@@ -208,7 +208,7 @@ bool etcd_set(char* key, char* value, in
 	} else if ((js_root = json_loads(reply.memory, 0, &error)) == NULL) {
 		printf("error while parsing json data\n");
 	} else if ((js_node = json_object_get(js_root, ETCD_JSON_NODE)) == NULL) {
-		printf("error while retrieving expected node object\n");
+		printf("error while retrieving expected node object %s\n", json_dumps(js_root, 0));
 	} else if ((js_value = json_object_get(js_node, ETCD_JSON_VALUE)) == NULL) {
 		printf("error while retrieving expected value object\n");
 	} else if (json_is_string(js_value)) {
@@ -247,7 +247,7 @@ bool etcd_del(char* key) {
 	} else if ((js_root = json_loads(reply.memory, 0, &error)) == NULL) {
 		printf("error while parsing json data\n");
 	} else if ((js_node = json_object_get(js_root, ETCD_JSON_NODE)) == NULL) {
-		printf("error while retrieving expected node object\n");
+		printf("error while retrieving expected node object %s\n", json_dumps(js_root, 0));
 	} else {
 		retVal = true;
 	}
@@ -277,10 +277,10 @@ bool etcd_watch(char* key, int index, ch
 	reply.size = 0; /* no data at this point */
 
 	if (index != 0)
-		snprintf(url, MAX_URL_LENGTH, "http://%s:%d/v2/keys/%s?wait=true&waitIndex=%d", etcd_server,
etcd_port, key,
+		snprintf(url, MAX_URL_LENGTH, "http://%s:%d/v2/keys/%s?wait=true&recursive=true&waitIndex=%d",
etcd_server, etcd_port, key,
 				index);
 	else
-		snprintf(url, MAX_URL_LENGTH, "http://%s:%d/v2/keys/%s?wait=true", etcd_server, etcd_port,
key);
+		snprintf(url, MAX_URL_LENGTH, "http://%s:%d/v2/keys/%s?wait=true&recursive=true", etcd_server,
etcd_port, key);
 
 	res = performRequest(url, GET, WriteMemoryCallback, NULL, (void*) &reply);
 
@@ -290,18 +290,24 @@ bool etcd_watch(char* key, int index, ch
 		printf("error while performing curl w/ %s\n", url);
 	} else if ((js_root = json_loads(reply.memory, 0, &error)) == NULL) {
 		printf("error while parsing json data\n");
-	} else if (((js_action = json_object_get(js_root, ETCD_JSON_ACTION)) == NULL) ||
-			((js_node = json_object_get(js_root, ETCD_JSON_NODE)) == NULL) ||
-			((js_prevNode = json_object_get(js_root, ETCD_JSON_PREVNODE)) == NULL)) {
-		printf("error while retrieving expected node object\n");
-	} else if (((js_value = json_object_get(js_node, ETCD_JSON_VALUE)) == NULL) ||
-			((js_prevValue = json_object_get(js_prevNode, ETCD_JSON_VALUE)) == NULL)) {
-		printf("error while retrieving expected value objects\n");
-	} else if (json_is_string(js_value) && json_is_string(js_prevValue) && json_is_string(js_action))
{
-		strncpy(value, json_string_value(js_value), MAX_VALUE_LENGTH);
-		strncpy(prevValue, json_string_value(js_prevValue), MAX_VALUE_LENGTH);
-		strncpy(action, json_string_value(js_action), MAX_ACTION_LENGTH);
-		retVal = true;
+	} else {
+		js_action = json_object_get(js_root, ETCD_JSON_ACTION);
+		js_node = json_object_get(js_root, ETCD_JSON_NODE);
+		js_prevNode = json_object_get(js_root, ETCD_JSON_PREVNODE);
+	
+		if (js_action == NULL || js_node == NULL) {
+			printf("error while retrieving expected node object %s\n", json_dumps(js_root, 0));
+		} else if ((js_value = json_object_get(js_node, ETCD_JSON_VALUE)) == NULL) {
+			printf("error while retrieving expected value objects\n");
+		}
+		else if (json_is_string(js_value) && json_is_string(js_action)) {
+			if ((js_prevNode != NULL) && ((js_prevValue = json_object_get(js_prevNode, ETCD_JSON_VALUE))
!= NULL) && (json_is_string(js_prevValue))) {
+				strncpy(prevValue, json_string_value(js_prevValue), MAX_VALUE_LENGTH);
+			}
+			strncpy(value, json_string_value(js_value), MAX_VALUE_LENGTH);
+			strncpy(action, json_string_value(js_action), MAX_ACTION_LENGTH);
+			retVal = true;
+		}
 	}
 
 	if (reply.memory) {

Modified: celix/trunk/remote_services/discovery_etcd/private/src/etcd_watcher.c
URL: http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery_etcd/private/src/etcd_watcher.c?rev=1632567&r1=1632566&r2=1632567&view=diff
==============================================================================
--- celix/trunk/remote_services/discovery_etcd/private/src/etcd_watcher.c (original)
+++ celix/trunk/remote_services/discovery_etcd/private/src/etcd_watcher.c Fri Oct 17 13:05:20
2014
@@ -145,25 +145,31 @@ static celix_status_t etcdWatcher_addAlr
 }
 
 
-static celix_status_t etcdWatcher_addOwnFramework(bundle_context_pt context)
+static celix_status_t etcdWatcher_addOwnFramework(etcd_watcher_pt watcher)
 {
     celix_status_t status = CELIX_BUNDLE_EXCEPTION;
     char localNodePath[MAX_LOCALNODE_LENGTH];
     char value[MAX_VALUE_LENGTH];
     char action[MAX_VALUE_LENGTH];
+ 	char url[MAX_VALUE_LENGTH];
     int modIndex;
     char* endpoints = NULL;
     char* ttlStr = NULL;
     int ttl;
 
+	bundle_context_pt context = watcher->discovery->context;
+	endpoint_discovery_server_pt server = watcher->discovery->server;
+
     // register own framework
     if ((status = etcdWatcher_getLocalNodePath(context, &localNodePath[0])) != CELIX_SUCCESS)
{
         return status;
     }
 
-    if ((bundleContext_getProperty(context, DISCOVERY_POLL_ENDPOINTS, &endpoints) !=
CELIX_SUCCESS) || !endpoints) {
-        endpoints = DEFAULT_POLL_ENDPOINTS;
-    }
+	if (endpointDiscoveryServer_getUrl(server, &url[0]) != CELIX_SUCCESS) {
+		snprintf(url, MAX_VALUE_LENGTH, "http://%s:%s/%s", DEFAULT_SERVER_IP, DEFAULT_SERVER_PORT,
DEFAULT_SERVER_PATH);
+	}
+
+	endpoints = &url[0];
 
     if ((bundleContext_getProperty(context, CFG_ETCD_TTL, &ttlStr) != CELIX_SUCCESS)
|| !ttlStr) {
         ttl = DEFAULT_ETCD_TTL;
@@ -212,19 +218,21 @@ static void* etcdWatcher_run(void* data)
 		char preValue[MAX_VALUE_LENGTH];
 		char action[MAX_ACTION_LENGTH];
 
-		if (etcd_watch(rootPath, highestModified + 1, &action[0], &preValue[0], &value[0])
== true) {
+		if (etcd_watch(rootPath, 0, &action[0], &preValue[0], &value[0]) == true) {
 
 			if (strcmp(action, "set") == 0) {
-				endpointDiscoveryPoller_removeDiscoveryEndpoint(poller, &preValue[0]);
 				endpointDiscoveryPoller_addDiscoveryEndpoint(poller, &value[0]);
 			} else if (strcmp(action, "delete") == 0) {
 				endpointDiscoveryPoller_removeDiscoveryEndpoint(poller, &preValue[0]);
+			} else if (strcmp(action, "update") == 0) {
+				// TODO
 			} else {
 				fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "Unexpected action: %s", action);
 			}
 		}
+
 		// update own framework uuid in any case;
-	    etcdWatcher_addOwnFramework(context);
+	    etcdWatcher_addOwnFramework(watcher);
 	}
 
 	return NULL;
@@ -278,7 +286,7 @@ celix_status_t etcdWatcher_create(discov
 		return CELIX_BUNDLE_EXCEPTION;
 	}
 
-	etcdWatcher_addOwnFramework(context);
+	etcdWatcher_addOwnFramework((*watcher));
 
 	if ((status = celixThreadMutex_create(&(*watcher)->watcherLock, NULL)) != CELIX_SUCCESS)
{
 		return status;



Mime
View raw message