celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bpe...@apache.org
Subject svn commit: r1626250 [2/2] - in /celix/trunk/remote_services: ./ discovery/ discovery/private/ discovery/private/include/ discovery/private/src/ discovery_configured/ discovery_configured/private/include/ discovery_configured/private/src/ discovery_etc...
Date Fri, 19 Sep 2014 15:36:25 GMT
Added: celix/trunk/remote_services/discovery_configured/private/src/discovery_impl.c
URL: http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery_configured/private/src/discovery_impl.c?rev=1626250&view=auto
==============================================================================
--- celix/trunk/remote_services/discovery_configured/private/src/discovery_impl.c (added)
+++ celix/trunk/remote_services/discovery_configured/private/src/discovery_impl.c Fri Sep
19 15:36:24 2014
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * discovery_impl.c
+ *
+ * \date        Aug 8, 2014
+ * \author    	<a href="mailto:celix-dev@incubator.apache.org">Apache Celix Project Team</a>
+ * \copyright	Apache License, Version 2.0
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <stdbool.h>
+#include <netdb.h>
+#include <netinet/in.h>
+
+#include "constants.h"
+#include "celix_threads.h"
+#include "bundle_context.h"
+#include "array_list.h"
+#include "utils.h"
+#include "celix_errno.h"
+#include "filter.h"
+#include "service_reference.h"
+#include "service_registration.h"
+#include "remote_constants.h"
+#include "celix_log.h"
+
+#include "discovery.h"
+#include "discovery_impl.h"
+#include "endpoint_discovery_poller.h"
+#include "endpoint_discovery_server.h"
+
+
+
+/**
+ * Creates a discovery instance bound to the given bundle context.
+ *
+ * The poller and server are left NULL here; discovery_start() creates them.
+ * On any failure everything allocated so far is released again.
+ *
+ * @param context   bundle context stored in the new instance
+ * @param discovery out: the new instance on success, NULL on failure
+ * @return CELIX_SUCCESS, CELIX_ENOMEM when the instance cannot be allocated,
+ *         or the status reported by celixThreadMutex_create()
+ */
+celix_status_t discovery_create(bundle_context_pt context, discovery_pt *discovery) {
+	celix_status_t status;
+	discovery_pt instance = malloc(sizeof(struct discovery));
+
+	*discovery = NULL;
+	if (instance == NULL) {
+		return CELIX_ENOMEM;
+	}
+
+	instance->context = context;
+	instance->poller = NULL;
+	instance->server = NULL;
+
+	instance->listenerReferences = hashMap_create(serviceReference_hashCode, NULL, serviceReference_equals2, NULL);
+	instance->discoveredServices = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+
+	/* Check each mutex separately: a failure of the first one must not be
+	 * hidden by a successful creation of the second one. */
+	status = celixThreadMutex_create(&instance->listenerReferencesMutex, NULL);
+	if (status == CELIX_SUCCESS) {
+		status = celixThreadMutex_create(&instance->discoveredServicesMutex, NULL);
+		if (status != CELIX_SUCCESS) {
+			celixThreadMutex_destroy(&instance->listenerReferencesMutex);
+		}
+	}
+
+	if (status != CELIX_SUCCESS) {
+		hashMap_destroy(instance->discoveredServices, false, false);
+		hashMap_destroy(instance->listenerReferences, false, false);
+		free(instance);
+		return status;
+	}
+
+	*discovery = instance;
+
+	return CELIX_SUCCESS;
+}
+
+/**
+ * Starts discovery by creating the endpoint poller and the endpoint server.
+ *
+ * When the server cannot be created the already created poller is destroyed
+ * again, so a failed start leaves no half-started state behind.
+ *
+ * @param discovery instance created by discovery_create()
+ * @return CELIX_SUCCESS, or CELIX_BUNDLE_EXCEPTION when either component
+ *         cannot be created
+ */
+celix_status_t discovery_start(discovery_pt discovery) {
+	celix_status_t status;
+
+	status = endpointDiscoveryPoller_create(discovery, discovery->context, &discovery->poller);
+	if (status != CELIX_SUCCESS) {
+		return CELIX_BUNDLE_EXCEPTION;
+	}
+
+	status = endpointDiscoveryServer_create(discovery, discovery->context, &discovery->server);
+	if (status != CELIX_SUCCESS) {
+		/* Roll back the poller created above. */
+		endpointDiscoveryPoller_destroy(discovery->poller);
+		discovery->poller = NULL;
+		return CELIX_BUNDLE_EXCEPTION;
+	}
+
+	return CELIX_SUCCESS;
+}
+
+/**
+ * Stops discovery: destroys the endpoint server and poller and tells the
+ * endpoint listeners that every discovered endpoint is gone.
+ *
+ * Teardown is best effort: a failing step is remembered but does not prevent
+ * the remaining steps from running.
+ *
+ * @param discovery instance previously started with discovery_start()
+ * @return CELIX_SUCCESS, or CELIX_BUNDLE_EXCEPTION when the server or the
+ *         poller could not be destroyed
+ */
+celix_status_t discovery_stop(discovery_pt discovery) {
+	celix_status_t status = CELIX_SUCCESS;
+	hash_map_iterator_pt iter;
+
+	if (endpointDiscoveryServer_destroy(discovery->server) == CELIX_SUCCESS) {
+		discovery->server = NULL;
+	} else {
+		status = CELIX_BUNDLE_EXCEPTION;
+	}
+
+	if (endpointDiscoveryPoller_destroy(discovery->poller) == CELIX_SUCCESS) {
+		discovery->poller = NULL;
+	} else {
+		status = CELIX_BUNDLE_EXCEPTION;
+	}
+
+	celixThreadMutex_lock(&discovery->discoveredServicesMutex);
+
+	/* Report every known endpoint as removed (third argument false). */
+	iter = hashMapIterator_create(discovery->discoveredServices);
+	while (hashMapIterator_hasNext(iter)) {
+		hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
+		endpoint_description_pt endpoint = hashMapEntry_getValue(entry);
+
+		discovery_informEndpointListeners(discovery, endpoint, false);
+	}
+	hashMapIterator_destroy(iter);
+
+	celixThreadMutex_unlock(&discovery->discoveredServicesMutex);
+
+	return status;
+}
+
+/**
+ * Releases a discovery instance created by discovery_create().
+ *
+ * Destroys both hash maps (false is passed for both hashMap_destroy() flags,
+ * so presumably neither keys nor values are freed), destroys both mutexes and
+ * frees the instance. The poller and server are not destroyed here; that is
+ * the job of discovery_stop().
+ *
+ * @param discovery instance to release; must not be used afterwards
+ * @return always CELIX_SUCCESS
+ */
+celix_status_t discovery_destroy(discovery_pt discovery) {
+	celixThreadMutex_lock(&discovery->discoveredServicesMutex);
+	hashMap_destroy(discovery->discoveredServices, false, false);
+	discovery->discoveredServices = NULL;
+	celixThreadMutex_unlock(&discovery->discoveredServicesMutex);
+	celixThreadMutex_destroy(&discovery->discoveredServicesMutex);
+
+	celixThreadMutex_lock(&discovery->listenerReferencesMutex);
+	hashMap_destroy(discovery->listenerReferences, false, false);
+	discovery->listenerReferences = NULL;
+	celixThreadMutex_unlock(&discovery->listenerReferencesMutex);
+	celixThreadMutex_destroy(&discovery->listenerReferencesMutex);
+
+	free(discovery);
+
+	return CELIX_SUCCESS;
+}

Added: celix/trunk/remote_services/discovery_etcd/CMakeLists.txt
URL: http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery_etcd/CMakeLists.txt?rev=1626250&view=auto
==============================================================================
--- celix/trunk/remote_services/discovery_etcd/CMakeLists.txt (added)
+++ celix/trunk/remote_services/discovery_etcd/CMakeLists.txt Fri Sep 19 15:36:24 2014
@@ -0,0 +1,58 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Discovery (ETCD) bundle: built only when RSA_BUNDLES_DISCOVERY_ETCD is enabled.
+celix_subproject(RSA_BUNDLES_DISCOVERY_ETCD "Option to enable building the Discovery (ETCD) bundle" ON DEPS LAUNCHER topology_manager remote_service_admin)
+if (RSA_BUNDLES_DISCOVERY_ETCD)
+	# Third-party libraries; etcd.c includes the libcurl and Jansson headers.
+	find_package(CURL REQUIRED)
+	find_package(LibXml2 REQUIRED)
+	find_package(Jansson REQUIRED)
+
+	include_directories("${CURL_INCLUDE_DIR}")
+	include_directories("${JANSSON_INCLUDE_DIR}")
+	include_directories("${LIBXML2_INCLUDE_DIR}")
+	include_directories("${PROJECT_SOURCE_DIR}/utils/public/include")
+	include_directories("${PROJECT_SOURCE_DIR}/remote_services/utils/private/include")
+	include_directories("${PROJECT_SOURCE_DIR}/remote_services/utils/public/include")
+	include_directories("${PROJECT_SOURCE_DIR}/remote_services/discovery/private/include")
+	include_directories("${PROJECT_SOURCE_DIR}/remote_services/discovery_etcd/private/include")
+	include_directories("${PROJECT_SOURCE_DIR}/remote_services/endpoint_listener/public/include")
+	include_directories("${PROJECT_SOURCE_DIR}/remote_services/remote_service_admin/public/include")
+	include_directories("private/include")
+
+	SET_HEADER(BUNDLE_SYMBOLICNAME "apache_celix_rsa_discovery_etcd")
+	SET_HEADERS("Bundle-Name: Apache Celix RSA Discovery ETCD")
+
+	# ETCD specific sources first, then the sources shared with the other
+	# discovery bundles.
+	bundle(discovery_etcd SOURCES
+		private/src/discovery_impl.c
+		private/src/etcd.c
+		private/src/etcd_watcher.c
+		${PROJECT_SOURCE_DIR}/remote_services/discovery/private/src/discovery_activator.c
+		${PROJECT_SOURCE_DIR}/remote_services/discovery/private/src/discovery.c
+		${PROJECT_SOURCE_DIR}/remote_services/discovery/private/src/endpoint_descriptor_reader.c
+		${PROJECT_SOURCE_DIR}/remote_services/discovery/private/src/endpoint_descriptor_writer.c
+		${PROJECT_SOURCE_DIR}/remote_services/discovery/private/src/endpoint_discovery_poller.c
+		${PROJECT_SOURCE_DIR}/remote_services/discovery/private/src/endpoint_discovery_server.c
+		${PROJECT_SOURCE_DIR}/remote_services/remote_service_admin/private/src/endpoint_description.c
+		${PROJECT_SOURCE_DIR}/remote_services/utils/private/src/civetweb.c
+	)
+
+	install_bundle(discovery_etcd)
+
+	target_link_libraries(discovery_etcd celix_framework ${CURL_LIBRARIES} ${LIBXML2_LIBRARIES} ${JANSSON_LIBRARIES})
+
+endif (RSA_BUNDLES_DISCOVERY_ETCD)

Added: celix/trunk/remote_services/discovery_etcd/private/include/discovery_impl.h
URL: http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery_etcd/private/include/discovery_impl.h?rev=1626250&view=auto
==============================================================================
--- celix/trunk/remote_services/discovery_etcd/private/include/discovery_impl.h (added)
+++ celix/trunk/remote_services/discovery_etcd/private/include/discovery_impl.h Fri Sep 19
15:36:24 2014
@@ -0,0 +1,63 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * discovery_impl.h
+ *
+ *  \date       Sep 29, 2011
+ *  \author    	<a href="mailto:celix-dev@incubator.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#ifndef DISCOVERY_IMPL_H_
+#define DISCOVERY_IMPL_H_
+
+#include "bundle_context.h"
+#include "celix_threads.h"
+#include "service_reference.h"
+
+#include "endpoint_description.h"
+#include "endpoint_listener.h"
+
+#include "endpoint_discovery_poller.h"
+#include "endpoint_discovery_server.h"
+#include "etcd_watcher.h"
+
+
+/* Fallback values for the discovery server configuration properties. */
+#define DEFAULT_SERVER_PORT "9999"
+#define DEFAULT_SERVER_PATH "/org.apache.celix.discovery.etcd"
+/* Presumably the endpoint URL polled when none is configured - confirm against the poller. */
+#define DEFAULT_POLL_ENDPOINTS "http://localhost:9999/org.apache.celix.discovery.etcd"
+
+/* Presumably the buffer sizes for the etcd root and local node paths - confirm in etcd_watcher.c. */
+#define MAX_ROOTNODE_LENGTH		 64
+#define MAX_LOCALNODE_LENGTH	256
+
+
+/**
+ * State of the ETCD discovery implementation.
+ *
+ * Each hash map has its own mutex; discovery_impl.c takes the matching mutex
+ * around its accesses to the map.
+ */
+struct discovery {
+	bundle_context_pt context;
+
+	celix_thread_mutex_t listenerReferencesMutex;
+	celix_thread_mutex_t discoveredServicesMutex;
+
+	hash_map_pt listenerReferences; //key=serviceReference, value=nop
+	hash_map_pt discoveredServices; //key=endpointId (string), value=endpoint_description_pt
+
+	etcd_watcher_pt watcher;
+	endpoint_discovery_poller_pt poller;
+	endpoint_discovery_server_pt server;
+};
+
+#endif /* DISCOVERY_IMPL_H_ */

Added: celix/trunk/remote_services/discovery_etcd/private/include/etcd.h
URL: http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery_etcd/private/include/etcd.h?rev=1626250&view=auto
==============================================================================
--- celix/trunk/remote_services/discovery_etcd/private/include/etcd.h (added)
+++ celix/trunk/remote_services/discovery_etcd/private/include/etcd.h Fri Sep 19 15:36:24
2014
@@ -0,0 +1,28 @@
+#ifndef ETCD_H_
+#define ETCD_H_
+
+#include <stdbool.h>
+
+/* Upper bound on the number of child nodes returned by etcd_getNodes(). */
+#define MAX_NODES			256
+
+/* Sizes of the caller supplied output buffers. */
+#define MAX_KEY_LENGTH		256
+#define MAX_VALUE_LENGTH	256
+#define MAX_ACTION_LENGTH	64
+
+/* Sizes of the internal request URL and request body buffers. */
+#define MAX_URL_LENGTH		256
+#define MAX_CONTENT_LENGTH	1024
+
+/* Member names looked up in the JSON replies. */
+#define ETCD_JSON_NODE			"node"
+#define ETCD_JSON_PREVNODE		"prevNode"
+#define ETCD_JSON_NODES			"nodes"
+#define ETCD_JSON_ACTION		"action"
+#define ETCD_JSON_KEY			"key"
+#define ETCD_JSON_VALUE			"value"
+#define ETCD_JSON_MODIFIEDINDEX	"modifiedIndex"
+
+/**
+ * Remembers the server address and initialises libcurl.
+ * The server pointer is stored, not copied, so the string must outlive all
+ * later etcd_* calls.
+ * @return true when curl_global_init() succeeds
+ */
+bool etcd_init(char* server, int port);
+
+/**
+ * Reads the node stored under key.
+ * @param value         out: node value, buffer of at least MAX_VALUE_LENGTH bytes
+ * @param action        not written by the implementation
+ * @param modifiedIndex out: modifiedIndex reported for the node
+ * @return true when the request succeeded and the reply held the expected members
+ */
+bool etcd_get(char* key, char* value, char*action, int* modifiedIndex);
+
+/**
+ * Lists the keys of the child nodes of directory.
+ * @param nodeNames out: up to MAX_NODES buffers of at least MAX_KEY_LENGTH bytes each
+ * @param size      out: number of entries processed; only written when the reply holds a "nodes" array
+ * @return true when every listed child could be read
+ */
+bool etcd_getNodes(char* directory, char** nodeNames, int* size);
+
+/**
+ * Stores value under key.
+ * @return true when the value echoed in the reply equals value
+ */
+bool etcd_set(char* key, char* value);
+
+/**
+ * Deletes key.
+ * @return true when the reply contains a "node" member
+ */
+bool etcd_del(char* key);
+
+/**
+ * Waits for a change of key (request with wait=true; waitIndex is added when
+ * index is not 0).
+ * @param action    out: reported action, buffer of at least MAX_ACTION_LENGTH bytes
+ * @param prevValue out: previous value, buffer of at least MAX_VALUE_LENGTH bytes
+ * @param value     out: new value, buffer of at least MAX_VALUE_LENGTH bytes
+ * @return true when a change with string action, value and previous value was received
+ */
+bool etcd_watch(char* key, int index, char* action, char* prevValue, char* value);
+
+#endif /* ETCD_H_ */

Added: celix/trunk/remote_services/discovery_etcd/private/include/etcd_watcher.h
URL: http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery_etcd/private/include/etcd_watcher.h?rev=1626250&view=auto
==============================================================================
--- celix/trunk/remote_services/discovery_etcd/private/include/etcd_watcher.h (added)
+++ celix/trunk/remote_services/discovery_etcd/private/include/etcd_watcher.h Fri Sep 19 15:36:24
2014
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * etcd_watcher.h
+ *
+ * \date       17 Sep 2014
+ * \author     <a href="mailto:celix-dev@incubator.apache.org">Apache Celix Project
Team</a>
+ * \copyright  Apache License, Version 2.0
+ */
+
+#ifndef ETCD_WATCHER_H_
+#define ETCD_WATCHER_H_
+
+#include "bundle_context.h"
+#include "celix_errno.h"
+#include "discovery.h"
+#include "endpoint_discovery_poller.h"
+
+/** Opaque handle; the structure is defined in etcd_watcher.c. */
+typedef struct etcd_watcher *etcd_watcher_pt;
+
+/**
+ * Creates a watcher for the given poller and bundle context.
+ * @param watcher out: receives the new watcher
+ * @return CELIX_SUCCESS on success (discovery_start() treats any other value as failure)
+ */
+celix_status_t etcdWatcher_create(endpoint_discovery_poller_pt poller, bundle_context_pt context, etcd_watcher_pt *watcher);
+
+/**
+ * Destroys a watcher obtained from etcdWatcher_create().
+ * @return CELIX_SUCCESS on success (discovery_stop() treats any other value as failure)
+ */
+celix_status_t etcdWatcher_destroy(etcd_watcher_pt watcher);
+
+
+#endif /* ETCD_WATCHER_H_ */

Added: celix/trunk/remote_services/discovery_etcd/private/src/discovery_impl.c
URL: http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery_etcd/private/src/discovery_impl.c?rev=1626250&view=auto
==============================================================================
--- celix/trunk/remote_services/discovery_etcd/private/src/discovery_impl.c (added)
+++ celix/trunk/remote_services/discovery_etcd/private/src/discovery_impl.c Fri Sep 19 15:36:24
2014
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * discovery_impl.c
+ *
+ * \date        Aug 8, 2014
+ * \author    	<a href="mailto:celix-dev@incubator.apache.org">Apache Celix Project Team</a>
+ * \copyright	Apache License, Version 2.0
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <stdbool.h>
+#include <netdb.h>
+#include <netinet/in.h>
+
+#include "constants.h"
+#include "celix_threads.h"
+#include "bundle_context.h"
+#include "array_list.h"
+#include "utils.h"
+#include "celix_errno.h"
+#include "filter.h"
+#include "service_reference.h"
+#include "service_registration.h"
+#include "remote_constants.h"
+
+
+#include "discovery.h"
+#include "discovery_impl.h"
+#include "etcd_watcher.h"
+#include "endpoint_discovery_poller.h"
+#include "endpoint_discovery_server.h"
+
+
+
+/**
+ * Creates an ETCD discovery instance bound to the given bundle context.
+ *
+ * The watcher, poller and server are left NULL here; discovery_start()
+ * creates them. On any failure everything allocated so far is released again.
+ *
+ * @param context   bundle context stored in the new instance
+ * @param discovery out: the new instance on success, NULL on failure
+ * @return CELIX_SUCCESS, CELIX_ENOMEM when the instance cannot be allocated,
+ *         or the status reported by celixThreadMutex_create()
+ */
+celix_status_t discovery_create(bundle_context_pt context, discovery_pt *discovery) {
+	celix_status_t status;
+	discovery_pt instance = malloc(sizeof(struct discovery));
+
+	*discovery = NULL;
+	if (instance == NULL) {
+		return CELIX_ENOMEM;
+	}
+
+	instance->context = context;
+	instance->watcher = NULL;
+	instance->poller = NULL;
+	instance->server = NULL;
+
+	instance->listenerReferences = hashMap_create(serviceReference_hashCode, NULL, serviceReference_equals2, NULL);
+	instance->discoveredServices = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+
+	/* Check each mutex separately: a failure of the first one must not be
+	 * hidden by a successful creation of the second one. */
+	status = celixThreadMutex_create(&instance->listenerReferencesMutex, NULL);
+	if (status == CELIX_SUCCESS) {
+		status = celixThreadMutex_create(&instance->discoveredServicesMutex, NULL);
+		if (status != CELIX_SUCCESS) {
+			celixThreadMutex_destroy(&instance->listenerReferencesMutex);
+		}
+	}
+
+	if (status != CELIX_SUCCESS) {
+		hashMap_destroy(instance->discoveredServices, false, false);
+		hashMap_destroy(instance->listenerReferences, false, false);
+		free(instance);
+		return status;
+	}
+
+	*discovery = instance;
+
+	return CELIX_SUCCESS;
+}
+
+
+
+/**
+ * Releases an ETCD discovery instance created by discovery_create().
+ *
+ * Destroys both hash maps (false is passed for both hashMap_destroy() flags,
+ * so presumably neither keys nor values are freed), destroys both mutexes and
+ * frees the instance. The watcher, poller and server are not destroyed here;
+ * that is the job of discovery_stop().
+ *
+ * @param discovery instance to release; must not be used afterwards
+ * @return always CELIX_SUCCESS
+ */
+celix_status_t discovery_destroy(discovery_pt discovery) {
+	celixThreadMutex_lock(&discovery->discoveredServicesMutex);
+	hashMap_destroy(discovery->discoveredServices, false, false);
+	discovery->discoveredServices = NULL;
+	celixThreadMutex_unlock(&discovery->discoveredServicesMutex);
+	celixThreadMutex_destroy(&discovery->discoveredServicesMutex);
+
+	celixThreadMutex_lock(&discovery->listenerReferencesMutex);
+	hashMap_destroy(discovery->listenerReferences, false, false);
+	discovery->listenerReferences = NULL;
+	celixThreadMutex_unlock(&discovery->listenerReferencesMutex);
+	celixThreadMutex_destroy(&discovery->listenerReferencesMutex);
+
+	free(discovery);
+
+	return CELIX_SUCCESS;
+}
+
+/**
+ * Starts ETCD discovery by creating, in this order, the endpoint poller, the
+ * etcd watcher (which is handed the poller) and the endpoint server.
+ *
+ * When a later component cannot be created the components created before it
+ * are destroyed again, so a failed start leaves no half-started state behind.
+ *
+ * @param discovery instance created by discovery_create()
+ * @return CELIX_SUCCESS, or CELIX_BUNDLE_EXCEPTION when any component cannot
+ *         be created
+ */
+celix_status_t discovery_start(discovery_pt discovery) {
+	celix_status_t status;
+
+	status = endpointDiscoveryPoller_create(discovery, discovery->context, &discovery->poller);
+	if (status != CELIX_SUCCESS) {
+		return CELIX_BUNDLE_EXCEPTION;
+	}
+
+	status = etcdWatcher_create(discovery->poller, discovery->context, &discovery->watcher);
+	if (status != CELIX_SUCCESS) {
+		endpointDiscoveryPoller_destroy(discovery->poller);
+		discovery->poller = NULL;
+		return CELIX_BUNDLE_EXCEPTION;
+	}
+
+	status = endpointDiscoveryServer_create(discovery, discovery->context, &discovery->server);
+	if (status != CELIX_SUCCESS) {
+		/* Undo in reverse order: the watcher uses the poller. */
+		etcdWatcher_destroy(discovery->watcher);
+		discovery->watcher = NULL;
+		endpointDiscoveryPoller_destroy(discovery->poller);
+		discovery->poller = NULL;
+		return CELIX_BUNDLE_EXCEPTION;
+	}
+
+	return CELIX_SUCCESS;
+}
+
+/**
+ * Stops ETCD discovery: destroys the endpoint server, the etcd watcher and
+ * the endpoint poller, then tells the endpoint listeners that every
+ * discovered endpoint is gone.
+ *
+ * Teardown is best effort: a failing step is remembered but does not prevent
+ * the remaining steps from running.
+ *
+ * @param discovery instance previously started with discovery_start()
+ * @return CELIX_SUCCESS, or CELIX_BUNDLE_EXCEPTION when any component could
+ *         not be destroyed
+ */
+celix_status_t discovery_stop(discovery_pt discovery) {
+	celix_status_t status = CELIX_SUCCESS;
+	hash_map_iterator_pt iter;
+
+	if (endpointDiscoveryServer_destroy(discovery->server) == CELIX_SUCCESS) {
+		discovery->server = NULL;
+	} else {
+		status = CELIX_BUNDLE_EXCEPTION;
+	}
+
+	if (etcdWatcher_destroy(discovery->watcher) == CELIX_SUCCESS) {
+		discovery->watcher = NULL;
+	} else {
+		status = CELIX_BUNDLE_EXCEPTION;
+	}
+
+	if (endpointDiscoveryPoller_destroy(discovery->poller) == CELIX_SUCCESS) {
+		discovery->poller = NULL;
+	} else {
+		status = CELIX_BUNDLE_EXCEPTION;
+	}
+
+	celixThreadMutex_lock(&discovery->discoveredServicesMutex);
+
+	/* Report every known endpoint as removed (third argument false). */
+	iter = hashMapIterator_create(discovery->discoveredServices);
+	while (hashMapIterator_hasNext(iter)) {
+		hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
+		endpoint_description_pt endpoint = hashMapEntry_getValue(entry);
+
+		discovery_informEndpointListeners(discovery, endpoint, false);
+	}
+	hashMapIterator_destroy(iter);
+
+	celixThreadMutex_unlock(&discovery->discoveredServicesMutex);
+
+	return status;
+}
+
+
+

Added: celix/trunk/remote_services/discovery_etcd/private/src/etcd.c
URL: http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery_etcd/private/src/etcd.c?rev=1626250&view=auto
==============================================================================
--- celix/trunk/remote_services/discovery_etcd/private/src/etcd.c (added)
+++ celix/trunk/remote_services/discovery_etcd/private/src/etcd.c Fri Sep 19 15:36:24 2014
@@ -0,0 +1,304 @@
+#include <stdio.h>
+#include <stdbool.h>
+#include <string.h>
+
+#include <curl/curl.h>
+#include <jansson.h>
+
+
+
+#include "etcd.h"
+
+/** HTTP methods understood by performRequest(). */
+typedef enum {
+	GET, PUT, DELETE
+} request_t;
+
+/* Connection settings stored by etcd_init() and used to build every request
+ * URL. The server string is referenced, not copied. */
+static char* etcd_server = NULL;
+static int etcd_port = 0;
+
+/** Growable buffer that WriteMemoryCallback() fills with the response body. */
+struct MemoryStruct {
+	char *memory;	/* heap buffer, NUL-terminated after every append */
+	size_t size;	/* number of bytes stored, terminator not counted */
+};
+
+/**
+ * libcurl write callback: appends the received chunk to the MemoryStruct
+ * passed as userp and keeps the buffer NUL-terminated.
+ *
+ * @param contents received data (size * nmemb bytes)
+ * @param userp    the struct MemoryStruct to append to
+ * @return the number of bytes consumed, or 0 when the buffer cannot grow;
+ *         in that case the data collected so far stays valid and owned by
+ *         the MemoryStruct
+ */
+static size_t WriteMemoryCallback(void *contents, size_t size, size_t nmemb, void *userp) {
+	size_t realsize = size * nmemb;
+	struct MemoryStruct *mem = (struct MemoryStruct *) userp;
+	char *grown = realloc(mem->memory, mem->size + realsize + 1);
+
+	if (grown == NULL) {
+		/* out of memory! mem->memory is left untouched so the caller can still free it */
+		printf("not enough memory (realloc returned NULL)\n");
+		return 0;
+	}
+
+	mem->memory = grown;
+	memcpy(&(mem->memory[mem->size]), contents, realsize);
+	mem->size += realsize;
+	mem->memory[mem->size] = 0;
+
+	return realsize;
+}
+
+/**
+ * Performs one HTTP request with a fresh libcurl easy handle.
+ *
+ * Both the connect timeout and the total timeout are 10 seconds, and
+ * redirects are followed.
+ *
+ * @param url      complete request URL
+ * @param request  HTTP method to use
+ * @param callback write callback handed to libcurl (CURLOPT_WRITEFUNCTION)
+ * @param reqData  request body, only used for PUT
+ * @param repData  user pointer handed to the write callback
+ * @return the CURLcode of the transfer, or CURLE_FAILED_INIT when no easy
+ *         handle could be created
+ */
+static int performRequest(char* url, request_t request, void* callback, void* reqData, void* repData) {
+	CURL *curl = curl_easy_init();
+	CURLcode res;
+
+	if (curl == NULL) {
+		return CURLE_FAILED_INIT;
+	}
+
+	/* curl_easy_setopt() is variadic: numeric options must be passed as long. */
+	curl_easy_setopt(curl, CURLOPT_TIMEOUT, 10L);
+	curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, 10L);
+	curl_easy_setopt(curl, CURLOPT_URL, url);
+	curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
+	curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, callback);
+	curl_easy_setopt(curl, CURLOPT_WRITEDATA, repData);
+
+	if (request == PUT) {
+		curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "PUT");
+		curl_easy_setopt(curl, CURLOPT_POST, 1L);
+		curl_easy_setopt(curl, CURLOPT_POSTFIELDS, reqData);
+	} else if (request == DELETE) {
+		curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "DELETE");
+	} else if (request == GET) {
+		curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "GET");
+	}
+
+	res = curl_easy_perform(curl);
+	curl_easy_cleanup(curl);
+
+	return res;
+}
+
+// open
+/**
+ * Records the etcd server address and port for later requests and performs
+ * the global libcurl initialisation.
+ *
+ * @param server host name or address; only the pointer is stored
+ * @param port   TCP port of the etcd server
+ * @return true when curl_global_init() reports success
+ */
+bool etcd_init(char* server, int port) {
+	CURLcode rc;
+
+	etcd_port = port;
+	etcd_server = server;
+
+	rc = curl_global_init(CURL_GLOBAL_ALL);
+	if (rc != 0) {
+		return false;
+	}
+
+	return true;
+}
+
+// get
+/**
+ * Reads the node stored under key from the etcd server.
+ *
+ * @param key           key below /v2/keys/ to read
+ * @param value         out: NUL-terminated node value; buffer of at least
+ *                      MAX_VALUE_LENGTH bytes, longer values are truncated
+ * @param action        not written; kept for interface compatibility
+ * @param modifiedIndex out: modifiedIndex reported for the node
+ * @return true when the request succeeded and the reply held a string value
+ *         and a modifiedIndex; the outputs are only written in that case
+ */
+bool etcd_get(char* key, char* value, char* action, int* modifiedIndex) {
+	json_t* js_root = NULL;
+	json_t* js_node = NULL;
+	json_t* js_value = NULL;
+	json_t* js_modifiedIndex = NULL;
+	json_error_t error;
+	struct MemoryStruct reply;
+	char url[MAX_URL_LENGTH];
+	bool retVal = false;
+	int res;
+	int len;
+
+	(void) action;
+
+	/* Refuse to query a truncated (and therefore different) key. */
+	len = snprintf(url, sizeof(url), "http://%s:%d/v2/keys/%s", etcd_server, etcd_port, key);
+	if (len < 0 || (size_t) len >= sizeof(url)) {
+		return false;
+	}
+
+	reply.size = 0; /* no data at this point */
+	reply.memory = malloc(1); /* will be grown as needed by WriteMemoryCallback */
+	if (reply.memory == NULL) {
+		return false;
+	}
+	reply.memory[0] = '\0'; /* an empty reply must still be a valid string */
+
+	res = performRequest(url, GET, WriteMemoryCallback, NULL, (void*) &reply);
+
+	if (res == CURLE_OPERATION_TIMEDOUT) {
+		/* timeouts are deliberately not reported */
+	} else if (res != CURLE_OK) {
+		printf("error while performing curl\n");
+	} else if ((js_root = json_loads(reply.memory, 0, &error)) == NULL) {
+		printf("error while parsing json data\n");
+	} else if ((js_node = json_object_get(js_root, ETCD_JSON_NODE)) == NULL) {
+		printf("error while retrieving expected node object\n");
+	} else if (((js_value = json_object_get(js_node, ETCD_JSON_VALUE)) == NULL)
+			|| ((js_modifiedIndex = json_object_get(js_node, ETCD_JSON_MODIFIEDINDEX)) == NULL)
+			|| !json_is_string(js_value)) {
+		printf("error while retrieving expected objects\n");
+	} else {
+		*modifiedIndex = (int) json_integer_value(js_modifiedIndex);
+		/* snprintf always terminates the copy, unlike strncpy */
+		snprintf(value, MAX_VALUE_LENGTH, "%s", json_string_value(js_value));
+		retVal = true;
+	}
+
+	/* json_loads() returned a new reference; json_decref() accepts NULL */
+	json_decref(js_root);
+	free(reply.memory);
+
+	return retVal;
+}
+
+// getNodes
+/**
+ * Lists the keys of the child nodes of an etcd directory.
+ *
+ * @param directory key of the directory below /v2/keys/
+ * @param nodeNames out: one buffer per child, each at least MAX_KEY_LENGTH
+ *                  bytes; at most MAX_NODES entries are written. An entry
+ *                  whose key cannot be read is set to the empty string
+ * @param size      out: number of entries processed; only written when the
+ *                  reply holds a "nodes" array
+ * @return true when the request succeeded and every processed child had a
+ *         string key
+ */
+bool etcd_getNodes(char* directory, char** nodeNames, int* size) {
+	json_t* js_root = NULL;
+	json_t* js_node = NULL;
+	json_t* js_nodes = NULL;
+	json_error_t error;
+	struct MemoryStruct reply;
+	char url[MAX_URL_LENGTH];
+	bool retVal = false;
+	int res;
+	int len;
+
+	/* Refuse to query a truncated (and therefore different) directory. */
+	len = snprintf(url, sizeof(url), "http://%s:%d/v2/keys/%s", etcd_server, etcd_port, directory);
+	if (len < 0 || (size_t) len >= sizeof(url)) {
+		return false;
+	}
+
+	reply.size = 0; /* no data at this point */
+	reply.memory = malloc(1); /* will be grown as needed by WriteMemoryCallback */
+	if (reply.memory == NULL) {
+		return false;
+	}
+	reply.memory[0] = '\0'; /* an empty reply must still be a valid string */
+
+	res = performRequest(url, GET, WriteMemoryCallback, NULL, (void*) &reply);
+
+	if (res == CURLE_OPERATION_TIMEDOUT) {
+		/* timeouts are deliberately not reported */
+	} else if (res != CURLE_OK) {
+		printf("error while performing curl\n");
+	} else if ((js_root = json_loads(reply.memory, 0, &error)) == NULL) {
+		printf("error while parsing json data\n");
+	} else if ((js_node = json_object_get(js_root, ETCD_JSON_NODE)) == NULL) {
+		printf("error while retrieving expected nodes object\n");
+	} else if ((js_nodes = json_object_get(js_node, ETCD_JSON_NODES)) == NULL) {
+		printf("error while retrieving expected nodes object\n");
+	} else if (json_is_array(js_nodes)) {
+		size_t count = json_array_size(js_nodes);
+		size_t i;
+
+		if (count > MAX_NODES) {
+			count = MAX_NODES;
+		}
+
+		retVal = true;
+
+		for (i = 0; i < count; i++) {
+			json_t* js_entry = json_array_get(js_nodes, i);
+			/* yields NULL when the entry is no object or has no string key */
+			const char* name = json_string_value(json_object_get(js_entry, ETCD_JSON_KEY));
+
+			if (name == NULL) {
+				nodeNames[i][0] = '\0';
+				retVal = false;
+			} else {
+				snprintf(nodeNames[i], MAX_KEY_LENGTH, "%s", name);
+			}
+		}
+		*size = (int) count;
+	}
+
+	/* json_loads() returned a new reference; json_decref() accepts NULL */
+	json_decref(js_root);
+	free(reply.memory);
+
+	return retVal;
+}
+
+//set
+/**
+ * Stores value under key on the etcd server.
+ *
+ * NOTE(review): value is sent as "value=<value>" without URL encoding, so
+ * values containing characters such as '&' or '%' may not survive - confirm
+ * what callers pass.
+ *
+ * @param key   key below /v2/keys/ to write
+ * @param value value to store
+ * @return true when the server echoed back exactly the value that was sent
+ */
+bool etcd_set(char* key, char* value) {
+	json_error_t error;
+	json_t* js_root = NULL;
+	json_t* js_node = NULL;
+	json_t* js_value = NULL;
+	struct MemoryStruct reply;
+	char url[MAX_URL_LENGTH];
+	char request[MAX_CONTENT_LENGTH];
+	bool retVal = false;
+	int res;
+	int len;
+
+	/* Refuse to write to a truncated key or to store a truncated value. */
+	len = snprintf(url, sizeof(url), "http://%s:%d/v2/keys/%s", etcd_server, etcd_port, key);
+	if (len < 0 || (size_t) len >= sizeof(url)) {
+		return false;
+	}
+	len = snprintf(request, sizeof(request), "value=%s", value);
+	if (len < 0 || (size_t) len >= sizeof(request)) {
+		return false;
+	}
+
+	reply.size = 0; /* no data at this point */
+	reply.memory = malloc(1); /* will be grown as needed by WriteMemoryCallback */
+	if (reply.memory == NULL) {
+		return false;
+	}
+	reply.memory[0] = '\0'; /* an empty reply must still be a valid string */
+
+	res = performRequest(url, PUT, WriteMemoryCallback, request, (void*) &reply);
+
+	if (res == CURLE_OPERATION_TIMEDOUT) {
+		/* timeouts are deliberately not reported */
+	} else if (res != CURLE_OK) {
+		printf("error while performing curl\n");
+	} else if ((js_root = json_loads(reply.memory, 0, &error)) == NULL) {
+		printf("error while parsing json data\n");
+	} else if ((js_node = json_object_get(js_root, ETCD_JSON_NODE)) == NULL) {
+		printf("error while retrieving expected node object\n");
+	} else if ((js_value = json_object_get(js_node, ETCD_JSON_VALUE)) == NULL) {
+		printf("error while retrieving expected value object\n");
+	} else if (json_is_string(js_value)) {
+		retVal = (strcmp(json_string_value(js_value), value) == 0);
+	}
+
+	/* json_loads() returned a new reference; json_decref() accepts NULL */
+	json_decref(js_root);
+	free(reply.memory);
+
+	return retVal;
+}
+
+//delete
+/**
+ * Deletes key on the etcd server.
+ *
+ * @param key key below /v2/keys/ to delete
+ * @return true when the request succeeded and the reply contains a "node"
+ *         member
+ */
+bool etcd_del(char* key) {
+	json_error_t error;
+	json_t* js_root = NULL;
+	json_t* js_node = NULL;
+	struct MemoryStruct reply;
+	char url[MAX_URL_LENGTH];
+	bool retVal = false;
+	int res;
+	int len;
+
+	/* Refuse to delete a truncated (and therefore different) key. */
+	len = snprintf(url, sizeof(url), "http://%s:%d/v2/keys/%s", etcd_server, etcd_port, key);
+	if (len < 0 || (size_t) len >= sizeof(url)) {
+		return false;
+	}
+
+	reply.size = 0; /* no data at this point */
+	reply.memory = malloc(1); /* will be grown as needed by WriteMemoryCallback */
+	if (reply.memory == NULL) {
+		return false;
+	}
+	reply.memory[0] = '\0'; /* an empty reply must still be a valid string */
+
+	/* A DELETE carries no body, so no request data is passed. */
+	res = performRequest(url, DELETE, WriteMemoryCallback, NULL, (void*) &reply);
+
+	if (res == CURLE_OPERATION_TIMEDOUT) {
+		/* timeouts are deliberately not reported */
+	} else if (res != CURLE_OK) {
+		printf("error while performing curl\n");
+	} else if ((js_root = json_loads(reply.memory, 0, &error)) == NULL) {
+		printf("error while parsing json data\n");
+	} else if ((js_node = json_object_get(js_root, ETCD_JSON_NODE)) == NULL) {
+		printf("error while retrieving expected node object\n");
+	} else {
+		retVal = true;
+	}
+
+	/* json_loads() returned a new reference; json_decref() accepts NULL */
+	json_decref(js_root);
+	free(reply.memory);
+
+	return retVal;
+}
+
+///watch
+/**
+ * Waits for the next change of key (request with wait=true).
+ *
+ * The wait is bounded by the request timeout of performRequest(); a timeout
+ * returns false without logging anything.
+ *
+ * @param key       key below /v2/keys/ to watch
+ * @param index     waitIndex to start from; 0 omits the waitIndex parameter
+ * @param action    out: reported action, buffer of at least MAX_ACTION_LENGTH bytes
+ * @param prevValue out: previous value, buffer of at least MAX_VALUE_LENGTH bytes
+ * @param value     out: new value, buffer of at least MAX_VALUE_LENGTH bytes
+ * @return true when a change with string action, value and previous value
+ *         was received; the outputs are only written in that case
+ */
+bool etcd_watch(char* key, int index, char* action, char* prevValue, char* value) {
+	json_error_t error;
+	json_t* js_root = NULL;
+	json_t* js_node = NULL;
+	json_t* js_prevNode = NULL;
+	json_t* js_action = NULL;
+	json_t* js_value = NULL;
+	json_t* js_prevValue = NULL;
+	struct MemoryStruct reply;
+	char url[MAX_URL_LENGTH];
+	bool retVal = false;
+	int res;
+	int len;
+
+	if (index != 0) {
+		len = snprintf(url, sizeof(url), "http://%s:%d/v2/keys/%s?wait=true&waitIndex=%d", etcd_server, etcd_port, key, index);
+	} else {
+		len = snprintf(url, sizeof(url), "http://%s:%d/v2/keys/%s?wait=true", etcd_server, etcd_port, key);
+	}
+	/* Refuse to watch a truncated URL: it could lose the wait parameters. */
+	if (len < 0 || (size_t) len >= sizeof(url)) {
+		return false;
+	}
+
+	reply.size = 0; /* no data at this point */
+	reply.memory = malloc(1); /* will be grown as needed by WriteMemoryCallback */
+	if (reply.memory == NULL) {
+		return false;
+	}
+	reply.memory[0] = '\0'; /* an empty reply must still be a valid string */
+
+	res = performRequest(url, GET, WriteMemoryCallback, NULL, (void*) &reply);
+
+	if (res == CURLE_OPERATION_TIMEDOUT) {
+		/* timeouts are deliberately not reported */
+	} else if (res != CURLE_OK) {
+		printf("error while performing curl w/ %s\n", url);
+	} else if ((js_root = json_loads(reply.memory, 0, &error)) == NULL) {
+		printf("error while parsing json data\n");
+	} else if (((js_action = json_object_get(js_root, ETCD_JSON_ACTION)) == NULL) ||
+			((js_node = json_object_get(js_root, ETCD_JSON_NODE)) == NULL) ||
+			((js_prevNode = json_object_get(js_root, ETCD_JSON_PREVNODE)) == NULL)) {
+		printf("error while retrieving expected node object\n");
+	} else if (((js_value = json_object_get(js_node, ETCD_JSON_VALUE)) == NULL) ||
+			((js_prevValue = json_object_get(js_prevNode, ETCD_JSON_VALUE)) == NULL)) {
+		printf("error while retrieving expected value objects\n");
+	} else if (json_is_string(js_value) && json_is_string(js_prevValue) && json_is_string(js_action)) {
+		/* snprintf always terminates the copies, unlike strncpy */
+		snprintf(value, MAX_VALUE_LENGTH, "%s", json_string_value(js_value));
+		snprintf(prevValue, MAX_VALUE_LENGTH, "%s", json_string_value(js_prevValue));
+		snprintf(action, MAX_ACTION_LENGTH, "%s", json_string_value(js_action));
+		retVal = true;
+	}
+
+	/* json_loads() returned a new reference; json_decref() accepts NULL */
+	json_decref(js_root);
+	free(reply.memory);
+
+	return retVal;
+}

Added: celix/trunk/remote_services/discovery_etcd/private/src/etcd_watcher.c
URL: http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery_etcd/private/src/etcd_watcher.c?rev=1626250&view=auto
==============================================================================
--- celix/trunk/remote_services/discovery_etcd/private/src/etcd_watcher.c (added)
+++ celix/trunk/remote_services/discovery_etcd/private/src/etcd_watcher.c Fri Sep 19 15:36:24
2014
@@ -0,0 +1,279 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * etcd_watcher.c
+ *
+ * \date       16 Sep 2014
+ * \author     <a href="mailto:celix-dev@incubator.apache.org">Apache Celix Project
Team</a>
+ * \copyright  Apache License, Version 2.0
+ */
+
+#include <stdbool.h>
+#include <stdlib.h>
+
+#include "celix_log.h"
+#include "constants.h"
+#include "discovery.h"
+#include "discovery_impl.h"
+
+#include "etcd.h"
+#include "etcd_watcher.h"
+
+#include "endpoint_discovery_poller.h"
+
+/*
+ * State of one etcd watcher instance. It is allocated by etcdWatcher_create()
+ * and released by etcdWatcher_destroy().
+ */
+struct etcd_watcher {
+	/* poller to which endpoints found in etcd are added / from which they are removed */
+	endpoint_discovery_poller_pt poller;
+	/* bundle context; used to look up the framework uuid for the local etcd key */
+	bundle_context_pt context;
+
+	/* mutex created in etcdWatcher_create() */
+	celix_thread_mutex_t watcherLock;
+	/* thread that executes etcdWatcher_run() */
+	celix_thread_t watcherThread;
+
+	/*
+	 * loop condition of etcdWatcher_run(); cleared by etcdWatcher_destroy().
+	 * NOTE(review): volatile alone is not a thread synchronisation primitive -
+	 * confirm this is sufficient on the supported platforms.
+	 */
+	volatile bool running;
+};
+
+/* property holding the etcd server address, and the value used when it is not set */
+#define CFG_ETCD_SERVER_IP		"DISCOVERY_ETCD_SERVER_IP"
+#define DEFAULT_ETCD_SERVER_IP	"172.17.8.201"
+
+/* property holding the etcd server port, and the value used when it is not set or not a number */
+#define CFG_ETCD_SERVER_PORT	"DISCOVERY_ETCD_SERVER_PORT"
+#define DEFAULT_ETCD_SERVER_PORT 4001
+
+
+/*
+ * Copies the etcd root key of the discovery tree into rootNode.
+ *
+ * rootNode  destination buffer; it must be able to hold the string
+ *           "discovery" including its terminating NUL.
+ *
+ * The root key is written without a leading slash.
+ * Always returns CELIX_SUCCESS.
+ */
+static celix_status_t etcdWatcher_getRootPath(char* rootNode) {
+	strcpy(rootNode, "discovery");
+
+	return CELIX_SUCCESS;
+}
+
+
+/*
+ * Builds the etcd key under which the local framework is registered:
+ * "<root path>/<framework uuid>".
+ *
+ * context        bundle context used to read OSGI_FRAMEWORK_FRAMEWORK_UUID
+ * localNodePath  destination buffer of at least MAX_LOCALNODE_LENGTH bytes
+ *
+ * returns CELIX_SUCCESS when the key was written completely, or
+ * CELIX_ILLEGAL_STATE when the root path or the framework uuid is not
+ * available, or when the key does not fit into MAX_LOCALNODE_LENGTH bytes
+ * (a truncated key would address a different etcd node).
+ */
+static celix_status_t etcdWatcher_getLocalNodePath(bundle_context_pt context, char* localNodePath) {
+	celix_status_t status = CELIX_SUCCESS;
+	char rootPath[MAX_ROOTNODE_LENGTH];
+	char* uuid = NULL;
+	const char* separator = "/";
+	size_t rootLength = 0;
+	int written = 0;
+
+	if (etcdWatcher_getRootPath(&rootPath[0]) != CELIX_SUCCESS) {
+		return CELIX_ILLEGAL_STATE;
+	}
+
+	if ((bundleContext_getProperty(context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &uuid) != CELIX_SUCCESS) || (uuid == NULL)) {
+		return CELIX_ILLEGAL_STATE;
+	}
+
+	// only look at the last character when there is one; an empty root
+	// path must not lead to an access in front of the buffer
+	rootLength = strlen(rootPath);
+	if ((rootLength > 0) && (rootPath[rootLength - 1] == '/')) {
+		separator = "";
+	}
+
+	written = snprintf(localNodePath, MAX_LOCALNODE_LENGTH, "%s%s%s", rootPath, separator, uuid);
+	if ((written < 0) || (written >= MAX_LOCALNODE_LENGTH)) {
+		status = CELIX_ILLEGAL_STATE;
+	}
+
+	return status;
+}
+/*
+ * Retrieves all already existing discovery endpoints
+ * from etcd and adds them to the poller.
+ *
+ * poller           poller that receives the endpoints
+ * highestModified  out: the highest modifiedIndex reported by etcd_get() for
+ *                  the retrieved nodes (see etcd documentation), or -1 when
+ *                  no node could be read
+ *
+ * returns CELIX_ENOMEM when the node buffers cannot be allocated, otherwise
+ * the status of etcdWatcher_getRootPath(). A failing etcd_getNodes() or
+ * etcd_get() call is not reported as an error.
+ */
+static celix_status_t etcdWatcher_addAlreadyExistingWatchpoints(endpoint_discovery_poller_pt poller, int* highestModified) {
+	celix_status_t status = CELIX_SUCCESS;
+	char rootPath[MAX_ROOTNODE_LENGTH];
+	char** nodeArr = NULL;
+	int size = 0;
+	int i;
+
+	*highestModified = -1;
+
+	nodeArr = calloc(MAX_NODES, sizeof(*nodeArr));
+	if (nodeArr == NULL) {
+		return CELIX_ENOMEM;
+	}
+
+	for (i = 0; i < MAX_NODES; i++) {
+		nodeArr[i] = calloc(MAX_KEY_LENGTH, sizeof(*nodeArr[i]));
+		if (nodeArr[i] == NULL) {
+			// the remaining entries stay NULL (calloc) and are safe to free below
+			status = CELIX_ENOMEM;
+			break;
+		}
+	}
+
+	if (status == CELIX_SUCCESS) {
+		status = etcdWatcher_getRootPath(&rootPath[0]);
+	}
+
+	// we need to go through all nodes and get the highest modifiedIndex
+	if ((status == CELIX_SUCCESS) && (etcd_getNodes(rootPath, nodeArr, &size) == true)) {
+		// never read beyond the buffers that were allocated above
+		for (i = 0; (i < size) && (i < MAX_NODES); i++) {
+			char* key = nodeArr[i];
+			char value[MAX_VALUE_LENGTH];
+			char action[MAX_VALUE_LENGTH];
+			int modIndex;
+
+			if (etcd_get(key, &value[0], &action[0], &modIndex) == true) {
+				// TODO: check that this is not equal to the local endpoint
+				// NOTE(review): a copy is handed over here, presumably the poller
+				// takes ownership of it - confirm against the poller implementation
+				endpointDiscoveryPoller_addDiscoveryEndpoint(poller, strdup(&value[0]));
+
+				if (modIndex > *highestModified) {
+					*highestModified = modIndex;
+				}
+			}
+		}
+	}
+
+	for (i = 0; i < MAX_NODES; i++) {
+		free(nodeArr[i]);
+	}
+
+	free(nodeArr);
+
+	return status;
+}
+
+/*
+ * Thread function of the watcher.
+ *
+ * First adds the endpoints that are already present in etcd to the poller,
+ * then performs (blocking) etcd_watch calls on the discovery root for as long
+ * as watcher->running is set and applies every reported change to the poller:
+ *   "set"    - the previous value is removed, the new value is added
+ *   "delete" - the previous value is removed
+ * Any other action is only logged.
+ *
+ * data  the etcd_watcher_pt this thread works for
+ *
+ * Always returns NULL.
+ *
+ * NOTE(review): the index passed to etcd_watch is determined once before the
+ * loop and is not advanced afterwards - confirm that etcd_watch does not keep
+ * reporting the same change.
+ * NOTE(review): the values handed to the poller here live in loop-local
+ * buffers - confirm that the poller copies them.
+ */
+static void* etcdWatcher_run(void* data) {
+	static char rootPath[MAX_ROOTNODE_LENGTH];
+	etcd_watcher_pt self = (etcd_watcher_pt) data;
+	int lastIndex = 0;
+
+	etcdWatcher_addAlreadyExistingWatchpoints(self->poller, &lastIndex);
+	etcdWatcher_getRootPath(rootPath);
+
+	while (self->running) {
+		char action[MAX_ACTION_LENGTH];
+		char previous[MAX_VALUE_LENGTH];
+		char current[MAX_VALUE_LENGTH];
+
+		// nothing to apply when the watch timed out or failed
+		if (etcd_watch(rootPath, lastIndex + 1, action, previous, current) != true) {
+			continue;
+		}
+
+		if (strcmp(action, "set") == 0) {
+			endpointDiscoveryPoller_removeDiscoveryEndpoint(self->poller, previous);
+			endpointDiscoveryPoller_addDiscoveryEndpoint(self->poller, current);
+		} else if (strcmp(action, "delete") == 0) {
+			endpointDiscoveryPoller_removeDiscoveryEndpoint(self->poller, previous);
+		} else {
+			fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "Unexpected action: %s", action);
+		}
+	}
+
+	return NULL;
+}
+
+/*
+ * Creates an etcd watcher, registers the local framework in etcd and starts
+ * the watcher thread.
+ *
+ * The etcdWatcher needs to have access to the endpoint_discovery_poller and
+ * therefore is only allowed to be created after the endpoint_discovery_poller.
+ *
+ * poller   poller that receives the discovered endpoints; must not be NULL
+ * context  bundle context used to read the configuration properties
+ * watcher  out: the created watcher on success, NULL on failure
+ *
+ * returns CELIX_SUCCESS on success, CELIX_BUNDLE_EXCEPTION when poller or
+ * watcher is NULL or etcd_init() fails, CELIX_ENOMEM when the watcher cannot
+ * be allocated, otherwise the status of the step that failed. A failing
+ * registration of the local framework in etcd is only logged.
+ */
+celix_status_t etcdWatcher_create(endpoint_discovery_poller_pt poller, bundle_context_pt context,
+		etcd_watcher_pt *watcher)
+{
+	celix_status_t status = CELIX_SUCCESS;
+	char localNodePath[MAX_LOCALNODE_LENGTH];
+	char* etcd_server = NULL;
+	char* etcd_port_string = NULL;
+	int etcd_port = DEFAULT_ETCD_SERVER_PORT;
+	char* endpoints = NULL;
+	etcd_watcher_pt created = NULL;
+
+	if ((poller == NULL) || (watcher == NULL)) {
+		return CELIX_BUNDLE_EXCEPTION;
+	}
+
+	*watcher = NULL;
+
+	created = calloc(1, sizeof(struct etcd_watcher));
+	if (created == NULL) {
+		return CELIX_ENOMEM;
+	}
+
+	created->poller = poller;
+	created->context = context;
+
+	if ((bundleContext_getProperty(context, CFG_ETCD_SERVER_IP, &etcd_server) != CELIX_SUCCESS) || !etcd_server) {
+		etcd_server = DEFAULT_ETCD_SERVER_IP;
+	}
+
+	if ((bundleContext_getProperty(context, CFG_ETCD_SERVER_PORT, &etcd_port_string) == CELIX_SUCCESS) && etcd_port_string) {
+		char* endptr = etcd_port_string;
+		long parsed;
+
+		errno = 0;
+		parsed = strtol(etcd_port_string, &endptr, 10);
+
+		// keep the default unless the whole, non-empty string is a valid port number
+		if ((endptr != etcd_port_string) && (*endptr == '\0') && (errno == 0) && (parsed > 0) && (parsed <= 65535)) {
+			etcd_port = (int) parsed;
+		}
+	}
+
+	if ((bundleContext_getProperty(context, DISCOVERY_POLL_ENDPOINTS, &endpoints) != CELIX_SUCCESS) || !endpoints) {
+		endpoints = DEFAULT_POLL_ENDPOINTS;
+	}
+
+	if (etcd_init(etcd_server, etcd_port) == false) {
+		status = CELIX_BUNDLE_EXCEPTION;
+	}
+
+	// register own framework
+	if (status == CELIX_SUCCESS) {
+		status = etcdWatcher_getLocalNodePath(context, &localNodePath[0]);
+	}
+
+	if (status == CELIX_SUCCESS) {
+		if (etcd_set(localNodePath, endpoints) == false) {
+			fw_log(logger, OSGI_FRAMEWORK_LOG_WARNING, "Cannot register local discovery");
+		}
+
+		status = celixThreadMutex_create(&created->watcherLock, NULL);
+	}
+
+	if (status == CELIX_SUCCESS) {
+		// the flag has to be set before the thread exists: etcdWatcher_run()
+		// leaves its loop as soon as it reads running == false
+		created->running = true;
+
+		status = celixThread_create(&created->watcherThread, NULL, etcdWatcher_run, created);
+		if (status != CELIX_SUCCESS) {
+			created->running = false;
+			celixThreadMutex_destroy(&created->watcherLock);
+		}
+	}
+
+	if (status == CELIX_SUCCESS) {
+		*watcher = created;
+	} else {
+		// no thread is running at this point, so nothing else refers to the watcher
+		free(created);
+	}
+
+	return status;
+}
+
+/*
+ * Stops the watcher thread, removes the registration of the local framework
+ * from etcd and releases the watcher.
+ *
+ * watcher  the watcher to destroy; it must not be used after this call
+ *
+ * returns CELIX_ILLEGAL_ARGUMENT when watcher is NULL, the status of
+ * etcdWatcher_getLocalNodePath() when the local etcd key cannot be
+ * determined, otherwise CELIX_SUCCESS. The watcher is released in all cases
+ * in which it is not NULL. A failing removal from etcd is only logged.
+ *
+ * NOTE(review): the join presumably blocks until the etcd_watch call that is
+ * pending in the watcher thread returns - confirm the expected shutdown time.
+ */
+celix_status_t etcdWatcher_destroy(etcd_watcher_pt watcher) {
+	celix_status_t status = CELIX_SUCCESS;
+	char localNodePath[MAX_LOCALNODE_LENGTH];
+
+	if (watcher == NULL) {
+		return CELIX_ILLEGAL_ARGUMENT;
+	}
+
+	watcher->running = false;
+
+	celixThread_join(watcher->watcherThread, NULL);
+
+	// unregister own framework
+	status = etcdWatcher_getLocalNodePath(watcher->context, &localNodePath[0]);
+	if (status == CELIX_SUCCESS) {
+		if (etcd_del(localNodePath) == false) {
+			fw_log(logger, OSGI_FRAMEWORK_LOG_WARNING, "Cannot remove local discovery registration.");
+		}
+	}
+
+	// release the watcher even when the etcd key could not be determined
+	celixThreadMutex_destroy(&watcher->watcherLock);
+	free(watcher);
+
+	return status;
+}
+

Modified: celix/trunk/remote_services/discovery_shm/private/src/discovery_activator.c
URL: http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery_shm/private/src/discovery_activator.c?rev=1626250&r1=1626249&r2=1626250&view=diff
==============================================================================
--- celix/trunk/remote_services/discovery_shm/private/src/discovery_activator.c (original)
+++ celix/trunk/remote_services/discovery_shm/private/src/discovery_activator.c Fri Sep 19
15:36:24 2014
@@ -34,6 +34,7 @@
 #include "service_tracker.h"
 #include "service_registration.h"
 #include "constants.h"
+#include "celix_log.h"
 
 #include "discovery.h"
 #include "endpoint_listener.h"



Mime
View raw message