celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pnol...@apache.org
Subject [1/6] celix git commit: CELIX-389: Adds Celix Publish Subscribe donation.
Date Mon, 06 Feb 2017 14:23:00 GMT
Repository: celix
Updated Branches:
  refs/heads/develop 3e678523e -> f9a5fb11e


http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c b/celix-pubsub/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c
new file mode 100644
index 0000000..1b6aca9
--- /dev/null
+++ b/celix-pubsub/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c
@@ -0,0 +1,468 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#include <stdio.h>
+#include <string.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <stdbool.h>
+#include <netdb.h>
+#include <netinet/in.h>
+
+#include "constants.h"
+#include "celix_threads.h"
+#include "bundle_context.h"
+#include "array_list.h"
+#include "utils.h"
+#include "celix_errno.h"
+#include "filter.h"
+#include "service_reference.h"
+#include "service_registration.h"
+
+#include "publisher_endpoint_announce.h"
+#include "etcd_common.h"
+#include "etcd_watcher.h"
+#include "etcd_writer.h"
+#include "pubsub_endpoint.h"
+#include "pubsub_discovery_impl.h"
+
+/* Discovery activator functions */
+celix_status_t pubsub_discovery_create(bundle_context_pt context, pubsub_discovery_pt *ps_discovery) {
+	celix_status_t status = CELIX_SUCCESS;
+
+	*ps_discovery = calloc(1, sizeof(**ps_discovery));
+
+	if (*ps_discovery == NULL) {
+		status = CELIX_ENOMEM;
+	}
+	else{
+		(*ps_discovery)->context = context;
+		(*ps_discovery)->discoveredPubs = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+		(*ps_discovery)->listenerReferences = hashMap_create(serviceReference_hashCode, NULL, serviceReference_equals2, NULL);
+		(*ps_discovery)->watchers = hashMap_create(utils_stringHash,NULL,utils_stringEquals, NULL);
+		celixThreadMutex_create(&(*ps_discovery)->listenerReferencesMutex, NULL);
+		celixThreadMutex_create(&(*ps_discovery)->discoveredPubsMutex, NULL);
+		celixThreadMutex_create(&(*ps_discovery)->watchersMutex, NULL);
+	}
+
+	return status;
+}
+
+celix_status_t pubsub_discovery_destroy(pubsub_discovery_pt ps_discovery) {
+	celix_status_t status = CELIX_SUCCESS;
+
+	celixThreadMutex_lock(&ps_discovery->discoveredPubsMutex);
+
+	hash_map_iterator_pt iter = hashMapIterator_create(ps_discovery->discoveredPubs);
+
+	while (hashMapIterator_hasNext(iter)) {
+		array_list_pt pubEP_list = (array_list_pt) hashMapIterator_nextValue(iter);
+
+		for(int i=0; i < arrayList_size(pubEP_list); i++) {
+			pubsubEndpoint_destroy(((pubsub_endpoint_pt)arrayList_get(pubEP_list,i)));
+		}
+		arrayList_destroy(pubEP_list);
+	}
+
+	hashMapIterator_destroy(iter);
+
+	hashMap_destroy(ps_discovery->discoveredPubs, true, false);
+	ps_discovery->discoveredPubs = NULL;
+
+	celixThreadMutex_unlock(&ps_discovery->discoveredPubsMutex);
+
+	celixThreadMutex_destroy(&ps_discovery->discoveredPubsMutex);
+
+
+	celixThreadMutex_lock(&ps_discovery->listenerReferencesMutex);
+
+	hashMap_destroy(ps_discovery->listenerReferences, false, false);
+	ps_discovery->listenerReferences = NULL;
+
+	celixThreadMutex_unlock(&ps_discovery->listenerReferencesMutex);
+
+	celixThreadMutex_destroy(&ps_discovery->listenerReferencesMutex);
+
+	free(ps_discovery);
+
+	return status;
+}
+
+celix_status_t pubsub_discovery_start(pubsub_discovery_pt ps_discovery) {
+    celix_status_t status = CELIX_SUCCESS;
+    status = etcdCommon_init(ps_discovery->context);
+    ps_discovery->writer = etcdWriter_create(ps_discovery);
+
+    return status;
+}
+
+celix_status_t pubsub_discovery_stop(pubsub_discovery_pt ps_discovery) {
+    celix_status_t status = CELIX_SUCCESS;
+
+    const char* fwUUID = NULL;
+
+    bundleContext_getProperty(ps_discovery->context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID);
+    if (fwUUID == NULL) {
+        printf("PSD: Cannot retrieve fwUUID.\n");
+        return CELIX_INVALID_BUNDLE_CONTEXT;
+    }
+
+    celixThreadMutex_lock(&ps_discovery->watchersMutex);
+
+    hash_map_iterator_pt iter = hashMapIterator_create(ps_discovery->watchers);
+    while (hashMapIterator_hasNext(iter)) {
+        struct watcher_info * wi = hashMapIterator_nextValue(iter);
+        etcdWatcher_stop(wi->watcher);
+    }
+    hashMapIterator_destroy(iter);
+    celixThreadMutex_unlock(&ps_discovery->watchersMutex);
+
+    celixThreadMutex_lock(&ps_discovery->discoveredPubsMutex);
+
+    /* Unexport all publishers for the local framework, and also delete from ETCD publisher belonging to the local framework */
+
+    iter = hashMapIterator_create(ps_discovery->discoveredPubs);
+    while (hashMapIterator_hasNext(iter)) {
+        array_list_pt pubEP_list = (array_list_pt) hashMapIterator_nextValue(iter);
+
+        int i;
+        for (i = 0; i < arrayList_size(pubEP_list); i++) {
+            pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt) arrayList_get(pubEP_list, i);
+            if (strcmp(pubEP->frameworkUUID, fwUUID) == 0) {
+                etcdWriter_deletePublisherEndpoint(ps_discovery->writer, pubEP);
+            } else {
+                pubsub_discovery_informPublishersListeners(ps_discovery, pubEP, false);
+                arrayList_remove(pubEP_list, i);
+                pubsubEndpoint_destroy(pubEP);
+                i--;
+            }
+        }
+    }
+
+    hashMapIterator_destroy(iter);
+
+    celixThreadMutex_unlock(&ps_discovery->discoveredPubsMutex);
+    etcdWriter_destroy(ps_discovery->writer);
+
+    iter = hashMapIterator_create(ps_discovery->watchers);
+    while (hashMapIterator_hasNext(iter)) {
+        struct watcher_info * wi = hashMapIterator_nextValue(iter);
+        etcdWatcher_destroy(wi->watcher);
+    }
+    hashMapIterator_destroy(iter);
+    hashMap_destroy(ps_discovery->watchers, true, true);
+    celixThreadMutex_unlock(&ps_discovery->watchersMutex);
+    return status;
+}
+
+/* Functions called by the etcd_watcher */
+
+celix_status_t pubsub_discovery_addNode(pubsub_discovery_pt pubsub_discovery, pubsub_endpoint_pt pubEP) {
+	celix_status_t status = CELIX_SUCCESS;
+	bool inform=false;
+	celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex);
+
+	char *pubs_key = createScopeTopicKey(pubEP->scope, pubEP->topic);
+	array_list_pt pubEP_list = (array_list_pt)hashMap_get(pubsub_discovery->discoveredPubs,pubs_key);
+	if(pubEP_list==NULL){
+		arrayList_create(&pubEP_list);
+		arrayList_add(pubEP_list,pubEP);
+		hashMap_put(pubsub_discovery->discoveredPubs,strdup(pubs_key),pubEP_list);
+		inform=true;
+	}
+	else{
+		int i;
+		bool found = false;
+		for(i=0;i<arrayList_size(pubEP_list) && !found;i++){
+			found = pubsubEndpoint_equals(pubEP,(pubsub_endpoint_pt)arrayList_get(pubEP_list,i));
+		}
+		if(found){
+			pubsubEndpoint_destroy(pubEP);
+		}
+		else{
+			arrayList_add(pubEP_list,pubEP);
+			inform=true;
+    	}
+	}
+	free(pubs_key);
+
+	celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex);
+
+	if(inform){
+	    status = pubsub_discovery_informPublishersListeners(pubsub_discovery,pubEP,true);
+	}
+
+	return status;
+}
+
+celix_status_t pubsub_discovery_removeNode(pubsub_discovery_pt pubsub_discovery, pubsub_endpoint_pt pubEP) {
+    celix_status_t status = CELIX_SUCCESS;
+    pubsub_endpoint_pt p = NULL;
+    bool found = false;
+
+    celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex);
+    char *pubs_key = createScopeTopicKey(pubEP->scope, pubEP->topic);
+    array_list_pt pubEP_list = (array_list_pt) hashMap_get(pubsub_discovery->discoveredPubs, pubs_key);
+    free(pubs_key);
+    if (pubEP_list == NULL) {
+        printf("PSD: Cannot find any registered publisher for topic %s. Something is not consistent.\n", pubEP->topic);
+        status = CELIX_ILLEGAL_STATE;
+    } else {
+        int i;
+
+        for (i = 0; !found && i < arrayList_size(pubEP_list); i++) {
+            p = arrayList_get(pubEP_list, i);
+            found = pubsubEndpoint_equals(pubEP, p);
+            if (found) {
+                arrayList_remove(pubEP_list, i);
+                pubsubEndpoint_destroy(p);
+            }
+        }
+    }
+
+    celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex);
+    if (found) {
+        status = pubsub_discovery_informPublishersListeners(pubsub_discovery, pubEP, false);
+    }
+    pubsubEndpoint_destroy(pubEP);
+
+    return status;
+}
+
+/* Callback to the pubsub_topology_manager */
+celix_status_t pubsub_discovery_informPublishersListeners(pubsub_discovery_pt pubsub_discovery, pubsub_endpoint_pt pubEP, bool epAdded) {
+	celix_status_t status = CELIX_SUCCESS;
+
+	// Inform listeners of new publisher endpoint
+	celixThreadMutex_lock(&pubsub_discovery->listenerReferencesMutex);
+
+	if (pubsub_discovery->listenerReferences != NULL) {
+		hash_map_iterator_pt iter = hashMapIterator_create(pubsub_discovery->listenerReferences);
+		while (hashMapIterator_hasNext(iter)) {
+			service_reference_pt reference = hashMapIterator_nextKey(iter);
+
+			publisher_endpoint_announce_pt listener = NULL;
+
+			bundleContext_getService(pubsub_discovery->context, reference, (void**) &listener);
+            if (epAdded) {
+                listener->announcePublisher(listener->handle, pubEP);
+            } else {
+                listener->removePublisher(listener->handle, pubEP);
+            }
+            bundleContext_ungetService(pubsub_discovery->context, reference, NULL);
+		}
+		hashMapIterator_destroy(iter);
+	}
+
+	celixThreadMutex_unlock(&pubsub_discovery->listenerReferencesMutex);
+
+	return status;
+}
+
+
+/* Service's functions implementation */
+celix_status_t pubsub_discovery_announcePublisher(void *handle, pubsub_endpoint_pt pubEP) {
+	celix_status_t status = CELIX_SUCCESS;
+	printf("pubsub_discovery_announcePublisher : %s / %s\n", pubEP->topic, pubEP->endpoint);
+	pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) handle;
+
+	celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex);
+
+	char *pub_key = createScopeTopicKey(pubEP->scope,pubEP->topic);
+	array_list_pt pubEP_list = (array_list_pt)hashMap_get(pubsub_discovery->discoveredPubs,pub_key);
+
+	if(pubEP_list==NULL){
+		arrayList_create(&pubEP_list);
+		hashMap_put(pubsub_discovery->discoveredPubs,strdup(pub_key),pubEP_list);
+	}
+	free(pub_key);
+	pubsub_endpoint_pt p = NULL;
+	pubsubEndpoint_create(pubEP->frameworkUUID,pubEP->scope,pubEP->topic,pubEP->serviceID,pubEP->endpoint,&p);
+
+	arrayList_add(pubEP_list,p);
+
+	status = etcdWriter_addPublisherEndpoint(pubsub_discovery->writer,p,true);
+
+	celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex);
+
+	return status;
+}
+
+celix_status_t pubsub_discovery_removePublisher(void *handle, pubsub_endpoint_pt pubEP) {
+	celix_status_t status = CELIX_SUCCESS;
+
+	pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) handle;
+
+	celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex);
+
+	char *pub_key = createScopeTopicKey(pubEP->scope,pubEP->topic);
+	array_list_pt pubEP_list = (array_list_pt)hashMap_get(pubsub_discovery->discoveredPubs,pub_key);
+	free(pub_key);
+	if(pubEP_list==NULL){
+		printf("PSD: Cannot find any registered publisher for topic %s. Something is not consistent.\n",pubEP->topic);
+		return CELIX_ILLEGAL_STATE;
+	}
+	else{
+
+		int i;
+		bool found = false;
+		pubsub_endpoint_pt p = NULL;
+
+		for(i=0;!found && i<arrayList_size(pubEP_list);i++){
+			p = (pubsub_endpoint_pt)arrayList_get(pubEP_list,i);
+			found = pubsubEndpoint_equals(pubEP,p);
+		}
+
+		if(!found){
+			printf("PSD: Trying to remove a not existing endpoint. Something is not consistent.\n");
+			status = CELIX_ILLEGAL_STATE;
+		}
+		else{
+
+			arrayList_removeElement(pubEP_list,p);
+
+			status = etcdWriter_deletePublisherEndpoint(pubsub_discovery->writer,p);
+
+			pubsubEndpoint_destroy(p);
+		}
+	}
+
+	celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex);
+
+	return status;
+}
+
+celix_status_t pubsub_discovery_interestedInTopic(void *handle, const char* scope, const char* topic) {
+    pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) handle;
+
+    char *scope_topic_key = createScopeTopicKey(scope, topic);
+    celixThreadMutex_lock(&pubsub_discovery->watchersMutex);
+    struct watcher_info * wi = hashMap_get(pubsub_discovery->watchers, scope_topic_key);
+    if(wi) {
+        wi->nr_references++;
+        free(scope_topic_key);
+    } else {
+        wi = calloc(1, sizeof(*wi));
+        etcdWatcher_create(pubsub_discovery, pubsub_discovery->context, scope, topic, &wi->watcher);
+        wi->nr_references = 1;
+        hashMap_put(pubsub_discovery->watchers, scope_topic_key, wi);
+    }
+
+    celixThreadMutex_unlock(&pubsub_discovery->watchersMutex);
+
+    return CELIX_SUCCESS;
+}
+
+celix_status_t pubsub_discovery_uninterestedInTopic(void *handle, const char* scope, const char* topic) {
+    pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) handle;
+
+    char *scope_topic_key = createScopeTopicKey(scope, topic);
+    celixThreadMutex_lock(&pubsub_discovery->watchersMutex);
+
+    hash_map_entry_pt entry =  hashMap_getEntry(pubsub_discovery->watchers, scope_topic_key);
+    if(entry) {
+        struct watcher_info * wi = hashMapEntry_getValue(entry);
+        wi->nr_references--;
+        if(wi->nr_references == 0) {
+            char *key = hashMapEntry_getKey(entry);
+            hashMap_remove(pubsub_discovery->watchers, scope_topic_key);
+            free(key);
+            free(scope_topic_key);
+            etcdWatcher_stop(wi->watcher);
+            etcdWatcher_destroy(wi->watcher);
+            free(wi);
+        }
+    } else {
+        fprintf(stderr, "[DISC] Inconsistency error: Removing unknown topic %s\n", topic);
+    }
+    celixThreadMutex_unlock(&pubsub_discovery->watchersMutex);
+    return CELIX_SUCCESS;
+}
+
+/* pubsub_topology_manager tracker callbacks */
+
+celix_status_t pubsub_discovery_tmPublisherAnnounceAdding(void * handle, service_reference_pt reference, void **service) {
+	celix_status_t status = CELIX_SUCCESS;
+
+	pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt)handle;
+
+	status = bundleContext_getService(pubsub_discovery->context, reference, service);
+
+	return status;
+}
+
+celix_status_t pubsub_discovery_tmPublisherAnnounceAdded(void * handle, service_reference_pt reference, void * service) {
+	celix_status_t status = CELIX_SUCCESS;
+
+	pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt)handle;
+	publisher_endpoint_announce_pt listener = (publisher_endpoint_announce_pt)service;
+
+	celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex);
+	celixThreadMutex_lock(&pubsub_discovery->listenerReferencesMutex);
+
+	/* Notify the PSTM about discovered publisher endpoints */
+	hash_map_iterator_pt iter = hashMapIterator_create(pubsub_discovery->discoveredPubs);
+	while(hashMapIterator_hasNext(iter)){
+		array_list_pt pubEP_list = (array_list_pt)hashMapIterator_nextValue(iter);
+		int i;
+		for(i=0;i<arrayList_size(pubEP_list);i++){
+			pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubEP_list,i);
+			status += listener->announcePublisher(listener->handle, pubEP);
+		}
+	}
+
+	hashMapIterator_destroy(iter);
+
+	hashMap_put(pubsub_discovery->listenerReferences, reference, NULL);
+
+	celixThreadMutex_unlock(&pubsub_discovery->listenerReferencesMutex);
+	celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex);
+
+	printf("PSD: pubsub_tm_announce_publisher added.\n");
+
+	return status;
+}
+
+celix_status_t pubsub_discovery_tmPublisherAnnounceModified(void * handle, service_reference_pt reference, void * service) {
+	celix_status_t status = CELIX_SUCCESS;
+
+	status = pubsub_discovery_tmPublisherAnnounceRemoved(handle, reference, service);
+	if (status == CELIX_SUCCESS) {
+		status = pubsub_discovery_tmPublisherAnnounceAdded(handle, reference, service);
+	}
+
+	return status;
+}
+
+celix_status_t pubsub_discovery_tmPublisherAnnounceRemoved(void * handle, service_reference_pt reference, void * service) {
+	celix_status_t status = CELIX_SUCCESS;
+	pubsub_discovery_pt pubsub_discovery = handle;
+
+	celixThreadMutex_lock(&pubsub_discovery->listenerReferencesMutex);
+
+	if (pubsub_discovery->listenerReferences != NULL) {
+		if (hashMap_remove(pubsub_discovery->listenerReferences, reference)) {
+			printf("PSD: pubsub_tm_announce_publisher removed.\n");
+		}
+	}
+	celixThreadMutex_unlock(&pubsub_discovery->listenerReferencesMutex);
+
+	return status;
+}
+

http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/pubsub_discovery/public/include/pubsub_discovery.h
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_discovery/public/include/pubsub_discovery.h b/celix-pubsub/pubsub/pubsub_discovery/public/include/pubsub_discovery.h
new file mode 100644
index 0000000..f77905a
--- /dev/null
+++ b/celix-pubsub/pubsub/pubsub_discovery/public/include/pubsub_discovery.h
@@ -0,0 +1,26 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#ifndef PUBSUB_DISCOVERY_H_
+#define PUBSUB_DISCOVERY_H_
+
+typedef struct pubsub_discovery *pubsub_discovery_pt;
+
+
+#endif /* PUBSUB_DISCOVERY_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/pubsub_topology_manager/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_topology_manager/CMakeLists.txt b/celix-pubsub/pubsub/pubsub_topology_manager/CMakeLists.txt
new file mode 100644
index 0000000..02540f3
--- /dev/null
+++ b/celix-pubsub/pubsub/pubsub_topology_manager/CMakeLists.txt
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+include_directories("${PROJECT_SOURCE_DIR}/log_service/public/include")
+include_directories("${PROJECT_SOURCE_DIR}/remote_services/remote_service_admin/public/include")
+include_directories("${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/include")
+include_directories("${PROJECT_SOURCE_DIR}/pubsub/pubsub_admin/public/include")
+include_directories("${PROJECT_SOURCE_DIR}/pubsub/api/pubsub")
+include_directories("private/include")
+include_directories("public/include")
+
+add_bundle(org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
+    BUNDLE_SYMBOLICNAME "apache_celix_pubsub_topology_manager"
+    VERSION "1.0.0"
+    SOURCES
+    	private/src/pstm_activator.c
+    	private/src/pubsub_topology_manager.c
+    	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_endpoint.c
+    	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/log_helper.c
+    	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_utils.c	
+)
+
+bundle_files(org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
+   ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/include/pubsub_topic_info.descriptor
+    DESTINATION "META-INF/"
+)
+
+install_bundle(org.apache.celix.pubsub_topology_manager.PubSubTopologyManager)
+

http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/pubsub_topology_manager/private/include/pubsub_topology_manager.h
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_topology_manager/private/include/pubsub_topology_manager.h b/celix-pubsub/pubsub/pubsub_topology_manager/private/include/pubsub_topology_manager.h
new file mode 100644
index 0000000..2e940aa
--- /dev/null
+++ b/celix-pubsub/pubsub/pubsub_topology_manager/private/include/pubsub_topology_manager.h
@@ -0,0 +1,86 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_topology_manager.h
+ *
+ *  \date       Sep 29, 2011
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#ifndef PUBSUB_TOPOLOGY_MANAGER_H_
+#define PUBSUB_TOPOLOGY_MANAGER_H_
+
+#include "endpoint_listener.h"
+#include "service_reference.h"
+#include "bundle_context.h"
+#include "log_helper.h"
+
+#include "pubsub_common.h"
+#include "pubsub_endpoint.h"
+#include "publisher.h"
+#include "subscriber.h"
+
+
+struct pubsub_topology_manager {
+	bundle_context_pt context;
+
+	celix_thread_mutex_t psaListLock;
+	array_list_pt psaList;
+
+	celix_thread_mutex_t discoveryListLock;
+	hash_map_pt discoveryList; //<serviceReference,NULL>
+
+	celix_thread_mutex_t publicationsLock;
+	hash_map_pt publications; //<topic(string),list<pubsub_ep>>
+
+	celix_thread_mutex_t subscriptionsLock;
+	hash_map_pt subscriptions; //<topic(string),list<pubsub_ep>>
+
+	log_helper_pt loghelper;
+};
+
+typedef struct pubsub_topology_manager *pubsub_topology_manager_pt;
+
+celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_helper_pt logHelper, pubsub_topology_manager_pt *manager);
+celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_pt manager);
+celix_status_t pubsub_topologyManager_closeImports(pubsub_topology_manager_pt manager);
+
+celix_status_t pubsub_topologyManager_psaAdding(void *handle, service_reference_pt reference, void **service);
+celix_status_t pubsub_topologyManager_psaAdded(void *handle, service_reference_pt reference, void *service);
+celix_status_t pubsub_topologyManager_psaModified(void *handle, service_reference_pt reference, void *service);
+celix_status_t pubsub_topologyManager_psaRemoved(void *handle, service_reference_pt reference, void *service);
+
+celix_status_t pubsub_topologyManager_pubsubDiscoveryAdding(void* handle, service_reference_pt reference, void** service);
+celix_status_t pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, service_reference_pt reference, void* service);
+celix_status_t pubsub_topologyManager_pubsubDiscoveryModified(void * handle, service_reference_pt reference, void* service);
+celix_status_t pubsub_topologyManager_pubsubDiscoveryRemoved(void * handle, service_reference_pt reference, void* service);
+
+celix_status_t pubsub_topologyManager_subscriberAdding(void * handle, service_reference_pt reference, void **service);
+celix_status_t pubsub_topologyManager_subscriberAdded(void * handle, service_reference_pt reference, void * service);
+celix_status_t pubsub_topologyManager_subscriberModified(void * handle, service_reference_pt reference, void * service);
+celix_status_t pubsub_topologyManager_subscriberRemoved(void * handle, service_reference_pt reference, void * service);
+
+celix_status_t pubsub_topologyManager_publisherTrackerAdded(void *handle, array_list_pt listeners);
+celix_status_t pubsub_topologyManager_publisherTrackerRemoved(void *handle, array_list_pt listeners);
+
+celix_status_t pubsub_topologyManager_announcePublisher(void *handle, pubsub_endpoint_pt pubEP);
+celix_status_t pubsub_topologyManager_removePublisher(void *handle, pubsub_endpoint_pt pubEP);
+
+#endif /* PUBSUB_TOPOLOGY_MANAGER_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/pubsub_topology_manager/private/src/pstm_activator.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_topology_manager/private/src/pstm_activator.c b/celix-pubsub/pubsub/pubsub_topology_manager/private/src/pstm_activator.c
new file mode 100644
index 0000000..ae7b4a9
--- /dev/null
+++ b/celix-pubsub/pubsub/pubsub_topology_manager/private/src/pstm_activator.c
@@ -0,0 +1,233 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pstm_activator.c
+ *
+ *  \date       Sep 29, 2011
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "constants.h"
+#include "bundle_activator.h"
+#include "service_tracker.h"
+#include "service_registration.h"
+
+#include "endpoint_listener.h"
+#include "remote_constants.h"
+#include "listener_hook_service.h"
+#include "log_service.h"
+#include "log_helper.h"
+
+
+#include "pubsub_topology_manager.h"
+#include "publisher_endpoint_announce.h"
+
+struct activator {
+	bundle_context_pt context;
+
+	pubsub_topology_manager_pt manager;
+
+	service_tracker_pt pubsubDiscoveryTracker;
+	service_tracker_pt pubsubAdminTracker;
+	service_tracker_pt pubsubSubscribersTracker;
+
+	listener_hook_service_pt hookService;
+	service_registration_pt hook;
+
+	publisher_endpoint_announce_pt publisherEPDiscover;
+	service_registration_pt publisherEPDiscoverService;
+
+	log_helper_pt loghelper;
+};
+
+
+static celix_status_t bundleActivator_createPSDTracker(struct activator *activator, service_tracker_pt *tracker);
+static celix_status_t bundleActivator_createPSATracker(struct activator *activator, service_tracker_pt *tracker);
+static celix_status_t bundleActivator_createPSSubTracker(struct activator *activator, service_tracker_pt *tracker);
+
+
+static celix_status_t bundleActivator_createPSDTracker(struct activator *activator, service_tracker_pt *tracker) {
+	celix_status_t status;
+
+	service_tracker_customizer_pt customizer = NULL;
+
+	status = serviceTrackerCustomizer_create(activator->manager,
+			pubsub_topologyManager_pubsubDiscoveryAdding,
+			pubsub_topologyManager_pubsubDiscoveryAdded,
+			pubsub_topologyManager_pubsubDiscoveryModified,
+			pubsub_topologyManager_pubsubDiscoveryRemoved,
+			&customizer);
+
+	if (status == CELIX_SUCCESS) {
+		status = serviceTracker_create(activator->context, (char *) PUBSUB_DISCOVERY_SERVICE, customizer, tracker);
+	}
+
+	return status;
+}
+
+static celix_status_t bundleActivator_createPSATracker(struct activator *activator, service_tracker_pt *tracker) {
+	celix_status_t status = CELIX_SUCCESS;
+
+	service_tracker_customizer_pt customizer = NULL;
+
+	status = serviceTrackerCustomizer_create(activator->manager,
+			pubsub_topologyManager_psaAdding,
+			pubsub_topologyManager_psaAdded,
+			pubsub_topologyManager_psaModified,
+			pubsub_topologyManager_psaRemoved,
+			&customizer);
+
+	if (status == CELIX_SUCCESS) {
+		status = serviceTracker_create(activator->context, PUBSUB_ADMIN_SERVICE, customizer, tracker);
+	}
+
+	return status;
+}
+
+static celix_status_t bundleActivator_createPSSubTracker(struct activator *activator, service_tracker_pt *tracker) {
+	celix_status_t status = CELIX_SUCCESS;
+
+	service_tracker_customizer_pt customizer = NULL;
+
+	status = serviceTrackerCustomizer_create(activator->manager,
+			pubsub_topologyManager_subscriberAdding,
+			pubsub_topologyManager_subscriberAdded,
+			pubsub_topologyManager_subscriberModified,
+			pubsub_topologyManager_subscriberRemoved,
+			&customizer);
+
+	if (status == CELIX_SUCCESS) {
+		status = serviceTracker_create(activator->context, PUBSUB_SUBSCRIBER_SERVICE_NAME, customizer, tracker);
+	}
+
+	return status;
+}
+
+celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) {
+	celix_status_t status = CELIX_SUCCESS;
+	struct activator *activator = NULL;
+
+	activator = calloc(1,sizeof(struct activator));
+
+	if (!activator) {
+		return CELIX_ENOMEM;
+	}
+
+	activator->context = context;
+
+	logHelper_create(context, &activator->loghelper);
+	logHelper_start(activator->loghelper);
+
+	status = pubsub_topologyManager_create(context, activator->loghelper, &activator->manager);
+	if (status == CELIX_SUCCESS) {
+		status = bundleActivator_createPSDTracker(activator, &activator->pubsubDiscoveryTracker);
+		if (status == CELIX_SUCCESS) {
+			status = bundleActivator_createPSATracker(activator, &activator->pubsubAdminTracker);
+			if (status == CELIX_SUCCESS) {
+				status = bundleActivator_createPSSubTracker(activator, &activator->pubsubSubscribersTracker);
+				if (status == CELIX_SUCCESS) {
+					*userData = activator;
+				}
+			}
+		}
+	}
+
+	return status;
+}
+
+
+celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) {
+	celix_status_t status = CELIX_SUCCESS;
+	struct activator *activator = userData;
+
+	publisher_endpoint_announce_pt pubEPDiscover = calloc(1, sizeof(*pubEPDiscover));
+	pubEPDiscover->handle = activator->manager;
+	pubEPDiscover->announcePublisher = pubsub_topologyManager_announcePublisher;
+	pubEPDiscover->removePublisher = pubsub_topologyManager_removePublisher;
+	activator->publisherEPDiscover = pubEPDiscover;
+
+	status += bundleContext_registerService(context, (char *) PUBSUB_TM_ANNOUNCE_PUBLISHER_SERVICE, pubEPDiscover, NULL, &activator->publisherEPDiscoverService);
+
+
+	listener_hook_service_pt hookService = calloc(1,sizeof(*hookService));
+	hookService->handle = activator->manager;
+	hookService->added = pubsub_topologyManager_publisherTrackerAdded;
+	hookService->removed = pubsub_topologyManager_publisherTrackerRemoved;
+	activator->hookService = hookService;
+
+	status += bundleContext_registerService(context, (char *) OSGI_FRAMEWORK_LISTENER_HOOK_SERVICE_NAME, hookService, NULL, &activator->hook);
+
+	/* NOTE: Enable those line in order to remotely expose the topic_info service
+	properties_pt props = properties_create();
+	properties_set(props, (char *) OSGI_RSA_SERVICE_EXPORTED_INTERFACES, (char *) PUBSUB_TOPIC_INFO_SERVICE);
+	status += bundleContext_registerService(context, (char *) PUBSUB_TOPIC_INFO_SERVICE, activator->topicInfo, props, &activator->topicInfoService);
+	*/
+	status += serviceTracker_open(activator->pubsubAdminTracker);
+
+	status += serviceTracker_open(activator->pubsubDiscoveryTracker);
+
+	status += serviceTracker_open(activator->pubsubSubscribersTracker);
+
+
+	return status;
+}
+
+celix_status_t bundleActivator_stop(void * userData, bundle_context_pt context) {
+	celix_status_t status = CELIX_SUCCESS;
+	struct activator *activator = userData;
+
+	serviceTracker_close(activator->pubsubSubscribersTracker);
+	serviceTracker_close(activator->pubsubDiscoveryTracker);
+	serviceTracker_close(activator->pubsubAdminTracker);
+
+	serviceRegistration_unregister(activator->publisherEPDiscoverService);
+	free(activator->publisherEPDiscover);
+
+	serviceRegistration_unregister(activator->hook);
+	free(activator->hookService);
+
+	return status;
+}
+
+celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt context) {
+	celix_status_t status = CELIX_SUCCESS;
+
+	struct activator *activator = userData;
+	if (!activator || !activator->manager) {
+		status = CELIX_BUNDLE_EXCEPTION;
+	} else {
+
+		serviceTracker_destroy(activator->pubsubSubscribersTracker);
+		serviceTracker_destroy(activator->pubsubDiscoveryTracker);
+		serviceTracker_destroy(activator->pubsubAdminTracker);
+
+		logHelper_stop(activator->loghelper);
+		logHelper_destroy(&activator->loghelper);
+
+		status = pubsub_topologyManager_destroy(activator->manager);
+		free(activator);
+	}
+
+	return status;
+}

http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c b/celix-pubsub/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
new file mode 100644
index 0000000..a485f37
--- /dev/null
+++ b/celix-pubsub/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
@@ -0,0 +1,758 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_topology_manager.c
+ *
+ *  \date       Sep 29, 2011
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdbool.h>
+
+#include "hash_map.h"
+#include "array_list.h"
+#include "bundle_context.h"
+#include "constants.h"
+#include "module.h"
+#include "bundle.h"
+#include "remote_service_admin.h"
+#include "remote_constants.h"
+#include "filter.h"
+#include "listener_hook_service.h"
+#include "utils.h"
+#include "service_reference.h"
+#include "service_registration.h"
+#include "log_service.h"
+#include "log_helper.h"
+
+#include "publisher_endpoint_announce.h"
+#include "pubsub_topology_manager.h"
+#include "pubsub_endpoint.h"
+#include "pubsub_admin.h"
+#include "pubsub_utils.h"
+
+
+celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_helper_pt logHelper, pubsub_topology_manager_pt *manager) {
+	celix_status_t status = CELIX_SUCCESS;
+
+	*manager = calloc(1, sizeof(**manager));
+	if (!*manager) {
+		return CELIX_ENOMEM;
+	}
+
+	(*manager)->context = context;
+
+	celix_thread_mutexattr_t psaAttr;
+	celixThreadMutexAttr_create(&psaAttr);
+	celixThreadMutexAttr_settype(&psaAttr, CELIX_THREAD_MUTEX_RECURSIVE);
+	status = celixThreadMutex_create(&(*manager)->psaListLock, &psaAttr);
+    celixThreadMutexAttr_destroy(&psaAttr);
+
+    status = celixThreadMutex_create(&(*manager)->publicationsLock, NULL);
+	status = celixThreadMutex_create(&(*manager)->subscriptionsLock, NULL);
+	status = celixThreadMutex_create(&(*manager)->discoveryListLock, NULL);
+
+	arrayList_create(&(*manager)->psaList);
+
+	(*manager)->discoveryList = hashMap_create(serviceReference_hashCode, NULL, serviceReference_equals2, NULL);
+	(*manager)->publications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+	(*manager)->subscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+
+	(*manager)->loghelper = logHelper;
+
+	return status;
+}
+
+celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_pt manager) {
+	celix_status_t status = CELIX_SUCCESS;
+
+	celixThreadMutex_lock(&manager->discoveryListLock);
+	hashMap_destroy(manager->discoveryList, false, false);
+	celixThreadMutex_unlock(&manager->discoveryListLock);
+	celixThreadMutex_destroy(&manager->discoveryListLock);
+
+	celixThreadMutex_lock(&manager->psaListLock);
+	arrayList_destroy(manager->psaList);
+	celixThreadMutex_unlock(&manager->psaListLock);
+	celixThreadMutex_destroy(&manager->psaListLock);
+
+	celixThreadMutex_lock(&manager->publicationsLock);
+	hash_map_iterator_pt pubit = hashMapIterator_create(manager->publications);
+	while(hashMapIterator_hasNext(pubit)){
+		array_list_pt l = (array_list_pt)hashMapIterator_nextValue(pubit);
+		int i;
+		for(i=0;i<arrayList_size(l);i++){
+			pubsubEndpoint_destroy((pubsub_endpoint_pt)arrayList_get(l,i));
+		}
+		arrayList_destroy(l);
+	}
+	hashMapIterator_destroy(pubit);
+	hashMap_destroy(manager->publications, true, false);
+	celixThreadMutex_unlock(&manager->publicationsLock);
+	celixThreadMutex_destroy(&manager->publicationsLock);
+
+	celixThreadMutex_lock(&manager->subscriptionsLock);
+	hash_map_iterator_pt subit = hashMapIterator_create(manager->subscriptions);
+	while(hashMapIterator_hasNext(subit)){
+		array_list_pt l = (array_list_pt)hashMapIterator_nextValue(subit);
+		int i;
+		for(i=0;i<arrayList_size(l);i++){
+			pubsubEndpoint_destroy((pubsub_endpoint_pt)arrayList_get(l,i));
+		}
+		arrayList_destroy(l);
+	}
+	hashMapIterator_destroy(subit);
+	hashMap_destroy(manager->subscriptions, true, false);
+	celixThreadMutex_unlock(&manager->subscriptionsLock);
+	celixThreadMutex_destroy(&manager->subscriptionsLock);
+
+	free(manager);
+
+	return status;
+}
+
+
+celix_status_t pubsub_topologyManager_psaAdding(void * handle, service_reference_pt reference, void **service) {
+	celix_status_t status = CELIX_SUCCESS;
+	pubsub_topology_manager_pt manager = handle;
+
+	status = bundleContext_getService(manager->context, reference, service);
+
+	return status;
+}
+
+celix_status_t pubsub_topologyManager_psaAdded(void * handle, service_reference_pt reference, void * service) {
+	celix_status_t status = CELIX_SUCCESS;
+	pubsub_topology_manager_pt manager = handle;
+	int i;
+
+	pubsub_admin_service_pt psa = (pubsub_admin_service_pt) service;
+	logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "PSTM: Added PSA");
+
+	celixThreadMutex_lock(&manager->psaListLock);
+	arrayList_add(manager->psaList, psa);
+	celixThreadMutex_unlock(&manager->psaListLock);
+
+	// Add already detected subscriptions to new PSA
+	celixThreadMutex_lock(&manager->subscriptionsLock);
+	hash_map_iterator_pt subscriptionsIterator = hashMapIterator_create(manager->subscriptions);
+
+	while (hashMapIterator_hasNext(subscriptionsIterator)) {
+		array_list_pt sub_ep_list = hashMapIterator_nextValue(subscriptionsIterator);
+		for(i=0;i<arrayList_size(sub_ep_list);i++){
+			status += psa->addSubscription(psa->admin, (pubsub_endpoint_pt)arrayList_get(sub_ep_list,i));
+		}
+	}
+
+	hashMapIterator_destroy(subscriptionsIterator);
+
+	celixThreadMutex_unlock(&manager->subscriptionsLock);
+
+	// Add already detected publications to new PSA
+	status = celixThreadMutex_lock(&manager->publicationsLock);
+	hash_map_iterator_pt publicationsIterator = hashMapIterator_create(manager->publications);
+
+	while (hashMapIterator_hasNext(publicationsIterator)) {
+		array_list_pt pub_ep_list = hashMapIterator_nextValue(publicationsIterator);
+		for(i=0;i<arrayList_size(pub_ep_list);i++){
+			status += psa->addPublication(psa->admin, (pubsub_endpoint_pt)arrayList_get(pub_ep_list,i));
+		}
+	}
+
+	hashMapIterator_destroy(publicationsIterator);
+
+	celixThreadMutex_unlock(&manager->publicationsLock);
+
+	return status;
+}
+
+celix_status_t pubsub_topologyManager_psaModified(void * handle, service_reference_pt reference, void * service) {
+	celix_status_t status = CELIX_SUCCESS;
+
+	// Nop...
+
+	return status;
+}
+
+celix_status_t pubsub_topologyManager_psaRemoved(void * handle, service_reference_pt reference, void * service) {
+	celix_status_t status = CELIX_SUCCESS;
+	pubsub_topology_manager_pt manager = handle;
+
+	pubsub_admin_service_pt psa = (pubsub_admin_service_pt) service;
+
+	celixThreadMutex_lock(&manager->psaListLock);
+
+	/* Deactivate all publications */
+	celixThreadMutex_lock(&manager->publicationsLock);
+
+	hash_map_iterator_pt pubit = hashMapIterator_create(manager->publications);
+	while(hashMapIterator_hasNext(pubit)){
+		hash_map_entry_pt pub_entry = hashMapIterator_nextEntry(pubit);
+		char* scope_topic_key = (char*)hashMapEntry_getKey(pub_entry);
+		// Extract scope/topic name from key
+		char scope[MAX_SCOPE_LEN];
+		char topic[MAX_TOPIC_LEN];
+		sscanf(scope_topic_key, "%[^:]:%s", scope, topic );
+		array_list_pt pubEP_list = (array_list_pt)hashMapEntry_getValue(pub_entry);
+
+		status = psa->closeAllPublications(psa->admin,scope,topic);
+
+		if(status==CELIX_SUCCESS){
+			celixThreadMutex_lock(&manager->discoveryListLock);
+			hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
+			while(hashMapIterator_hasNext(iter)){
+				service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter);
+				publisher_endpoint_announce_pt disc = NULL;
+				bundleContext_getService(manager->context, disc_sr, (void**) &disc);
+				const char* fwUUID = NULL;
+				bundleContext_getProperty(manager->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
+				int i;
+				for(i=0;i<arrayList_size(pubEP_list);i++){
+					pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubEP_list,i);
+					if(strcmp(pubEP->frameworkUUID,fwUUID)==0){
+						disc->removePublisher(disc->handle,pubEP);
+					}
+				}
+				bundleContext_ungetService(manager->context, disc_sr, NULL);
+			}
+			hashMapIterator_destroy(iter);
+			celixThreadMutex_unlock(&manager->discoveryListLock);
+		}
+	}
+	hashMapIterator_destroy(pubit);
+
+	celixThreadMutex_unlock(&manager->publicationsLock);
+
+	/* Deactivate all subscriptions */
+	celixThreadMutex_lock(&manager->subscriptionsLock);
+	hash_map_iterator_pt subit = hashMapIterator_create(manager->subscriptions);
+	while(hashMapIterator_hasNext(subit)){
+		// TODO do some error checking
+		char* scope_topic = (char*)hashMapIterator_nextKey(subit);
+		char scope[MAX_TOPIC_LEN];
+		char topic[MAX_TOPIC_LEN];
+		memset(scope, 0 , MAX_TOPIC_LEN*sizeof(char));
+		memset(topic, 0 , MAX_TOPIC_LEN*sizeof(char));
+		sscanf(scope_topic, "%[^:]:%s", scope, topic );
+		status += psa->closeAllSubscriptions(psa->admin,scope, topic);
+	}
+	hashMapIterator_destroy(subit);
+	celixThreadMutex_unlock(&manager->subscriptionsLock);
+
+	arrayList_removeElement(manager->psaList, psa);
+
+	celixThreadMutex_unlock(&manager->psaListLock);
+
+	logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "PSTM: Removed PSA");
+
+	return status;
+}
+
+
+celix_status_t pubsub_topologyManager_subscriberAdding(void * handle, service_reference_pt reference, void **service) {
+	celix_status_t status = CELIX_SUCCESS;
+	pubsub_topology_manager_pt manager = handle;
+
+	status = bundleContext_getService(manager->context, reference, service);
+
+	return status;
+}
+
+celix_status_t pubsub_topologyManager_subscriberAdded(void * handle, service_reference_pt reference, void * service) {
+	celix_status_t status = CELIX_SUCCESS;
+	pubsub_topology_manager_pt manager = handle;
+	//subscriber_service_pt subscriber = (subscriber_service_pt)service;
+
+	pubsub_endpoint_pt sub = NULL;
+	if(pubsubEndpoint_createFromServiceReference(reference,&sub) == CELIX_SUCCESS){
+		celixThreadMutex_lock(&manager->subscriptionsLock);
+		char *sub_key = createScopeTopicKey(sub->scope, sub->topic);
+
+		array_list_pt sub_list_by_topic = hashMap_get(manager->subscriptions,sub_key);
+		if(sub_list_by_topic==NULL){
+			arrayList_create(&sub_list_by_topic);
+			hashMap_put(manager->subscriptions,strdup(sub_key),sub_list_by_topic);
+		}
+		free(sub_key);
+		arrayList_add(sub_list_by_topic,sub);
+
+		celixThreadMutex_unlock(&manager->subscriptionsLock);
+
+		int j;
+		celixThreadMutex_lock(&manager->psaListLock);
+		for(j=0;j<arrayList_size(manager->psaList);j++){
+
+			pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j);
+			psa->addSubscription(psa->admin,sub);
+		}
+
+		// Inform discoveries for interest in the topic
+        celixThreadMutex_lock(&manager->discoveryListLock);
+		hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
+        while(hashMapIterator_hasNext(iter)){
+            service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter);
+            publisher_endpoint_announce_pt disc = NULL;
+            bundleContext_getService(manager->context, disc_sr, (void**) &disc);
+            disc->interestedInTopic(disc->handle, sub->scope, sub->topic);
+            bundleContext_ungetService(manager->context, disc_sr, NULL);
+        }
+        hashMapIterator_destroy(iter);
+        celixThreadMutex_unlock(&manager->discoveryListLock);
+
+		celixThreadMutex_unlock(&manager->psaListLock);
+	}
+	else{
+		status=CELIX_INVALID_BUNDLE_CONTEXT;
+	}
+
+	return status;
+}
+
+celix_status_t pubsub_topologyManager_subscriberModified(void * handle, service_reference_pt reference, void * service) {
+	celix_status_t status = CELIX_SUCCESS;
+
+	// Nop...
+
+	return status;
+}
+
+celix_status_t pubsub_topologyManager_subscriberRemoved(void * handle, service_reference_pt reference, void * service) {
+	celix_status_t status = CELIX_SUCCESS;
+	pubsub_topology_manager_pt manager = handle;
+
+	pubsub_endpoint_pt subcmp = NULL;
+	if(pubsubEndpoint_createFromServiceReference(reference,&subcmp) == CELIX_SUCCESS){
+
+		int j,k;
+		celixThreadMutex_lock(&manager->subscriptionsLock);
+
+		// Inform discoveries that we not interested in the topic any more
+        celixThreadMutex_lock(&manager->discoveryListLock);
+        hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
+        while(hashMapIterator_hasNext(iter)){
+            service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter);
+            publisher_endpoint_announce_pt disc = NULL;
+            bundleContext_getService(manager->context, disc_sr, (void**) &disc);
+            disc->uninterestedInTopic(disc->handle, subcmp->scope, subcmp->topic);
+            bundleContext_ungetService(manager->context, disc_sr, NULL);
+        }
+        hashMapIterator_destroy(iter);
+        celixThreadMutex_unlock(&manager->discoveryListLock);
+
+		char *sub_key = createScopeTopicKey(subcmp->scope,subcmp->topic);
+		array_list_pt sub_list_by_topic = hashMap_get(manager->subscriptions,sub_key);
+		free(sub_key);
+		if(sub_list_by_topic!=NULL){
+			for(j=0;j<arrayList_size(sub_list_by_topic);j++){
+				pubsub_endpoint_pt sub = arrayList_get(sub_list_by_topic,j);
+				if(pubsubEndpoint_equals(sub,subcmp)){
+					celixThreadMutex_lock(&manager->psaListLock);
+					for(k=0;k<arrayList_size(manager->psaList);k++){
+
+						pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
+						psa->removeSubscription(psa->admin,sub);
+					}
+					celixThreadMutex_unlock(&manager->psaListLock);
+
+				}
+				arrayList_remove(sub_list_by_topic,j);
+
+				/* If it was the last subscriber for this topic, tell PSA to close the ZMQ socket */
+				if(arrayList_size(sub_list_by_topic)==0){
+					celixThreadMutex_lock(&manager->psaListLock);
+					for(k=0;k<arrayList_size(manager->psaList);k++){
+						pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
+						psa->closeAllSubscriptions(psa->admin,sub->scope, sub->topic);
+					}
+					celixThreadMutex_unlock(&manager->psaListLock);
+				}
+
+				pubsubEndpoint_destroy(sub);
+
+			}
+		}
+
+		celixThreadMutex_unlock(&manager->subscriptionsLock);
+
+		pubsubEndpoint_destroy(subcmp);
+
+	}
+	else{
+		status=CELIX_INVALID_BUNDLE_CONTEXT;
+	}
+
+	return status;
+
+}
+
+celix_status_t pubsub_topologyManager_pubsubDiscoveryAdding(void* handle, service_reference_pt reference, void** service) {
+	celix_status_t status = CELIX_SUCCESS;
+	pubsub_topology_manager_pt manager = handle;
+
+	bundleContext_getService(manager->context, reference, service);
+
+	return status;
+}
+
+celix_status_t pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, service_reference_pt reference, void* service) {
+	celix_status_t status = CELIX_SUCCESS;
+	pubsub_topology_manager_pt manager = (pubsub_topology_manager_pt)handle;
+	publisher_endpoint_announce_pt disc = (publisher_endpoint_announce_pt)service;
+
+	const char* fwUUID = NULL;
+
+	bundleContext_getProperty(manager->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
+	if(fwUUID==NULL){
+		printf("PSD: ERRROR: Cannot retrieve fwUUID.\n");
+		return CELIX_INVALID_BUNDLE_CONTEXT;
+	}
+
+	celixThreadMutex_lock(&manager->publicationsLock);
+
+	celixThreadMutex_lock(&manager->discoveryListLock);
+	hashMap_put(manager->discoveryList, reference, NULL);
+	celixThreadMutex_unlock(&manager->discoveryListLock);
+
+	hash_map_iterator_pt iter = hashMapIterator_create(manager->publications);
+	while(hashMapIterator_hasNext(iter)){
+		array_list_pt pubEP_list = (array_list_pt)hashMapIterator_nextValue(iter);
+		for(int i = 0; i < arrayList_size(pubEP_list); i++) {
+			pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubEP_list,i);
+			if( (strcmp(pubEP->frameworkUUID,fwUUID)==0) && (pubEP->endpoint!=NULL)){
+				status += disc->announcePublisher(disc->handle,pubEP);
+			}
+		}
+	}
+	hashMapIterator_destroy(iter);
+
+	celixThreadMutex_unlock(&manager->publicationsLock);
+
+	celixThreadMutex_lock(&manager->subscriptionsLock);
+	iter = hashMapIterator_create(manager->subscriptions);
+
+	while(hashMapIterator_hasNext(iter)) {
+	    array_list_pt l = (array_list_pt)hashMapIterator_nextValue(iter);
+	    int i;
+	    for(i=0;i<arrayList_size(l);i++){
+	        pubsub_endpoint_pt subEp = (pubsub_endpoint_pt)arrayList_get(l,i);
+
+	        disc->interestedInTopic(disc->handle, subEp->scope, subEp->topic);
+	    }
+	}
+	hashMapIterator_destroy(iter);
+    celixThreadMutex_unlock(&manager->subscriptionsLock);
+
+	return status;
+}
+
+celix_status_t pubsub_topologyManager_pubsubDiscoveryModified(void * handle, service_reference_pt reference, void * service) {
+	celix_status_t status = CELIX_SUCCESS;
+
+	status = pubsub_topologyManager_pubsubDiscoveryRemoved(handle, reference, service);
+	if (status == CELIX_SUCCESS) {
+		status = pubsub_topologyManager_pubsubDiscoveryAdded(handle, reference, service);
+	}
+
+	return status;
+}
+
+celix_status_t pubsub_topologyManager_pubsubDiscoveryRemoved(void * handle, service_reference_pt reference, void * service) {
+	celix_status_t status = CELIX_SUCCESS;
+
+	pubsub_topology_manager_pt manager = handle;
+
+	celixThreadMutex_lock(&manager->discoveryListLock);
+
+
+	if (hashMap_remove(manager->discoveryList, reference)) {
+		logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "EndpointListener Removed");
+	}
+
+	celixThreadMutex_unlock(&manager->discoveryListLock);
+
+	return status;
+}
+
+
+celix_status_t pubsub_topologyManager_publisherTrackerAdded(void *handle, array_list_pt listeners) {
+
+	celix_status_t status = CELIX_SUCCESS;
+	pubsub_topology_manager_pt manager = handle;
+
+	int l_index;
+
+	for (l_index = 0; l_index < arrayList_size(listeners); l_index++) {
+
+		listener_hook_info_pt info = arrayList_get(listeners, l_index);
+
+		const char* fwUUID=NULL;
+		bundleContext_getProperty(info->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
+
+		char* scope = pubsub_getScopeFromFilter(info->filter);
+		char* topic = pubsub_getTopicFromFilter(info->filter);
+		if(scope == NULL) {
+			scope = strdup(PUBSUB_PUBLISHER_SCOPE_DEFAULT);
+		}
+
+		//TODO: Can we use a better serviceID??
+		bundle_pt bundle = NULL;
+		long bundleId = -1;
+		bundleContext_getBundle(info->context,&bundle);
+		bundle_getBundleId(bundle,&bundleId);
+
+		if(fwUUID !=NULL && topic !=NULL){
+
+			pubsub_endpoint_pt pub = NULL;
+			if(pubsubEndpoint_create(fwUUID, scope, topic,bundleId,NULL,&pub) == CELIX_SUCCESS){
+
+				celixThreadMutex_lock(&manager->publicationsLock);
+				char *pub_key = createScopeTopicKey(scope, topic);
+				array_list_pt pub_list_by_topic = hashMap_get(manager->publications, pub_key);
+				if(pub_list_by_topic==NULL){
+					arrayList_create(&pub_list_by_topic);
+					hashMap_put(manager->publications,strdup(pub_key),pub_list_by_topic);
+				}
+				free(pub_key);
+				arrayList_add(pub_list_by_topic,pub);
+
+				celixThreadMutex_unlock(&manager->publicationsLock);
+
+				int j;
+				celixThreadMutex_lock(&manager->psaListLock);
+
+				for(j=0;j<arrayList_size(manager->psaList);j++){
+
+					pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j);
+					status = psa->addPublication(psa->admin,pub);
+					if(status==CELIX_SUCCESS){
+						celixThreadMutex_lock(&manager->discoveryListLock);
+						hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
+						while(hashMapIterator_hasNext(iter)){
+							service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter);
+							publisher_endpoint_announce_pt disc = NULL;
+							bundleContext_getService(manager->context, disc_sr, (void**) &disc);
+							disc->announcePublisher(disc->handle,pub);
+							bundleContext_ungetService(manager->context, disc_sr, NULL);
+						}
+						hashMapIterator_destroy(iter);
+						celixThreadMutex_unlock(&manager->discoveryListLock);
+					}
+				}
+
+				celixThreadMutex_unlock(&manager->psaListLock);
+
+			}
+			free(topic);
+
+		}
+		else{
+			status=CELIX_INVALID_BUNDLE_CONTEXT;
+		}
+        free(scope);
+
+	}
+
+	return status;
+
+}
+
+
+celix_status_t pubsub_topologyManager_publisherTrackerRemoved(void *handle, array_list_pt listeners) {
+	celix_status_t status = CELIX_SUCCESS;
+	pubsub_topology_manager_pt manager = handle;
+
+	int l_index;
+
+	for (l_index = 0; l_index < arrayList_size(listeners); l_index++) {
+
+		listener_hook_info_pt info = arrayList_get(listeners, l_index);
+
+		char* pub_scope = pubsub_getScopeFromFilter(info->filter);
+		char* pub_topic = pubsub_getTopicFromFilter(info->filter);
+
+		const char* fwUUID=NULL;
+		bundleContext_getProperty(info->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
+
+		//TODO: Can we use a better serviceID??
+		bundle_pt bundle = NULL;
+		long bundleId = -1;
+		bundleContext_getBundle(info->context,&bundle);
+		bundle_getBundleId(bundle,&bundleId);
+
+		if(bundle !=NULL && pub_topic !=NULL && bundleId>0){
+
+			pubsub_endpoint_pt pubcmp = NULL;
+			if(pubsubEndpoint_create(fwUUID, pub_scope, pub_topic,bundleId,NULL,&pubcmp) == CELIX_SUCCESS){
+
+				int j,k;
+                celixThreadMutex_lock(&manager->psaListLock);
+                celixThreadMutex_lock(&manager->publicationsLock);
+
+                char *pub_key = createScopeTopicKey(pub_scope, pub_topic);
+				array_list_pt pub_list_by_topic = hashMap_get(manager->publications,pub_key);
+				if(pub_list_by_topic!=NULL){
+					for(j=0;j<arrayList_size(pub_list_by_topic);j++){
+						pubsub_endpoint_pt pub = arrayList_get(pub_list_by_topic,j);
+						if(pubsubEndpoint_equals(pub,pubcmp)){
+							for(k=0;k<arrayList_size(manager->psaList);k++){
+								pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
+								status = psa->removePublication(psa->admin,pub);
+								if(status==CELIX_SUCCESS){
+									celixThreadMutex_lock(&manager->discoveryListLock);
+									hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
+									while(hashMapIterator_hasNext(iter)){
+										service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter);
+										publisher_endpoint_announce_pt disc = NULL;
+										bundleContext_getService(manager->context, disc_sr, (void**) &disc);
+										disc->removePublisher(disc->handle,pub);
+										bundleContext_ungetService(manager->context, disc_sr, NULL);
+									}
+									hashMapIterator_destroy(iter);
+									celixThreadMutex_unlock(&manager->discoveryListLock);
+								}
+							}
+						}
+						arrayList_remove(pub_list_by_topic,j);
+
+						/* If it was the last publisher for this topic, tell PSA to close the ZMQ socket and then inform the discovery */
+						if(arrayList_size(pub_list_by_topic)==0){
+							for(k=0;k<arrayList_size(manager->psaList);k++){
+								pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
+								psa->closeAllPublications(psa->admin,pub->scope, pub->topic);
+							}
+						}
+
+						pubsubEndpoint_destroy(pub);
+
+					}
+				}
+
+				celixThreadMutex_unlock(&manager->publicationsLock);
+				celixThreadMutex_unlock(&manager->psaListLock);
+
+				pubsubEndpoint_destroy(pubcmp);
+				free(pub_scope);
+				free(pub_topic);
+				free(pub_key);
+
+			}
+
+		}
+		else{
+			status=CELIX_INVALID_BUNDLE_CONTEXT;
+		}
+	}
+
+	return status;
+}
+
+celix_status_t pubsub_topologyManager_announcePublisher(void *handle, pubsub_endpoint_pt pubEP){
+	celix_status_t status = CELIX_SUCCESS;
+	printf("PSTM: New publisher discovered for topic %s [fwUUID=%s, ep=%s]\n",pubEP->topic,pubEP->frameworkUUID,pubEP->endpoint);
+
+	pubsub_topology_manager_pt manager = handle;
+	celixThreadMutex_lock(&manager->publicationsLock);
+	celixThreadMutex_lock(&manager->psaListLock);
+	int i;
+
+	char *pub_key = createScopeTopicKey(pubEP->scope, pubEP->topic);
+
+	array_list_pt pub_list_by_topic = hashMap_get(manager->publications,pub_key);
+	if(pub_list_by_topic==NULL){
+		arrayList_create(&pub_list_by_topic);
+		hashMap_put(manager->publications,strdup(pub_key),pub_list_by_topic);
+	}
+	free(pub_key);
+
+	/* Shouldn't be any other duplicate, since it's filtered out by the discovery */
+	pubsub_endpoint_pt p = NULL;
+	pubsubEndpoint_create(pubEP->frameworkUUID,pubEP->scope,pubEP->topic,pubEP->serviceID,pubEP->endpoint,&p);
+	arrayList_add(pub_list_by_topic,p);
+
+	for(i=0;i<arrayList_size(manager->psaList);i++){
+		pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,i);
+		status += psa->addPublication(psa->admin,p);
+	}
+
+	celixThreadMutex_unlock(&manager->psaListLock);
+	celixThreadMutex_unlock(&manager->publicationsLock);
+
+	return status;
+}
+
+celix_status_t pubsub_topologyManager_removePublisher(void *handle, pubsub_endpoint_pt pubEP){
+	celix_status_t status = CELIX_SUCCESS;
+	printf("PSTM: Publisher removed for topic %s [fwUUID=%s, ep=%s]\n",pubEP->topic,pubEP->frameworkUUID,pubEP->endpoint);
+
+	pubsub_topology_manager_pt manager = handle;
+	celixThreadMutex_lock(&manager->psaListLock);
+	celixThreadMutex_lock(&manager->publicationsLock);
+	int i;
+
+	char *pub_key = createScopeTopicKey(pubEP->scope, pubEP->topic);
+	array_list_pt pub_list_by_topic = hashMap_get(manager->publications,pub_key);
+	if(pub_list_by_topic==NULL){
+		printf("PSTM: ERROR: Cannot find topic for known endpoint [%s,%s,%s]. Something is inconsistent.\n",pub_key,pubEP->frameworkUUID,pubEP->endpoint);
+		status = CELIX_ILLEGAL_STATE;
+	}
+	else{
+
+		pubsub_endpoint_pt p = NULL;
+		bool found = false;
+
+		for(i=0;!found && i<arrayList_size(pub_list_by_topic);i++){
+			p = (pubsub_endpoint_pt)arrayList_get(pub_list_by_topic,i);
+			found = pubsubEndpoint_equals(p,pubEP);
+		}
+
+		if(found && p !=NULL){
+
+			for(i=0;i<arrayList_size(manager->psaList);i++){
+				pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,i);
+				status += psa->removePublication(psa->admin,p);
+			}
+
+			arrayList_removeElement(pub_list_by_topic,p);
+
+			/* If it was the last publisher for this topic, tell PSA to close the ZMQ socket */
+			if(arrayList_size(pub_list_by_topic)==0){
+
+				for(i=0;i<arrayList_size(manager->psaList);i++){
+					pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,i);
+					status += psa->closeAllPublications(psa->admin,p->scope, p->topic);
+				}
+			}
+
+			pubsubEndpoint_destroy(p);
+		}
+
+
+	}
+	free(pub_key);
+	celixThreadMutex_unlock(&manager->publicationsLock);
+	celixThreadMutex_unlock(&manager->psaListLock);
+
+
+	return status;
+}
+


Mime
View raw message