celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pnol...@apache.org
Subject [2/6] celix git commit: CELIX-389: Adds Celix Publish Subscribe donation.
Date Mon, 06 Feb 2017 14:23:01 GMT
http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_serializer.h
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_serializer.h b/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_serializer.h
new file mode 100644
index 0000000..c1f9a4b
--- /dev/null
+++ b/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_serializer.h
@@ -0,0 +1,47 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_serializer.h
+ *
+ *  \date       Dec 7, 2016
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#ifndef PUBSUB_SERIALIZER_H
+#define PUBSUB_SERIALIZER_H
+
+#include "bundle.h"
+#include "hash_map.h"
+
+typedef struct _pubsub_message_type pubsub_message_type;
+
+int pubsubSerializer_serialize(pubsub_message_type *msgType, const void *input, void **output, int *outputLen);
+int pubsubSerializer_deserialize(pubsub_message_type *msgType, const void *input, void **output);
+
+unsigned int pubsubSerializer_hashCode(const char *string);
+version_pt pubsubSerializer_getVersion(pubsub_message_type *msgType);
+char* pubsubSerializer_getName(pubsub_message_type *msgType);
+
+void pubsubSerializer_fillMsgTypesMap(hash_map_pt msgTypesMap,bundle_pt bundle);
+void pubsubSerializer_emptyMsgTypesMap(hash_map_pt msgTypesMap);
+
+void pubsubSerializer_freeMsg(pubsub_message_type *msgType, void *msg);
+
+#endif

http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_topic_info.descriptor
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_topic_info.descriptor b/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_topic_info.descriptor
new file mode 100644
index 0000000..c01a2fd
--- /dev/null
+++ b/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_topic_info.descriptor
@@ -0,0 +1,10 @@
+:header
+type=interface
+name=pubsub_topic_info
+version=1.0.0
+:annotations
+:types
+:methods
+getParticipantsNumber(t)i=getParticipantsNumber(#am=handle;Pt#am=pre;*i)N
+getSubscribersNumber(t)i=getSubscribersNumber(#am=handle;Pt#am=pre;*i)N
+getPublishersNumber(t)i=getPublishersNumber(#am=handle;Pt#am=pre;*i)N

http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_utils.h
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_utils.h b/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_utils.h
new file mode 100644
index 0000000..aff5c72
--- /dev/null
+++ b/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_utils.h
@@ -0,0 +1,39 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_utils.h
+ *
+ *  \date       Sep 24, 2015
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#ifndef PUBSUB_UTILS_H_
+#define PUBSUB_UTILS_H_
+
+#include "bundle_context.h"
+#include "array_list.h"
+
+char* pubsub_getScopeFromFilter(char* bundle_filter);
+char* pubsub_getTopicFromFilter(char* bundle_filter);
+char* pubsub_getKeysBundleDir(bundle_context_pt ctx);
+array_list_pt pubsub_getTopicsFromString(char* string);
+
+
+#endif /* PUBSUB_UTILS_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/pubsub_common/public/src/dyn_msg_utils.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_common/public/src/dyn_msg_utils.c b/celix-pubsub/pubsub/pubsub_common/public/src/dyn_msg_utils.c
new file mode 100644
index 0000000..8309c11
--- /dev/null
+++ b/celix-pubsub/pubsub/pubsub_common/public/src/dyn_msg_utils.c
@@ -0,0 +1,156 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * dyn_msg_utils.c
+ *
+ *  \date       Nov 11, 2015
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#include <stdlib.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <dirent.h>
+
+#include "utils.h"
+#include "dyn_message.h"
+
+#include "dyn_msg_utils.h"
+
+#define SYSTEM_BUNDLE_ARCHIVE_PATH 		"CELIX_FRAMEWORK_EXTENDER_PATH"
+
+static char * getMsgDescriptionDir(bundle_pt bundle);
+static void addMsgDescriptorsFromBundle(const char *root, bundle_pt bundle, hash_map_pt msgTypesMap);
+
+
+unsigned int uintHash(const void * uintNum) {
+	return *((unsigned int*)uintNum);
+}
+
+int uintEquals(const void * uintNum, const void * toCompare) {
+	return ( (*((unsigned int*)uintNum)) == (*((unsigned int*)toCompare)) );
+}
+
+void fillMsgTypesMap(hash_map_pt msgTypesMap,bundle_pt bundle){
+
+	char *root = NULL;
+	char *metaInfPath = NULL;
+
+	root = getMsgDescriptionDir(bundle);
+	asprintf(&metaInfPath, "%s/META-INF/descriptors", root);
+
+	addMsgDescriptorsFromBundle(root, bundle, msgTypesMap);
+	addMsgDescriptorsFromBundle(metaInfPath, bundle, msgTypesMap);
+
+	free(metaInfPath);
+	if(root!=NULL){
+		free(root);
+	}
+}
+
+void emptyMsgTypesMap(hash_map_pt msgTypesMap)
+{
+	hash_map_iterator_pt iter = hashMapIterator_create(msgTypesMap);
+
+	while(hashMapIterator_hasNext(iter)){
+		hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
+		dynMessage_destroy( ((dyn_message_type *) hashMapEntry_getValue(entry)) );
+	}
+	hashMap_clear(msgTypesMap, true, false);
+	hashMapIterator_destroy(iter);
+}
+
+static char * getMsgDescriptionDir(bundle_pt bundle)
+{
+	char *root = NULL;
+
+	bool isSystemBundle = false;
+	bundle_isSystemBundle(bundle, &isSystemBundle);
+
+	if(isSystemBundle == true) {
+		bundle_context_pt context;
+		bundle_getContext(bundle, &context);
+
+		const char *prop = NULL;
+
+		bundleContext_getProperty(context, SYSTEM_BUNDLE_ARCHIVE_PATH, &prop);
+
+		if(prop != NULL) {
+			root = strdup(prop);
+		} else {
+			root = getcwd(NULL, 0);
+		}
+	} else {
+	    char *dir;
+		bundle_getEntry(bundle, ".", &dir);
+		root = dir;
+	}
+
+	return root;
+}
+
+
+static void addMsgDescriptorsFromBundle(const char *root, bundle_pt bundle, hash_map_pt msgTypesMap)
+{
+	char path[128];
+	struct dirent *entry = NULL;
+	DIR *dir = opendir(root);
+
+	if(dir) {
+		entry = readdir(dir);
+	}
+
+	while (entry != NULL) {
+
+		if (strstr(entry->d_name, ".descriptor") != NULL) {
+
+			printf("DMU: Parsing entry '%s'\n", entry->d_name);
+
+			memset(path,0,128);
+			snprintf(path, 128, "%s/%s", root, entry->d_name);
+			FILE *stream = fopen(path,"r");
+
+			dyn_message_type* msgType = NULL;
+
+			int rc = dynMessage_parse(stream, &msgType);
+			if (rc == 0 && msgType!=NULL) {
+
+				char* msgName = NULL;
+				dynMessage_getName(msgType,&msgName);
+
+				if(msgName!=NULL){
+					unsigned int* msgId = malloc(sizeof(unsigned int));
+					*msgId = utils_stringHash(msgName);
+					hashMap_put(msgTypesMap,msgId,msgType);
+				}
+
+			}
+			else{
+				printf("DMU: cannot parse message from descriptor %s\n.",path);
+			}
+			fclose(stream);
+		}
+		entry = readdir(dir);
+	}
+
+	if(dir) {
+		closedir(dir);
+	}
+}

http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/pubsub_common/public/src/etcd.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_common/public/src/etcd.c b/celix-pubsub/pubsub/pubsub_common/public/src/etcd.c
new file mode 100644
index 0000000..bbb17c3
--- /dev/null
+++ b/celix-pubsub/pubsub/pubsub_common/public/src/etcd.c
@@ -0,0 +1,476 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include <stdio.h>
+#include <stdbool.h>
+#include <string.h>
+#include <curl/curl.h>
+#include <jansson.h>
+#include "etcd.h"
+
+#define ETCD_JSON_NODE                  "node"
+#define ETCD_JSON_PREVNODE              "prevNode"
+#define ETCD_JSON_NODES                 "nodes"
+#define ETCD_JSON_ACTION                "action"
+#define ETCD_JSON_KEY                   "key"
+#define ETCD_JSON_VALUE                 "value"
+#define ETCD_JSON_DIR                   "dir"
+#define ETCD_JSON_MODIFIEDINDEX         "modifiedIndex"
+
+#define MAX_OVERHEAD_LENGTH           64
+#define DEFAULT_CURL_TIMEOUT          10
+#define DEFAULT_CURL_CONECTTIMEOUT    10
+
+typedef enum {
+	GET, PUT, DELETE
+} request_t;
+
+static const char* etcd_server;
+static int etcd_port = 0;
+
+struct MemoryStruct {
+	char *memory;
+	size_t size;
+};
+
+
+/**
+ * Static function declarations
+ */
+static int performRequest(char* url, request_t request, void* callback, void* reqData, void* repData);
+static size_t WriteMemoryCallback(void *contents, size_t size, size_t nmemb, void *userp);
+/**
+ * External function definition
+ */
+
+
+/**
+ * etcd_init
+ */
+int etcd_init(const char* server, int port) {
+        etcd_server = server;
+        etcd_port = port;
+
+        return curl_global_init(CURL_GLOBAL_ALL) != 0;
+}
+
+
+/**
+ * etcd_get
+ */
+int etcd_get(const char* key, char** value, int* modifiedIndex) {
+    json_t* js_root = NULL;
+    json_t* js_node = NULL;
+    json_t* js_value = NULL;
+    json_t* js_modifiedIndex = NULL;
+    json_error_t error;
+    int res = -1;
+    struct MemoryStruct reply;
+
+    reply.memory = malloc(1); /* will be grown as needed by the realloc above */
+    reply.size = 0; /* no data at this point */
+
+    int retVal = -1;
+    char *url;
+    asprintf(&url, "http://%s:%d/v2/keys/%s", etcd_server, etcd_port, key);
+    res = performRequest(url, GET, WriteMemoryCallback, NULL, (void*) &reply);
+    free(url);
+    if (res == CURLE_OK) {
+        js_root = json_loads(reply.memory, 0, &error);
+
+        if (js_root != NULL) {
+            js_node = json_object_get(js_root, ETCD_JSON_NODE);
+        }
+        if (js_node != NULL) {
+            js_value = json_object_get(js_node, ETCD_JSON_VALUE);
+            js_modifiedIndex = json_object_get(js_node,
+            ETCD_JSON_MODIFIEDINDEX);
+
+            if (js_modifiedIndex != NULL && js_value != NULL) {
+                if (modifiedIndex) {
+                    *modifiedIndex = json_integer_value(js_modifiedIndex);
+                }
+                *value = strdup(json_string_value(js_value));
+                retVal = 0;
+            }
+        }
+        if (js_root != NULL) {
+            json_decref(js_root);
+        }
+    }
+
+    if (reply.memory) {
+        free(reply.memory);
+    }
+    if(retVal != 0) {
+        value = NULL;
+    }
+    return retVal;
+}
+
+
+static int etcd_get_recursive_values(json_t* js_root, etcd_key_value_callback callback, void *arg, json_int_t *mod_index) {
+    json_t *js_nodes;
+    if ((js_nodes = json_object_get(js_root, ETCD_JSON_NODES)) != NULL) {
+        // subarray
+        if (json_is_array(js_nodes)) {
+            int len = json_array_size(js_nodes);
+            for (int i = 0; i < len; i++) {
+                json_t *js_object = json_array_get(js_nodes, i);
+                json_t *js_mod_index = json_object_get(js_object, ETCD_JSON_MODIFIEDINDEX);
+
+                if(js_mod_index != NULL) {
+                    json_int_t index = json_integer_value(js_mod_index);
+                    if(*mod_index < index) {
+                        *mod_index = index;
+                    }
+                } else {
+                    printf("[ETCDLIB] Error: No INDEX found for key!\n");
+                }
+
+                if (json_object_get(js_object, ETCD_JSON_NODES)) {
+                    // node contains nodes
+                    etcd_get_recursive_values(js_object, callback, arg, mod_index);
+                } else {
+                    json_t* js_key = json_object_get(js_object, ETCD_JSON_KEY);
+                    json_t* js_value = json_object_get(js_object, ETCD_JSON_VALUE);
+
+                    if (js_key && js_value) {
+                        if (!json_object_get(js_object, ETCD_JSON_DIR)) {
+                            callback(json_string_value(js_key), json_string_value(js_value), arg);
+                        }
+                    } //else empty etcd directory, not an error.
+
+                }
+            }
+        } else {
+            printf("[ETCDLIB] Error: misformatted JSON: nodes element is not an array !!\n");
+        }
+    } else {
+        printf("[ETCDLIB] Error: nodes element not found!!\n");
+    }
+
+    return (*index > 0 ? 0 : 1);
+}
+
+/**
+ * etcd_get_directory
+ */
+int etcd_get_directory(const char* directory, etcd_key_value_callback callback, void* arg, long long* modifiedIndex) {
+    json_t* js_root = NULL;
+    json_t* js_rootnode = NULL;
+
+    json_error_t error;
+    int res;
+    struct MemoryStruct reply;
+
+    reply.memory = malloc(1); /* will be grown as needed by the realloc above */
+    reply.size = 0; /* no data at this point */
+
+    int retVal = 0;
+    char *url;
+
+    asprintf(&url, "http://%s:%d/v2/keys/%s?recursive=true", etcd_server, etcd_port, directory);
+
+    res = performRequest(url, GET, WriteMemoryCallback, NULL, (void*) &reply);
+    free(url);
+
+    if (res == CURLE_OK) {
+        js_root = json_loads(reply.memory, 0, &error);
+        if (js_root != NULL) {
+            js_rootnode = json_object_get(js_root, ETCD_JSON_NODE);
+        } else {
+            retVal = -1;
+            printf("ERROR ETCD: %s in js_root not found", ETCD_JSON_NODE);
+        }
+        if (js_rootnode != NULL) {
+            *modifiedIndex = 0;
+            retVal = etcd_get_recursive_values(js_rootnode, callback, arg, (json_int_t*)modifiedIndex);
+        }
+        if (js_root != NULL) {
+            json_decref(js_root);
+        }
+    }
+
+    if (reply.memory) {
+        free(reply.memory);
+    }
+
+    return retVal;
+}
+
+/**
+ * etcd_set
+ */
+int etcd_set(const char* key, const char* value, int ttl, bool prevExist) {
+    json_error_t error;
+    json_t* js_root = NULL;
+    json_t* js_node = NULL;
+    json_t* js_value = NULL;
+    int retVal = -1;
+    char *url;
+    size_t req_len = strlen(value) + MAX_OVERHEAD_LENGTH;
+    char request[req_len];
+    char* requestPtr = request;
+    int res;
+    struct MemoryStruct reply;
+
+    /* Skip leading '/', etcd cannot handle this. */
+    while(*key == '/') {
+        key++;
+    }
+
+    reply.memory = calloc(1, 1); /* will be grown as needed by the realloc above */
+    reply.size = 0; /* no data at this point */
+
+    asprintf(&url, "http://%s:%d/v2/keys/%s", etcd_server, etcd_port, key);
+
+    requestPtr += snprintf(requestPtr, req_len, "value=%s", value);
+    if (ttl > 0) {
+        requestPtr += snprintf(requestPtr, req_len-(requestPtr-request), ";ttl=%d", ttl);
+    }
+
+    if (prevExist) {
+        requestPtr += snprintf(requestPtr, req_len-(requestPtr-request), ";prevExist=true");
+    }
+    res = performRequest(url, PUT, WriteMemoryCallback, request, (void*) &reply);
+    if(url) {
+        free(url);
+    }
+
+    if (res == CURLE_OK) {
+        js_root = json_loads(reply.memory, 0, &error);
+
+        if (js_root != NULL) {
+            js_node = json_object_get(js_root, ETCD_JSON_NODE);
+        }
+        if (js_node != NULL) {
+            js_value = json_object_get(js_node, ETCD_JSON_VALUE);
+        }
+        if (js_value != NULL && json_is_string(js_value)) {
+            if(strcmp(json_string_value(js_value), value) == 0) {
+                retVal = 0;
+            }
+        }
+        if (js_root != NULL) {
+            json_decref(js_root);
+        }
+    }
+
+    if (reply.memory) {
+        free(reply.memory);
+    }
+
+    return retVal;
+}
+
+
+/**
+ * etcd_set_with_check
+ */
+int etcd_set_with_check(const char* key, const char* value, int ttl, bool always_write) {
+    char *etcd_value;
+    int result = 0;
+    if (etcd_get(key, &etcd_value, NULL) == 0) {
+        if (strcmp(etcd_value, value) != 0) {
+            printf("[ETCDLIB} WARNING: value already exists and is different\n");
+            printf("   key       = %s\n", key);
+            printf("   old value = %s\n", etcd_value);
+            printf("   new value = %s\n", value);
+            result = -1;
+        }
+        if (etcd_value) {
+            free(etcd_value);
+        }
+    }
+    if(always_write || !result) {
+        result = etcd_set(key, value, ttl, false);
+    }
+    return result;
+}
+
+
+/**
+ * etcd_watch
+ */
+int etcd_watch(const char* key, long long index, char** action, char** prevValue, char** value, char** rkey, long long* modifiedIndex) {
+    json_error_t error;
+    json_t* js_root = NULL;
+    json_t* js_node = NULL;
+    json_t* js_prevNode = NULL;
+    json_t* js_action = NULL;
+    json_t* js_value = NULL;
+    json_t* js_rkey = NULL;
+    json_t* js_prevValue = NULL;
+    json_t* js_modIndex = NULL;
+    int retVal = -1;
+    char *url = NULL;
+    int res;
+    struct MemoryStruct reply;
+
+    reply.memory = malloc(1); /* will be grown as needed by the realloc above */
+    reply.size = 0; /* no data at this point */
+
+    if (index != 0)
+        asprintf(&url, "http://%s:%d/v2/keys/%s?wait=true&recursive=true&waitIndex=%lld", etcd_server, etcd_port, key, index);
+    else
+        asprintf(&url, "http://%s:%d/v2/keys/%s?wait=true&recursive=true", etcd_server, etcd_port, key);
+    res = performRequest(url, GET, WriteMemoryCallback, NULL, (void*) &reply);
+    if(url)
+        free(url);
+    if (res == CURLE_OK) {
+        js_root = json_loads(reply.memory, 0, &error);
+
+        if (js_root != NULL) {
+            js_action = json_object_get(js_root, ETCD_JSON_ACTION);
+            js_node = json_object_get(js_root, ETCD_JSON_NODE);
+            js_prevNode = json_object_get(js_root, ETCD_JSON_PREVNODE);
+            retVal = 0;
+        }
+        if (js_prevNode != NULL) {
+            js_prevValue = json_object_get(js_prevNode, ETCD_JSON_VALUE);
+        }
+        if (js_node != NULL) {
+            js_rkey = json_object_get(js_node, ETCD_JSON_KEY);
+            js_value = json_object_get(js_node, ETCD_JSON_VALUE);
+            js_modIndex = json_object_get(js_node, ETCD_JSON_MODIFIEDINDEX);
+        }
+        if (js_prevNode != NULL) {
+            js_prevValue = json_object_get(js_prevNode, ETCD_JSON_VALUE);
+        }
+        if ((prevValue != NULL) && (js_prevValue != NULL) && (json_is_string(js_prevValue))) {
+
+            *prevValue = strdup(json_string_value(js_prevValue));
+        }
+        if(modifiedIndex != NULL) {
+            if ((js_modIndex != NULL) && (json_is_integer(js_modIndex))) {
+                *modifiedIndex = json_integer_value(js_modIndex);
+            } else {
+                *modifiedIndex = index;
+            }
+        }
+        if ((rkey != NULL) && (js_rkey != NULL) && (json_is_string(js_rkey))) {
+            *rkey = strdup(json_string_value(js_rkey));
+
+        }
+        if ((action != NULL)  && (js_action != NULL)  && (json_is_string(js_action))) {
+            *action = strdup(json_string_value(js_action));
+        }
+        if ((value != NULL) && (js_value != NULL) && (json_is_string(js_value))) {
+            *value = strdup(json_string_value(js_value));
+        }
+        if (js_root != NULL) {
+            json_decref(js_root);
+        }
+
+    }
+
+    if (reply.memory) {
+        free(reply.memory);
+    }
+
+    return retVal;
+}
+
+/**
+ * etcd_del
+ */
+int etcd_del(const char* key) {
+    json_error_t error;
+    json_t* js_root = NULL;
+    json_t* js_node = NULL;
+    int retVal = -1;
+    char *url;
+    int res;
+    struct MemoryStruct reply;
+
+    reply.memory = malloc(1); /* will be grown as needed by the realloc above */
+    reply.size = 0; /* no data at this point */
+
+    asprintf(&url, "http://%s:%d/v2/keys/%s?recursive=true", etcd_server, etcd_port, key);
+    res = performRequest(url, DELETE, WriteMemoryCallback, NULL, (void*) &reply);
+    free(url);
+
+    if (res == CURLE_OK) {
+        js_root = json_loads(reply.memory, 0, &error);
+        if (js_root != NULL) {
+            js_node = json_object_get(js_root, ETCD_JSON_NODE);
+        }
+
+        if (js_node != NULL) {
+            retVal = 0;
+        }
+
+        if (js_root != NULL) {
+            json_decref(js_root);
+        }
+    }
+
+    if (reply.memory) {
+        free(reply.memory);
+    }
+
+    return retVal;
+}
+
+
+static size_t WriteMemoryCallback(void *contents, size_t size, size_t nmemb, void *userp) {
+    size_t realsize = size * nmemb;
+    struct MemoryStruct *mem = (struct MemoryStruct *) userp;
+
+    mem->memory = realloc(mem->memory, mem->size + realsize + 1);
+    if (mem->memory == NULL) {
+        /* out of memory! */
+        printf("not enough memory (realloc returned NULL)\n");
+        return 0;
+    }
+
+    memcpy(&(mem->memory[mem->size]), contents, realsize);
+    mem->size += realsize;
+    mem->memory[mem->size] = 0;
+
+    return realsize;
+}
+
+static int performRequest(char* url, request_t request, void* callback, void* reqData, void* repData) {
+    CURL *curl = NULL;
+    CURLcode res = 0;
+    curl = curl_easy_init();
+    curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
+    curl_easy_setopt(curl, CURLOPT_TIMEOUT, DEFAULT_CURL_TIMEOUT);
+    curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, DEFAULT_CURL_CONECTTIMEOUT);
+    curl_easy_setopt(curl, CURLOPT_URL, url);
+    curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
+    curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, callback);
+    curl_easy_setopt(curl, CURLOPT_WRITEDATA, repData);
+
+    if (request == PUT) {
+        curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "PUT");
+        curl_easy_setopt(curl, CURLOPT_POST, 1L);
+        curl_easy_setopt(curl, CURLOPT_POSTFIELDS, reqData);
+    } else if (request == DELETE) {
+        curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "DELETE");
+    } else if (request == GET) {
+        curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "GET");
+    }
+
+    res = curl_easy_perform(curl);
+    curl_easy_cleanup(curl);
+    return res;
+}

http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/pubsub_common/public/src/log_helper.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_common/public/src/log_helper.c b/celix-pubsub/pubsub/pubsub_common/public/src/log_helper.c
new file mode 100644
index 0000000..7a63363
--- /dev/null
+++ b/celix-pubsub/pubsub/pubsub_common/public/src/log_helper.c
@@ -0,0 +1,209 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * log_helper.c
+ *
+ *  \date       Nov 10, 2014
+ *  \author     <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright  Apache License, Version 2.0
+ */
+
+#include <stdlib.h>
+#include <stdarg.h>
+
+#include "bundle_context.h"
+#include "service_tracker.h"
+#include "celix_threads.h"
+#include "array_list.h"
+
+#include "celix_errno.h"
+#include "log_service.h"
+
+#include "log_helper.h"
+
+#define LOGHELPER_ENABLE_STDOUT_FALLBACK_PROPERTY_NAME 	"LOGHELPER_ENABLE_STDOUT_FALLBACK"
+
+
+struct log_helper {
+	bundle_context_pt bundleContext;
+    service_tracker_pt logServiceTracker;
+	celix_thread_mutex_t logListLock;
+	array_list_pt logServices;
+	bool stdOutFallback;
+};
+
+celix_status_t logHelper_logServiceAdded(void *handle, service_reference_pt reference, void *service);
+celix_status_t logHelper_logServiceRemoved(void *handle, service_reference_pt reference, void *service);
+
+
+celix_status_t logHelper_create(bundle_context_pt context, log_helper_pt* loghelper)
+{
+	celix_status_t status = CELIX_SUCCESS;
+
+	(*loghelper) = calloc(1, sizeof(**loghelper));
+
+	if (!(*loghelper))
+	{
+		status = CELIX_ENOMEM;
+	}
+	else
+	{
+		const char* stdOutFallbackStr = NULL;
+		(*loghelper)->bundleContext = context;
+		(*loghelper)->logServiceTracker = NULL;
+		(*loghelper)->stdOutFallback = false;
+
+		bundleContext_getProperty(context, LOGHELPER_ENABLE_STDOUT_FALLBACK_PROPERTY_NAME, &stdOutFallbackStr);
+
+		if (stdOutFallbackStr != NULL) {
+			(*loghelper)->stdOutFallback = true;
+		}
+
+		pthread_mutex_init(&(*loghelper)->logListLock, NULL);
+        arrayList_create(&(*loghelper)->logServices);
+	}
+
+	return status;
+}
+
+celix_status_t logHelper_start(log_helper_pt loghelper)
+{
+	celix_status_t status = CELIX_SUCCESS;
+	service_tracker_customizer_pt logTrackerCustomizer = NULL;
+
+	status = serviceTrackerCustomizer_create(loghelper, NULL, logHelper_logServiceAdded, NULL, logHelper_logServiceRemoved, &logTrackerCustomizer);
+
+	if (status == CELIX_SUCCESS) {
+		status = serviceTracker_create(loghelper->bundleContext, (char*) OSGI_LOGSERVICE_NAME, logTrackerCustomizer, &loghelper->logServiceTracker);
+	}
+
+	if (status == CELIX_SUCCESS) {
+		status = serviceTracker_open(loghelper->logServiceTracker);
+	}
+
+	return status;
+}
+
+
+
+celix_status_t logHelper_logServiceAdded(void *handle, service_reference_pt reference, void *service)
+{
+	log_helper_pt loghelper = handle;
+
+	pthread_mutex_lock(&loghelper->logListLock);
+	arrayList_add(loghelper->logServices, service);
+	pthread_mutex_unlock(&loghelper->logListLock);
+
+	return CELIX_SUCCESS;
+}
+
+celix_status_t logHelper_logServiceRemoved(void *handle, service_reference_pt reference, void *service)
+{
+	log_helper_pt loghelper = handle;
+
+	pthread_mutex_lock(&loghelper->logListLock);
+	arrayList_removeElement(loghelper->logServices, service);
+	pthread_mutex_unlock(&loghelper->logListLock);
+
+	return CELIX_SUCCESS;
+}
+
+
+celix_status_t logHelper_stop(log_helper_pt loghelper) {
+	celix_status_t status = CELIX_SUCCESS;
+
+    status = serviceTracker_close(loghelper->logServiceTracker);
+
+    return status;
+}
+
+celix_status_t logHelper_destroy(log_helper_pt* loghelper) {
+        celix_status_t status = CELIX_SUCCESS;
+
+        serviceTracker_destroy((*loghelper)->logServiceTracker);
+
+        pthread_mutex_lock(&(*loghelper)->logListLock);
+        arrayList_destroy((*loghelper)->logServices);
+    	pthread_mutex_unlock(&(*loghelper)->logListLock);
+
+        pthread_mutex_destroy(&(*loghelper)->logListLock);
+
+        free(*loghelper);
+        *loghelper = NULL;
+        return status;
+}
+
+
+
+
+celix_status_t logHelper_log(log_helper_pt loghelper, log_level_t level, char* message, ... )
+{
+    celix_status_t status = CELIX_SUCCESS;
+	va_list listPointer;
+    char msg[1024];
+    msg[0] = '\0';
+    bool logged = false;
+
+	va_start(listPointer, message);
+	vsnprintf(msg, 1024, message, listPointer);
+
+	if (loghelper != NULL) {
+		pthread_mutex_lock(&loghelper->logListLock);
+
+		int i = 0;
+
+		for (; i < arrayList_size(loghelper->logServices); i++) {
+
+			log_service_pt logService = arrayList_get(loghelper->logServices, i);
+
+			if (logService != NULL) {
+				(logService->log)(logService->logger, level, msg);
+				logged = true;
+			}
+		}
+
+		pthread_mutex_unlock(&loghelper->logListLock);
+	}
+
+
+    if (!logged && loghelper->stdOutFallback) {
+        char *levelStr = NULL;
+
+        switch (level) {
+            case OSGI_LOGSERVICE_ERROR:
+                levelStr = "ERROR";
+                break;
+            case OSGI_LOGSERVICE_WARNING:
+                levelStr = "WARNING";
+                break;
+            case OSGI_LOGSERVICE_INFO:
+                levelStr = "INFO";
+                break;
+            case OSGI_LOGSERVICE_DEBUG:
+            default:
+                levelStr = "DEBUG";
+                break;
+        }
+
+        printf("%s: %s\n", levelStr, msg);
+    }
+
+
+	return status;
+}

http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/pubsub_common/public/src/pubsub_endpoint.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_common/public/src/pubsub_endpoint.c b/celix-pubsub/pubsub/pubsub_common/public/src/pubsub_endpoint.c
new file mode 100644
index 0000000..4af52ac
--- /dev/null
+++ b/celix-pubsub/pubsub/pubsub_common/public/src/pubsub_endpoint.c
@@ -0,0 +1,156 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * endpoint_description.c
+ *
+ *  \date       25 Jul 2014
+ *  \author     <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright  Apache License, Version 2.0
+ */
+
+#include <string.h>
+#include <stdlib.h>
+
+#include "celix_errno.h"
+#include "celix_log.h"
+
+#include "pubsub_common.h"
+#include "pubsub_endpoint.h"
+#include "constants.h"
+#include "subscriber.h"
+
+celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, long serviceId, const char* endpoint, pubsub_endpoint_pt* psEp) {
+    celix_status_t status = CELIX_SUCCESS;
+    *psEp = calloc(1, sizeof(**psEp));
+
+    if (fwUUID != NULL) {
+        (*psEp)->frameworkUUID = strdup(fwUUID);
+    }
+
+    if (scope != NULL) {
+        (*psEp)->scope = strdup(scope);
+    }
+
+    if (topic != NULL) {
+        (*psEp)->topic = strdup(topic);
+    }
+
+    (*psEp)->serviceID = serviceId;
+
+    if (endpoint != NULL) {
+        (*psEp)->endpoint = strdup(endpoint);
+    }
+
+    return status;
+
+}
+
+celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt reference, pubsub_endpoint_pt* psEp){
+	celix_status_t status = CELIX_SUCCESS;
+
+	*psEp = calloc(1,sizeof(**psEp));
+
+	bundle_pt bundle = NULL;
+	bundle_context_pt ctxt = NULL;
+	const char* fwUUID = NULL;
+	serviceReference_getBundle(reference,&bundle);
+	bundle_getContext(bundle,&ctxt);
+	bundleContext_getProperty(ctxt,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
+
+	const char* scope = NULL;
+	serviceReference_getProperty(reference, PUBSUB_SUBSCRIBER_SCOPE,&scope);
+
+	const char* topic = NULL;
+	serviceReference_getProperty(reference, PUBSUB_SUBSCRIBER_TOPIC,&topic);
+
+	const char* serviceId = NULL;
+	serviceReference_getProperty(reference,(char*)OSGI_FRAMEWORK_SERVICE_ID,&serviceId);
+
+
+	if(fwUUID!=NULL){
+		(*psEp)->frameworkUUID=strdup(fwUUID);
+	}
+
+	if(scope!=NULL){
+		(*psEp)->scope=strdup(scope);
+	} else {
+	    (*psEp)->scope=strdup(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT);
+	}
+
+	if(topic!=NULL){
+		(*psEp)->topic=strdup(topic);
+	}
+
+	if(serviceId!=NULL){
+		(*psEp)->serviceID = strtol(serviceId,NULL,10);
+	}
+
+	if (!(*psEp)->frameworkUUID || !(*psEp)->serviceID || !(*psEp)->scope || !(*psEp)->topic) {
+		fw_log(logger, OSGI_FRAMEWORK_LOG_ERROR, "PUBSUB_ENDPOINT: incomplete description!.");
+		status = CELIX_BUNDLE_EXCEPTION;
+	}
+
+	return status;
+
+}
+
+celix_status_t pubsubEndpoint_destroy(pubsub_endpoint_pt psEp){
+    if(psEp->frameworkUUID!=NULL){
+		free(psEp->frameworkUUID);
+		psEp->frameworkUUID = NULL;
+	}
+
+	if(psEp->scope!=NULL){
+		free(psEp->scope);
+		psEp->scope = NULL;
+	}
+
+	if(psEp->topic!=NULL){
+		free(psEp->topic);
+		psEp->topic = NULL;
+	}
+
+	if(psEp->endpoint!=NULL){
+		free(psEp->endpoint);
+		psEp->endpoint = NULL;
+	}
+
+	free(psEp);
+	return CELIX_SUCCESS;
+
+}
+
+bool pubsubEndpoint_equals(pubsub_endpoint_pt psEp1,pubsub_endpoint_pt psEp2){
+
+	return ((strcmp(psEp1->frameworkUUID,psEp2->frameworkUUID)==0) &&
+			(strcmp(psEp1->scope,psEp2->scope)==0) &&
+			(strcmp(psEp1->topic,psEp2->topic)==0) &&
+			(psEp1->serviceID == psEp2->serviceID) /*&&
+			((psEp1->endpoint==NULL && psEp2->endpoint==NULL)||(strcmp(psEp1->endpoint,psEp2->endpoint)==0))*/
+	);
+
+
+}
+
+char *createScopeTopicKey(const char* scope, const char* topic) {
+	char *result = NULL;
+	asprintf(&result, "%s:%s", scope, topic);
+
+	return result;
+}

http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/pubsub_common/public/src/pubsub_serializer.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_common/public/src/pubsub_serializer.c b/celix-pubsub/pubsub/pubsub_common/public/src/pubsub_serializer.c
new file mode 100644
index 0000000..85ef868
--- /dev/null
+++ b/celix-pubsub/pubsub/pubsub_common/public/src/pubsub_serializer.c
@@ -0,0 +1,105 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_serializer_json.c
+ *
+ *  \date       Dec 7, 2016
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#include "pubsub_serializer.h"
+
+#include "utils.h"
+#include "json_serializer.h"
+#include "dyn_msg_utils.h"
+#include "dyn_type.h"
+#include "string.h"
+#include "dyn_message.h"
+#include "dyn_common.h"
+
+struct _pubsub_message_type {	/* _dyn_message_type */
+	struct namvals_head header;
+	struct namvals_head annotations;
+	struct types_head types;
+	dyn_type *msgType;
+	version_pt msgVersion;
+};
+
+int pubsubSerializer_serialize(pubsub_message_type *msgType, const void *input, void **output, int *outputLen){
+
+	int rc = 0;
+
+	dyn_type *type = NULL;
+	dynMessage_getMessageType((dyn_message_type *) msgType, &type);
+
+	char *jsonOutput = NULL;
+	rc = jsonSerializer_serialize(type, (void *) input, &jsonOutput);
+
+	*output = (void *) jsonOutput;
+	*outputLen = strlen(jsonOutput) + 1;
+
+	return rc;
+}
+
+int pubsubSerializer_deserialize(pubsub_message_type *msgType, const void *input, void **output){
+
+	int rc = 0;
+
+	dyn_type *type = NULL;
+	dynMessage_getMessageType((dyn_message_type *) msgType, &type);
+
+	void *textOutput = NULL;
+	rc = jsonSerializer_deserialize(type, (const char *) input, &textOutput);
+
+	*output = textOutput;
+
+	return rc;
+}
+
+unsigned int pubsubSerializer_hashCode(const char *string){
+	return utils_stringHash(string);
+}
+
+version_pt pubsubSerializer_getVersion(pubsub_message_type *msgType){
+	version_pt msgVersion = NULL;
+	dynMessage_getVersion((dyn_message_type *) msgType, &msgVersion);
+	return msgVersion;
+}
+
+char* pubsubSerializer_getName(pubsub_message_type *msgType){
+	char *name = NULL;
+	dynMessage_getName((dyn_message_type *) msgType, &name);
+	return name;
+}
+
+void pubsubSerializer_fillMsgTypesMap(hash_map_pt msgTypesMap,bundle_pt bundle){
+	fillMsgTypesMap(msgTypesMap, bundle);
+}
+
+void pubsubSerializer_emptyMsgTypesMap(hash_map_pt msgTypesMap){
+	emptyMsgTypesMap(msgTypesMap);
+}
+
+void pubsubSerializer_freeMsg(pubsub_message_type *msgType, void *msg){
+	dyn_type *type = NULL;
+	dynMessage_getMessageType((dyn_message_type *) msgType, &type);
+	dynType_free(type, msg);
+}
+

http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/pubsub_common/public/src/pubsub_utils.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_common/public/src/pubsub_utils.c b/celix-pubsub/pubsub/pubsub_common/public/src/pubsub_utils.c
new file mode 100644
index 0000000..5f1b7ba
--- /dev/null
+++ b/celix-pubsub/pubsub/pubsub_common/public/src/pubsub_utils.c
@@ -0,0 +1,163 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_utils.c
+ *
+ *  \date       Sep 24, 2015
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#include <string.h>
+#include <stdlib.h>
+
+#include "constants.h"
+
+#include "pubsub_common.h"
+#include "publisher.h"
+#include "pubsub_utils.h"
+
+#include "array_list.h"
+#include "bundle.h"
+
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+
+#define MAX_KEYBUNDLE_LENGTH 256
+
+char* pubsub_getScopeFromFilter(char* bundle_filter){
+
+	char* scope = NULL;
+
+	char* filter = strdup(bundle_filter);
+
+	char* oc = strstr(filter,OSGI_FRAMEWORK_OBJECTCLASS);
+	if(oc!=NULL){
+		oc+=strlen(OSGI_FRAMEWORK_OBJECTCLASS)+1;
+		if(strncmp(oc,PUBSUB_PUBLISHER_SERVICE_NAME,strlen(PUBSUB_PUBLISHER_SERVICE_NAME))==0){
+
+			char* scopes = strstr(filter,PUBSUB_PUBLISHER_SCOPE);
+			if(scopes!=NULL){
+
+				scopes+=strlen(PUBSUB_PUBLISHER_SCOPE)+1;
+				char* bottom=strchr(scopes,')');
+				*bottom='\0';
+
+				scope=strdup(scopes);
+			} else {
+			    scope=strdup(PUBSUB_PUBLISHER_SCOPE_DEFAULT);
+			}
+		}
+	}
+
+	free(filter);
+
+	return scope;
+}
+
+char* pubsub_getTopicFromFilter(char* bundle_filter){
+
+	char* topic = NULL;
+
+	char* filter = strdup(bundle_filter);
+
+	char* oc = strstr(filter,OSGI_FRAMEWORK_OBJECTCLASS);
+	if(oc!=NULL){
+		oc+=strlen(OSGI_FRAMEWORK_OBJECTCLASS)+1;
+		if(strncmp(oc,PUBSUB_PUBLISHER_SERVICE_NAME,strlen(PUBSUB_PUBLISHER_SERVICE_NAME))==0){
+
+			char* topics = strstr(filter,PUBSUB_PUBLISHER_TOPIC);
+			if(topics!=NULL){
+
+				topics+=strlen(PUBSUB_PUBLISHER_TOPIC)+1;
+				char* bottom=strchr(topics,')');
+				*bottom='\0';
+
+				topic=strdup(topics);
+
+			}
+		}
+	}
+
+	free(filter);
+
+	return topic;
+
+}
+
+array_list_pt pubsub_getTopicsFromString(char* string){
+
+	array_list_pt topic_list = NULL;
+	arrayList_create(&topic_list);
+
+	char* topics = strdup(string);
+
+	char* topic = strtok(topics,",;|# ");
+	arrayList_add(topic_list,strdup(topic));
+
+	while( (topic = strtok(NULL,",;|# ")) !=NULL){
+		arrayList_add(topic_list,strdup(topic));
+	}
+
+	free(topics);
+
+	return topic_list;
+
+}
+
+/**
+ * Loop through all bundles and look for the bundle with the keys inside.
+ * If no key bundle found, return NULL
+ *
+ * Caller is responsible for freeing the object
+ */
+char* pubsub_getKeysBundleDir(bundle_context_pt ctx)
+{
+	array_list_pt bundles = NULL;
+	bundleContext_getBundles(ctx, &bundles);
+	int nrOfBundles = arrayList_size(bundles);
+
+	char* result = NULL;
+
+	for (int i = 0; i < nrOfBundles; i++){
+		bundle_pt b = arrayList_get(bundles, i);
+		char* dir = NULL;
+		bundle_getEntry(b, ".", &dir);
+
+		char cert_dir[MAX_KEYBUNDLE_LENGTH];
+		snprintf(cert_dir, MAX_KEYBUNDLE_LENGTH, "%s/META-INF/keys", dir);
+
+		struct stat s;
+		int err = stat(cert_dir, &s);
+		if (err != -1){
+			if (S_ISDIR(s.st_mode)){
+				result = dir;
+				break;
+			}
+		}
+
+		free(dir);
+	}
+
+	arrayList_destroy(bundles);
+
+	return result;
+}
+

http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/pubsub_discovery/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_discovery/CMakeLists.txt b/celix-pubsub/pubsub/pubsub_discovery/CMakeLists.txt
new file mode 100644
index 0000000..bf95368
--- /dev/null
+++ b/celix-pubsub/pubsub/pubsub_discovery/CMakeLists.txt
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+find_package(CURL REQUIRED)
+find_package(Jansson REQUIRED)
+
+include_directories("${CURL_INCLUDE_DIR}")
+include_directories("${JANSSON_INCLUDE_DIR}")
+include_directories("${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/include")
+include_directories("${PROJECT_SOURCE_DIR}/pubsub/api/pubsub")
+include_directories("private/include")
+include_directories("public/include")
+
+add_bundle(org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
+    BUNDLE_SYMBOLICNAME "apache_celix_pubsub_discovery_etcd"
+    VERSION "1.0.0"
+    SOURCES
+		private/src/psd_activator.c
+		private/src/pubsub_discovery_impl.c
+		private/src/etcd_common.c
+		private/src/etcd_watcher.c
+		private/src/etcd_writer.c
+		${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_endpoint.c
+		${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/etcd.c
+)
+
+install_bundle(org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery)
+	
+target_link_libraries(org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery ${CURL_LIBRARIES} ${JANSSON_LIBRARIES})

http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/pubsub_discovery/private/include/etcd_common.h
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_discovery/private/include/etcd_common.h b/celix-pubsub/pubsub/pubsub_discovery/private/include/etcd_common.h
new file mode 100644
index 0000000..7a3e7b6
--- /dev/null
+++ b/celix-pubsub/pubsub/pubsub_discovery/private/include/etcd_common.h
@@ -0,0 +1,28 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#ifndef ETCD_COMMON_H_
+#define ETCD_COMMON_H_
+
+#include "bundle_context.h"
+#include "celix_errno.h"
+
+celix_status_t etcdCommon_init(bundle_context_pt context);
+
+#endif /* ETCD_COMMON_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/pubsub_discovery/private/include/etcd_watcher.h
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_discovery/private/include/etcd_watcher.h b/celix-pubsub/pubsub/pubsub_discovery/private/include/etcd_watcher.h
new file mode 100644
index 0000000..c425e60
--- /dev/null
+++ b/celix-pubsub/pubsub/pubsub_discovery/private/include/etcd_watcher.h
@@ -0,0 +1,38 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#ifndef ETCD_WATCHER_H_
+#define ETCD_WATCHER_H_
+
+#include "bundle_context.h"
+#include "celix_errno.h"
+
+#include "pubsub_discovery.h"
+#include "pubsub_endpoint.h"
+
+typedef struct etcd_watcher *etcd_watcher_pt;
+
+celix_status_t etcdWatcher_create(pubsub_discovery_pt discovery,  bundle_context_pt context, const char *scope, const char* topic, etcd_watcher_pt *watcher);
+celix_status_t etcdWatcher_destroy(etcd_watcher_pt watcher);
+celix_status_t etcdWatcher_stop(etcd_watcher_pt watcher);
+
+celix_status_t etcdWatcher_getPublisherEndpointFromKey(pubsub_discovery_pt discovery, const char* key, const char* value, pubsub_endpoint_pt* pubEP);
+
+
+#endif /* ETCD_WATCHER_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/pubsub_discovery/private/include/etcd_writer.h
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_discovery/private/include/etcd_writer.h b/celix-pubsub/pubsub/pubsub_discovery/private/include/etcd_writer.h
new file mode 100644
index 0000000..3ff98b9
--- /dev/null
+++ b/celix-pubsub/pubsub/pubsub_discovery/private/include/etcd_writer.h
@@ -0,0 +1,39 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#ifndef ETCD_WRITER_H_
+#define ETCD_WRITER_H_
+
+#include "bundle_context.h"
+#include "celix_errno.h"
+
+#include "pubsub_discovery.h"
+#include "pubsub_endpoint.h"
+
+typedef struct etcd_writer *etcd_writer_pt;
+
+
+etcd_writer_pt etcdWriter_create(pubsub_discovery_pt discovery);
+void etcdWriter_destroy(etcd_writer_pt writer);
+
+celix_status_t etcdWriter_addPublisherEndpoint(etcd_writer_pt writer, pubsub_endpoint_pt pubEP,bool storeEP);
+celix_status_t etcdWriter_deletePublisherEndpoint(etcd_writer_pt writer, pubsub_endpoint_pt pubEP);
+
+
+#endif /* ETCD_WRITER_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/pubsub_discovery/private/include/pubsub_discovery_impl.h
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_discovery/private/include/pubsub_discovery_impl.h b/celix-pubsub/pubsub/pubsub_discovery/private/include/pubsub_discovery_impl.h
new file mode 100644
index 0000000..d5be8d6
--- /dev/null
+++ b/celix-pubsub/pubsub/pubsub_discovery/private/include/pubsub_discovery_impl.h
@@ -0,0 +1,73 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#ifndef PUBSUB_DISCOVERY_IMPL_H_
+#define PUBSUB_DISCOVERY_IMPL_H_
+
+#include "bundle_context.h"
+#include "service_reference.h"
+
+#include "etcd_watcher.h"
+#include "etcd_writer.h"
+#include "pubsub_endpoint.h"
+
+#define FREE_MEM(ptr) if(ptr) {free(ptr); ptr = NULL;}
+
+struct watcher_info {
+    etcd_watcher_pt watcher;
+    int nr_references;
+};
+
+struct pubsub_discovery {
+	bundle_context_pt context;
+
+	celix_thread_mutex_t discoveredPubsMutex;
+	hash_map_pt discoveredPubs; //<topic,List<pubsub_endpoint_pt>>
+
+	celix_thread_mutex_t listenerReferencesMutex;
+	hash_map_pt listenerReferences; //key=serviceReference, value=nop
+
+	celix_thread_mutex_t watchersMutex;
+	hash_map_pt watchers; //key = topicname, value = struct watcher_info
+
+	etcd_writer_pt writer;
+};
+
+
+celix_status_t pubsub_discovery_create(bundle_context_pt context, pubsub_discovery_pt* node_discovery);
+celix_status_t pubsub_discovery_destroy(pubsub_discovery_pt node_discovery);
+celix_status_t pubsub_discovery_start(pubsub_discovery_pt node_discovery);
+celix_status_t pubsub_discovery_stop(pubsub_discovery_pt node_discovery);
+
+celix_status_t pubsub_discovery_addNode(pubsub_discovery_pt node_discovery, pubsub_endpoint_pt pubEP);
+celix_status_t pubsub_discovery_removeNode(pubsub_discovery_pt node_discovery, pubsub_endpoint_pt pubEP);
+
+celix_status_t pubsub_discovery_tmPublisherAnnounceAdding(void * handle, service_reference_pt reference, void **service);
+celix_status_t pubsub_discovery_tmPublisherAnnounceAdded(void * handle, service_reference_pt reference, void * service);
+celix_status_t pubsub_discovery_tmPublisherAnnounceModified(void * handle, service_reference_pt reference, void * service);
+celix_status_t pubsub_discovery_tmPublisherAnnounceRemoved(void * handle, service_reference_pt reference, void * service);
+
+celix_status_t pubsub_discovery_announcePublisher(void *handle, pubsub_endpoint_pt pubEP);
+celix_status_t pubsub_discovery_removePublisher(void *handle, pubsub_endpoint_pt pubEP);
+celix_status_t pubsub_discovery_interestedInTopic(void *handle, const char* scope, const char* topic);
+celix_status_t pubsub_discovery_uninterestedInTopic(void *handle, const char* scope, const char* topic);
+
+celix_status_t pubsub_discovery_informPublishersListeners(pubsub_discovery_pt discovery, pubsub_endpoint_pt endpoint, bool endpointAdded);
+
+#endif /* PUBSUB_DISCOVERY_IMPL_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/pubsub_discovery/private/src/etcd_common.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_discovery/private/src/etcd_common.c b/celix-pubsub/pubsub/pubsub_discovery/private/src/etcd_common.c
new file mode 100644
index 0000000..16102b0
--- /dev/null
+++ b/celix-pubsub/pubsub/pubsub_discovery/private/src/etcd_common.c
@@ -0,0 +1,81 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include <stdbool.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+
+#include "celix_log.h"
+#include "constants.h"
+
+#include "etcd.h"
+#include "etcd_watcher.h"
+
+#include "pubsub_discovery.h"
+#include "pubsub_discovery_impl.h"
+
+
+#define MAX_ROOTNODE_LENGTH		128
+#define MAX_LOCALNODE_LENGTH 	4096
+#define MAX_FIELD_LENGTH		128
+
+#define CFG_ETCD_SERVER_IP		"PUBSUB_DISCOVERY_ETCD_SERVER_IP"
+#define DEFAULT_ETCD_SERVER_IP	"127.0.0.1"
+
+#define CFG_ETCD_SERVER_PORT	"PUBSUB_DISCOVERY_ETCD_SERVER_PORT"
+#define DEFAULT_ETCD_SERVER_PORT 2379
+
+// be careful - this should be higher than the curl timeout
+#define CFG_ETCD_TTL   "DISCOVERY_ETCD_TTL"
+#define DEFAULT_ETCD_TTL 30
+
+
+celix_status_t etcdCommon_init(bundle_context_pt context) {
+    celix_status_t status = CELIX_SUCCESS;
+    const char* etcd_server = NULL;
+    const char* etcd_port_string = NULL;
+    int etcd_port = 0;
+
+    if ((bundleContext_getProperty(context, CFG_ETCD_SERVER_IP, &etcd_server) != CELIX_SUCCESS) || !etcd_server) {
+        etcd_server = DEFAULT_ETCD_SERVER_IP;
+    }
+
+    if ((bundleContext_getProperty(context, CFG_ETCD_SERVER_PORT, &etcd_port_string) != CELIX_SUCCESS) || !etcd_port_string) {
+        etcd_port = DEFAULT_ETCD_SERVER_PORT;
+    } else {
+        char* endptr = NULL;
+        errno = 0;
+        etcd_port = strtol(etcd_port_string, &endptr, 10);
+        if (*endptr || errno != 0) {
+            etcd_port = DEFAULT_ETCD_SERVER_PORT;
+        }
+    }
+
+    printf("PSD: Using discovery HOST:PORT: %s:%i\n", etcd_server, etcd_port);
+
+    if (etcd_init(etcd_server, etcd_port) != 0) {
+        status = CELIX_BUNDLE_EXCEPTION;
+    } else {
+        status = CELIX_SUCCESS;
+    }
+
+    return status;
+}
+

http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/pubsub_discovery/private/src/etcd_watcher.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_discovery/private/src/etcd_watcher.c b/celix-pubsub/pubsub/pubsub_discovery/private/src/etcd_watcher.c
new file mode 100644
index 0000000..a394045
--- /dev/null
+++ b/celix-pubsub/pubsub/pubsub_discovery/private/src/etcd_watcher.c
@@ -0,0 +1,292 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include <stdbool.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+
+#include "celix_log.h"
+#include "constants.h"
+
+#include "etcd.h"
+#include "etcd_watcher.h"
+
+#include "pubsub_discovery.h"
+#include "pubsub_discovery_impl.h"
+
+
+
+#define MAX_ROOTNODE_LENGTH             128
+#define MAX_LOCALNODE_LENGTH            4096
+#define MAX_FIELD_LENGTH                128
+
+#define CFG_ETCD_ROOT_PATH              "PUBSUB_DISCOVERY_ETCD_ROOT_PATH"
+#define DEFAULT_ETCD_ROOTPATH           "pubsub/discovery"
+
+#define CFG_ETCD_SERVER_IP              "PUBSUB_DISCOVERY_ETCD_SERVER_IP"
+#define DEFAULT_ETCD_SERVER_IP          "127.0.0.1"
+
+#define CFG_ETCD_SERVER_PORT            "PUBSUB_DISCOVERY_ETCD_SERVER_PORT"
+#define DEFAULT_ETCD_SERVER_PORT        2379
+
+// be careful - this should be higher than the curl timeout
+#define CFG_ETCD_TTL                    "DISCOVERY_ETCD_TTL"
+#define DEFAULT_ETCD_TTL                30
+
+
+struct etcd_watcher {
+	pubsub_discovery_pt pubsub_discovery;
+
+	celix_thread_mutex_t watcherLock;
+	celix_thread_t watcherThread;
+
+    char *scope;
+	char *topic;
+	volatile bool running;
+};
+
+struct etcd_writer {
+    pubsub_discovery_pt pubsub_discovery;
+    celix_thread_mutex_t localPubsLock;
+    array_list_pt localPubs;
+    volatile bool running;
+    celix_thread_t writerThread;
+};
+
+
+// note that the rootNode shouldn't have a leading slash
+static celix_status_t etcdWatcher_getTopicRootPath(bundle_context_pt context, const char *scope, const char *topic, char* rootNode, int rootNodeLen) {
+	celix_status_t status = CELIX_SUCCESS;
+	const char* rootPath = NULL;
+
+	if (((bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &rootPath)) != CELIX_SUCCESS) || (!rootPath)) {
+	    snprintf(rootNode, rootNodeLen, "%s/%s/%s", DEFAULT_ETCD_ROOTPATH, scope, topic);
+	} else {
+        snprintf(rootNode, rootNodeLen, "%s/%s/%s", rootPath, scope, topic);
+	}
+
+	return status;
+}
+
+static celix_status_t etcdWatcher_getRootPath(bundle_context_pt context, char* rootNode) {
+    celix_status_t status = CELIX_SUCCESS;
+    const char* rootPath = NULL;
+
+    if (((bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &rootPath)) != CELIX_SUCCESS) || (!rootPath)) {
+        strncpy(rootNode, DEFAULT_ETCD_ROOTPATH, MAX_ROOTNODE_LENGTH);
+    } else {
+        strncpy(rootNode, rootPath, MAX_ROOTNODE_LENGTH);
+    }
+
+    return status;
+}
+
+
+static void add_node(const char *key, const char *value, void* arg) {
+    pubsub_discovery_pt ps_discovery = (pubsub_discovery_pt) arg;
+    pubsub_endpoint_pt pubEP = NULL;
+    celix_status_t status = etcdWatcher_getPublisherEndpointFromKey(ps_discovery, key, value, &pubEP);
+    if(!status && pubEP) {
+        pubsub_discovery_addNode(ps_discovery, pubEP);
+    }
+}
+
+static celix_status_t etcdWatcher_addAlreadyExistingPublishers(pubsub_discovery_pt ps_discovery, const char *rootPath, long long * highestModified) {
+	celix_status_t status = CELIX_SUCCESS;
+	if(etcd_get_directory(rootPath, add_node, ps_discovery, highestModified)) {
+	    status = CELIX_ILLEGAL_ARGUMENT;
+	}
+	return status;
+}
+
+// gets everything from provided key
+celix_status_t etcdWatcher_getPublisherEndpointFromKey(pubsub_discovery_pt pubsub_discovery, const char* etcdKey, const char* etcdValue, pubsub_endpoint_pt* pubEP) {
+
+	celix_status_t status = CELIX_SUCCESS;
+
+	char rootPath[MAX_ROOTNODE_LENGTH];
+	char *expr = NULL;
+	char scope[MAX_FIELD_LENGTH];
+	char topic[MAX_FIELD_LENGTH];
+	char fwUUID[MAX_FIELD_LENGTH];
+	char serviceId[MAX_FIELD_LENGTH];
+
+	memset(rootPath,0,MAX_ROOTNODE_LENGTH);
+	memset(topic,0,MAX_FIELD_LENGTH);
+	memset(fwUUID,0,MAX_FIELD_LENGTH);
+	memset(serviceId,0,MAX_FIELD_LENGTH);
+
+	etcdWatcher_getRootPath(pubsub_discovery->context, rootPath);
+
+	asprintf(&expr, "/%s/%%[^/]/%%[^/]/%%[^/]/%%[^/].*", rootPath);
+	if(expr) {
+            int foundItems = sscanf(etcdKey, expr, scope, topic, fwUUID, serviceId);
+            free(expr);
+            if (foundItems != 4) { // Could happen when a directory is removed, just don't process this.
+                    status = CELIX_ILLEGAL_STATE;
+            }
+            else{
+                    status = pubsubEndpoint_create(fwUUID,scope,topic,strtol(serviceId,NULL,10),etcdValue,pubEP);
+            }
+	}
+	return status;
+}
+
+/*
+ * performs (blocking) etcd_watch calls to check for
+ * changing discovery endpoint information within etcd.
+ */
+static void* etcdWatcher_run(void* data) {
+    etcd_watcher_pt watcher = (etcd_watcher_pt) data;
+    time_t timeBeforeWatch = time(NULL);
+    char rootPath[MAX_ROOTNODE_LENGTH];
+    long long highestModified = 0;
+
+    pubsub_discovery_pt ps_discovery = watcher->pubsub_discovery;
+    bundle_context_pt context = ps_discovery->context;
+
+    memset(rootPath, 0, MAX_ROOTNODE_LENGTH);
+
+    //TODO: add topic to etcd key
+    etcdWatcher_getTopicRootPath(context, watcher->scope, watcher->topic, rootPath, MAX_ROOTNODE_LENGTH);
+    etcdWatcher_addAlreadyExistingPublishers(ps_discovery, rootPath, &highestModified);
+
+    while ((celixThreadMutex_lock(&watcher->watcherLock) == CELIX_SUCCESS) && watcher->running) {
+
+        char *rkey = NULL;
+        char *value = NULL;
+        char *preValue = NULL;
+        char *action = NULL;
+        long long modIndex;
+
+        celixThreadMutex_unlock(&watcher->watcherLock);
+
+        if (etcd_watch(rootPath, highestModified + 1, &action, &preValue, &value, &rkey, &modIndex) == 0 && action != NULL) {
+            pubsub_endpoint_pt pubEP = NULL;
+            if ((strcmp(action, "set") == 0) || (strcmp(action, "create") == 0)) {
+                if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, value, &pubEP) == CELIX_SUCCESS) {
+                    pubsub_discovery_addNode(ps_discovery, pubEP);
+                }
+            } else if (strcmp(action, "delete") == 0) {
+                if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, preValue, &pubEP) == CELIX_SUCCESS) {
+                    pubsub_discovery_removeNode(ps_discovery, pubEP);
+                }
+            } else if (strcmp(action, "expire") == 0) {
+                if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, preValue, &pubEP) == CELIX_SUCCESS) {
+                    pubsub_discovery_removeNode(ps_discovery, pubEP);
+                }
+            } else if (strcmp(action, "update") == 0) {
+                if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, value, &pubEP) == CELIX_SUCCESS) {
+                    pubsub_discovery_addNode(ps_discovery, pubEP);
+                }
+            } else {
+                fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "Unexpected action: %s", action);
+            }
+            highestModified = modIndex;
+        } else if (time(NULL) - timeBeforeWatch <= (DEFAULT_ETCD_TTL / 4)) {
+            sleep(DEFAULT_ETCD_TTL / 4);
+        }
+
+        FREE_MEM(action);
+        FREE_MEM(value);
+        FREE_MEM(preValue);
+        FREE_MEM(rkey);
+
+        /* prevent busy waiting, in case etcd_watch returns false */
+
+
+        if (time(NULL) - timeBeforeWatch > (DEFAULT_ETCD_TTL / 4)) {
+            timeBeforeWatch = time(NULL);
+        }
+
+    }
+
+    if (watcher->running == false) {
+        celixThreadMutex_unlock(&watcher->watcherLock);
+    }
+
+    return NULL;
+}
+
+celix_status_t etcdWatcher_create(pubsub_discovery_pt pubsub_discovery, bundle_context_pt context, const char *scope, const char *topic, etcd_watcher_pt *watcher) {
+	celix_status_t status = CELIX_SUCCESS;
+
+
+	if (pubsub_discovery == NULL) {
+		return CELIX_BUNDLE_EXCEPTION;
+	}
+
+	(*watcher) = calloc(1, sizeof(struct etcd_watcher));
+
+	if(*watcher == NULL){
+		return CELIX_ENOMEM;
+	}
+
+	(*watcher)->pubsub_discovery = pubsub_discovery;
+	(*watcher)->scope = strdup(scope);
+	(*watcher)->topic = strdup(topic);
+
+
+    celixThreadMutex_create(&(*watcher)->watcherLock, NULL);
+
+    celixThreadMutex_lock(&(*watcher)->watcherLock);
+
+    if ((status = celixThread_create(&(*watcher)->watcherThread, NULL, etcdWatcher_run, *watcher)) != CELIX_SUCCESS) {
+        return status;
+    }
+    (*watcher)->running = true;
+
+    celixThreadMutex_unlock(&(*watcher)->watcherLock);
+
+
+	return status;
+}
+
+celix_status_t etcdWatcher_destroy(etcd_watcher_pt watcher) {
+
+	celix_status_t status = CELIX_SUCCESS;
+
+	char rootPath[MAX_ROOTNODE_LENGTH];
+	etcdWatcher_getTopicRootPath(watcher->pubsub_discovery->context, watcher->scope, watcher->topic, rootPath, MAX_ROOTNODE_LENGTH);
+	celixThreadMutex_destroy(&(watcher->watcherLock));
+
+	free(watcher->scope);
+	free(watcher->topic);
+	free(watcher);
+
+	return status;
+}
+
+celix_status_t etcdWatcher_stop(etcd_watcher_pt watcher){
+	celix_status_t status = CELIX_SUCCESS;
+
+	celixThreadMutex_lock(&(watcher->watcherLock));
+	watcher->running = false;
+	celixThreadMutex_unlock(&(watcher->watcherLock));
+
+	watcher->running = false;
+
+	celixThread_join(watcher->watcherThread, NULL);
+
+	return status;
+
+}
+
+

http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/pubsub_discovery/private/src/etcd_writer.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_discovery/private/src/etcd_writer.c b/celix-pubsub/pubsub/pubsub_discovery/private/src/etcd_writer.c
new file mode 100644
index 0000000..687d802
--- /dev/null
+++ b/celix-pubsub/pubsub/pubsub_discovery/private/src/etcd_writer.c
@@ -0,0 +1,189 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include <stdbool.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+
+#include "celix_log.h"
+#include "constants.h"
+
+#include "etcd.h"
+#include "etcd_writer.h"
+
+#include "pubsub_discovery.h"
+#include "pubsub_discovery_impl.h"
+
+#define MAX_ROOTNODE_LENGTH		128
+
+#define CFG_ETCD_ROOT_PATH		"PUBSUB_DISCOVERY_ETCD_ROOT_PATH"
+#define DEFAULT_ETCD_ROOTPATH	"pubsub/discovery"
+
+#define CFG_ETCD_SERVER_IP		"PUBSUB_DISCOVERY_ETCD_SERVER_IP"
+#define DEFAULT_ETCD_SERVER_IP	"127.0.0.1"
+
+#define CFG_ETCD_SERVER_PORT	"PUBSUB_DISCOVERY_ETCD_SERVER_PORT"
+#define DEFAULT_ETCD_SERVER_PORT 2379
+
+// be careful - this should be higher than the curl timeout
+#define CFG_ETCD_TTL   "DISCOVERY_ETCD_TTL"
+#define DEFAULT_ETCD_TTL 30
+
+struct etcd_writer {
+    pubsub_discovery_pt pubsub_discovery;
+    celix_thread_mutex_t localPubsLock;
+    array_list_pt localPubs;
+    volatile bool running;
+    celix_thread_t writerThread;
+};
+
+
+static const char* etcdWriter_getRootPath(bundle_context_pt context);
+static void* etcdWriter_run(void* data);
+
+
+etcd_writer_pt etcdWriter_create(pubsub_discovery_pt disc) {
+    etcd_writer_pt writer = calloc(1, sizeof(*writer));
+    if(writer) {
+        celixThreadMutex_create(&writer->localPubsLock, NULL);
+        arrayList_create(&writer->localPubs);
+        writer->pubsub_discovery = disc;
+        writer->running = true;
+        celixThread_create(&writer->writerThread, NULL, etcdWriter_run, writer);
+    }
+    return writer;
+}
+
+void etcdWriter_destroy(etcd_writer_pt writer) {
+    char dir[MAX_ROOTNODE_LENGTH];
+    const char *rootPath = etcdWriter_getRootPath(writer->pubsub_discovery->context);
+
+    writer->running = false;
+    celixThread_join(writer->writerThread, NULL);
+
+    celixThreadMutex_lock(&writer->localPubsLock);
+    for(int i = 0; i < arrayList_size(writer->localPubs); i++) {
+        pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(writer->localPubs,i);
+        memset(dir,0,MAX_ROOTNODE_LENGTH);
+        snprintf(dir,MAX_ROOTNODE_LENGTH,"%s/%s/%s/%s",rootPath,pubEP->scope,pubEP->topic,pubEP->frameworkUUID);
+        etcd_del(dir);
+        pubsubEndpoint_destroy(pubEP);
+    }
+    arrayList_destroy(writer->localPubs);
+
+    celixThreadMutex_unlock(&writer->localPubsLock);
+    celixThreadMutex_destroy(&(writer->localPubsLock));
+
+    free(writer);
+}
+
+celix_status_t etcdWriter_addPublisherEndpoint(etcd_writer_pt writer, pubsub_endpoint_pt pubEP, bool storeEP){
+	celix_status_t status = CELIX_BUNDLE_EXCEPTION;
+
+	if(storeEP){
+		const char *fwUUID = NULL;
+		bundleContext_getProperty(writer->pubsub_discovery->context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID);
+		if(fwUUID && strcmp(pubEP->frameworkUUID, fwUUID) == 0) {
+	            celixThreadMutex_lock(&writer->localPubsLock);
+		    pubsub_endpoint_pt p = NULL;
+		    pubsubEndpoint_create(pubEP->frameworkUUID,pubEP->scope,pubEP->topic,pubEP->serviceID,pubEP->endpoint,&p);
+		    arrayList_add(writer->localPubs,p);
+	            celixThreadMutex_unlock(&writer->localPubsLock);
+		}
+	}
+
+	char *key;
+
+	const char* ttlStr = NULL;
+	int ttl = 0;
+
+	// determine ttl
+	if ((bundleContext_getProperty(writer->pubsub_discovery->context, CFG_ETCD_TTL, &ttlStr) != CELIX_SUCCESS) || !ttlStr) {
+		ttl = DEFAULT_ETCD_TTL;
+	} else {
+		char* endptr = NULL;
+		errno = 0;
+		ttl = strtol(ttlStr, &endptr, 10);
+		if (*endptr || errno != 0) {
+			ttl = DEFAULT_ETCD_TTL;
+		}
+	}
+
+	const char *rootPath = etcdWriter_getRootPath(writer->pubsub_discovery->context);
+
+	asprintf(&key,"%s/%s/%s/%s/%ld",rootPath,pubEP->scope,pubEP->topic,pubEP->frameworkUUID,pubEP->serviceID);
+
+	if(!etcd_set(key,pubEP->endpoint,ttl,false)){
+		status = CELIX_ILLEGAL_ARGUMENT;
+	}
+	FREE_MEM(key);
+	return status;
+}
+
+celix_status_t etcdWriter_deletePublisherEndpoint(etcd_writer_pt writer, pubsub_endpoint_pt pubEP) {
+    celix_status_t status = CELIX_SUCCESS;
+    char *key = NULL;
+
+    const char *rootPath = etcdWriter_getRootPath(writer->pubsub_discovery->context);
+
+    asprintf(&key, "%s/%s/%s/%s/%ld", rootPath, pubEP->scope, pubEP->topic, pubEP->frameworkUUID, pubEP->serviceID);
+
+    celixThreadMutex_lock(&writer->localPubsLock);
+    for (unsigned int i = 0; i < arrayList_size(writer->localPubs); i++) {
+        pubsub_endpoint_pt ep = arrayList_get(writer->localPubs, i);
+        if (pubsubEndpoint_equals(ep, pubEP)) {
+            arrayList_remove(writer->localPubs, i);
+            pubsubEndpoint_destroy(ep);
+            break;
+        }
+    }
+    celixThreadMutex_unlock(&writer->localPubsLock);
+
+    if (etcd_del(key)) {
+        printf("Failed to remove key %s from ETCD\n",key);
+        status = CELIX_ILLEGAL_ARGUMENT;
+    }
+    FREE_MEM(key);
+    return status;
+}
+
+static void* etcdWriter_run(void* data) {
+    etcd_writer_pt writer = (etcd_writer_pt)data;
+    while(writer->running) {
+          celixThreadMutex_lock(&writer->localPubsLock);
+          for(int i=0; i < arrayList_size(writer->localPubs); i++) {
+              etcdWriter_addPublisherEndpoint(writer,(pubsub_endpoint_pt)arrayList_get(writer->localPubs,i),false);
+          }
+          celixThreadMutex_unlock(&writer->localPubsLock);
+          sleep(DEFAULT_ETCD_TTL / 2);
+    }
+
+    return NULL;
+}
+
+static const char* etcdWriter_getRootPath(bundle_context_pt context) {
+    const char* rootPath = NULL;
+    bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &rootPath);
+    if(rootPath == NULL) {
+        rootPath = DEFAULT_ETCD_ROOTPATH;
+    }
+    return rootPath;
+}
+

http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/pubsub_discovery/private/src/psd_activator.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_discovery/private/src/psd_activator.c b/celix-pubsub/pubsub/pubsub_discovery/private/src/psd_activator.c
new file mode 100644
index 0000000..afbe282
--- /dev/null
+++ b/celix-pubsub/pubsub/pubsub_discovery/private/src/psd_activator.c
@@ -0,0 +1,171 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "bundle_activator.h"
+#include "service_tracker.h"
+#include "service_registration.h"
+#include "constants.h"
+#include "celix_log.h"
+
+#include "pubsub_common.h"
+#include "publisher_endpoint_announce.h"
+#include "pubsub_discovery.h"
+#include "pubsub_discovery_impl.h"
+
+struct activator {
+	bundle_context_pt context;
+	pubsub_discovery_pt pubsub_discovery;
+
+	service_tracker_pt pstmPublishersTracker;
+
+	publisher_endpoint_announce_pt publisherEPAnnounce;
+	service_registration_pt publisherEPAnnounceService;
+};
+
+static celix_status_t createTMPublisherAnnounceTracker(struct activator *activator, service_tracker_pt *tracker) {
+	celix_status_t status = CELIX_SUCCESS;
+
+	service_tracker_customizer_pt customizer = NULL;
+
+	status = serviceTrackerCustomizer_create(activator->pubsub_discovery,
+			pubsub_discovery_tmPublisherAnnounceAdding,
+			pubsub_discovery_tmPublisherAnnounceAdded,
+			pubsub_discovery_tmPublisherAnnounceModified,
+			pubsub_discovery_tmPublisherAnnounceRemoved,
+			&customizer);
+
+	if (status == CELIX_SUCCESS) {
+		status = serviceTracker_create(activator->context, (char *) PUBSUB_TM_ANNOUNCE_PUBLISHER_SERVICE, customizer, tracker);
+	}
+
+	return status;
+}
+
+celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) {
+	celix_status_t status = CELIX_SUCCESS;
+
+	struct activator* activator = calloc(1, sizeof(*activator));
+
+	if (activator) {
+		activator->context = context;
+		activator->pstmPublishersTracker = NULL;
+		activator->publisherEPAnnounce = NULL;
+		activator->publisherEPAnnounceService = NULL;
+
+		status = pubsub_discovery_create(context, &activator->pubsub_discovery);
+
+		if (status == CELIX_SUCCESS) {
+			status = createTMPublisherAnnounceTracker(activator, &(activator->pstmPublishersTracker));
+		}
+
+		if (status == CELIX_SUCCESS) {
+			*userData = activator;
+		} else {
+			free(activator);
+		}
+	} else {
+		status = CELIX_ENOMEM;
+	}
+
+	return status;
+
+}
+
+celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) {
+	celix_status_t status = CELIX_SUCCESS;
+
+	struct activator *activator = userData;
+
+	publisher_endpoint_announce_pt pubEPAnnouncer = calloc(1, sizeof(*pubEPAnnouncer));
+
+	if (pubEPAnnouncer) {
+
+		pubEPAnnouncer->handle = activator->pubsub_discovery;
+		pubEPAnnouncer->announcePublisher = pubsub_discovery_announcePublisher;
+		pubEPAnnouncer->removePublisher = pubsub_discovery_removePublisher;
+		pubEPAnnouncer->interestedInTopic = pubsub_discovery_interestedInTopic;
+		pubEPAnnouncer->uninterestedInTopic = pubsub_discovery_uninterestedInTopic;
+		activator->publisherEPAnnounce = pubEPAnnouncer;
+
+		properties_pt props = properties_create();
+		properties_set(props, "PUBSUB_DISCOVERY", "true");
+
+		// pubsub_discovery_start needs to be first to initalize the propert etcd_watcher values
+		status = pubsub_discovery_start(activator->pubsub_discovery);
+
+		if (status == CELIX_SUCCESS) {
+			status = serviceTracker_open(activator->pstmPublishersTracker);
+		}
+
+		if (status == CELIX_SUCCESS) {
+			status = bundleContext_registerService(context, (char *) PUBSUB_DISCOVERY_SERVICE, pubEPAnnouncer, props, &activator->publisherEPAnnounceService);
+		}
+
+
+	}
+	else{
+		status = CELIX_ENOMEM;
+	}
+
+	if(status!=CELIX_SUCCESS && pubEPAnnouncer!=NULL){
+		free(pubEPAnnouncer);
+	}
+
+
+	return status;
+}
+
+celix_status_t bundleActivator_stop(void * userData, bundle_context_pt context) {
+	celix_status_t status = CELIX_SUCCESS;
+	struct activator *activator = userData;
+
+	status += pubsub_discovery_stop(activator->pubsub_discovery);
+
+	status += serviceTracker_close(activator->pstmPublishersTracker);
+
+	status += serviceRegistration_unregister(activator->publisherEPAnnounceService);
+
+	if (status == CELIX_SUCCESS) {
+		free(activator->publisherEPAnnounce);
+	}
+
+	return status;
+}
+
+celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt context) {
+	celix_status_t status = CELIX_SUCCESS;
+	struct activator *activator = userData;
+
+	status += serviceTracker_destroy(activator->pstmPublishersTracker);
+	status += pubsub_discovery_destroy(activator->pubsub_discovery);
+
+	activator->publisherEPAnnounce = NULL;
+	activator->publisherEPAnnounceService = NULL;
+	activator->pstmPublishersTracker = NULL;
+	activator->pubsub_discovery = NULL;
+	activator->context = NULL;
+
+	free(activator);
+
+	return status;
+}


Mime
View raw message