celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From griccia...@apache.org
Subject [1/6] celix git commit: Refactored serializers management
Date Wed, 20 Sep 2017 15:09:28 GMT
Repository: celix
Updated Branches:
  refs/heads/pubsub_serializer_refactoring [created] fc720cf2a


http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_serializer_json/private/src/ps_activator.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_serializer_json/private/src/ps_activator.c b/pubsub/pubsub_serializer_json/private/src/ps_activator.c
index e0e23d4..fec5892 100644
--- a/pubsub/pubsub_serializer_json/private/src/ps_activator.c
+++ b/pubsub/pubsub_serializer_json/private/src/ps_activator.c
@@ -68,7 +68,11 @@ celix_status_t bundleActivator_start(void * userData, bundle_context_pt context)
 		pubsubSerializerSvc->destroySerializerMap = (void*)pubsubSerializer_destroySerializerMap;
 		activator->serializerService = pubsubSerializerSvc;
 
-		status = bundleContext_registerService(context, PUBSUB_SERIALIZER_SERVICE, pubsubSerializerSvc, NULL, &activator->registration);
+		/* Set serializer type */
+		properties_pt props = properties_create();
+		properties_set(props,PUBSUB_SERIALIZER_TYPE_KEY,PUBSUB_SERIALIZER_TYPE);
+
+		status = bundleContext_registerService(context, PUBSUB_SERIALIZER_SERVICE, pubsubSerializerSvc, props, &activator->registration);
 
 	}
 

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c b/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c
index fb46f10..cffc816 100644
--- a/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c
+++ b/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c
@@ -79,206 +79,217 @@ celix_status_t pubsubSerializer_destroy(pubsub_serializer_t* serializer) {
 	return status;
 }
 
-celix_status_t pubsubSerializer_createSerializerMap(pubsub_serializer_t* serializer, bundle_pt bundle, pubsub_msg_serializer_map_t** out) {
-    celix_status_t status = CELIX_SUCCESS;
-    pubsub_msg_serializer_map_t* map = calloc(1, sizeof(*map));
-    if (map != NULL) {
-        map->bundle = bundle;
-        map->serializers = hashMap_create(NULL, NULL, NULL, NULL);
-        pubsubSerializer_fillMsgSerializerMap(map->serializers, bundle);
-    } else {
-        logHelper_log(serializer->loghelper, OSGI_LOGSERVICE_ERROR, "Cannot allocate memory for msg map");
-        status = CELIX_ENOMEM;
-    }
-
-    if (status == CELIX_SUCCESS) {
-        *out = map;
-    }
-    return status;
+celix_status_t pubsubSerializer_createSerializerMap(pubsub_serializer_t* serializer, bundle_pt bundle, hash_map_pt* serializerMap) {
+	celix_status_t status = CELIX_SUCCESS;
+
+	hash_map_pt map = hashMap_create(NULL, NULL, NULL, NULL);
+
+	if (map != NULL) {
+		pubsubSerializer_fillMsgSerializerMap(map, bundle);
+	} else {
+		logHelper_log(serializer->loghelper, OSGI_LOGSERVICE_ERROR, "Cannot allocate memory for msg map");
+		status = CELIX_ENOMEM;
+	}
+
+	if (status == CELIX_SUCCESS) {
+		*serializerMap = map;
+	}
+	return status;
 }
 
-celix_status_t pubsubSerializer_destroySerializerMap(pubsub_serializer_t* serializer, pubsub_msg_serializer_map_t* map) {
-    celix_status_t status = CELIX_SUCCESS;
-    if (map == NULL) {
-        return status;
-    }
-
-    hash_map_iterator_t iter = hashMapIterator_construct(map->serializers);
-    while (hashMapIterator_hasNext(&iter)) {
-        pubsub_msg_serializer_t* msgSer = hashMapIterator_nextValue(&iter);
-        pubsub_msg_serializer_impl_t* impl = msgSer->handle;
-        dynMessage_destroy(impl->dynMsg); //note msgSer->name and msgSer->version owned by dynType
-        free(impl); //also contains the service struct.
-    }
-    hashMap_destroy(map->serializers, false, false);
-    free(map);
-    return status;
+celix_status_t pubsubSerializer_destroySerializerMap(pubsub_serializer_t* serializer, hash_map_pt serializerMap) {
+	celix_status_t status = CELIX_SUCCESS;
+	if (serializerMap == NULL) {
+		return CELIX_ILLEGAL_ARGUMENT;
+	}
+
+	hash_map_iterator_t iter = hashMapIterator_construct(serializerMap);
+	while (hashMapIterator_hasNext(&iter)) {
+		pubsub_msg_serializer_t* msgSerializer = hashMapIterator_nextValue(&iter);
+		dyn_message_type *dynMsg = (dyn_message_type*)msgSerializer->handle;
+		dynMessage_destroy(dynMsg); //note msgSer->name and msgSer->version owned by dynType
+		free(msgSerializer); //also contains the service struct.
+	}
+
+	hashMap_destroy(serializerMap, false, false);
+
+	return status;
 }
 
 
-celix_status_t pubsubMsgSerializer_serialize(pubsub_msg_serializer_impl_t* impl, const void* msg, char** out, size_t *outLen) {
-    celix_status_t status = CELIX_SUCCESS;
+celix_status_t pubsubMsgSerializer_serialize(pubsub_msg_serializer_t* msgSerializer, const void* msg, void** out, size_t *outLen) {
+	celix_status_t status = CELIX_SUCCESS;
+
+	char *jsonOutput = NULL;
+	dyn_type* dynType = NULL;
+	dyn_message_type *dynMsg = (dyn_message_type*)msgSerializer->handle;
+	dynMessage_getMessageType(dynMsg, &dynType);
 
-    char *jsonOutput = NULL;
-    dyn_type* dynType = NULL;
-    dynMessage_getMessageType(impl->dynMsg, &dynType);
-	int rc = jsonSerializer_serialize(dynType, msg, &jsonOutput);
-	if (rc != 0){
+	if (jsonSerializer_serialize(dynType, msg, &jsonOutput) != 0){
 		status = CELIX_BUNDLE_EXCEPTION;
 	}
-    if (status == CELIX_SUCCESS) {
-        *out = jsonOutput;
-        *outLen = strlen(jsonOutput) + 1;
-    }
+
+	if (status == CELIX_SUCCESS) {
+		*out = jsonOutput;
+		*outLen = strlen(jsonOutput) + 1;
+	}
 
 	return status;
 }
 
-celix_status_t pubsubMsgSerializer_deserialize(pubsub_msg_serializer_impl_t* impl, const char* input, size_t inputLen, void **out) {
-    celix_status_t status = CELIX_SUCCESS;
-    void *msg = NULL;
-    dyn_type* dynType = NULL;
-    dynMessage_getMessageType(impl->dynMsg, &dynType);
-    int rc = jsonSerializer_deserialize(dynType, input, &msg);
-    if (rc != 0) {
-        status = CELIX_BUNDLE_EXCEPTION;
-    }
-    else{
-        *out = msg;
-    }
+celix_status_t pubsubMsgSerializer_deserialize(pubsub_msg_serializer_t* msgSerializer, const void* input, size_t inputLen, void **out) {
+
+	celix_status_t status = CELIX_SUCCESS;
+	void *msg = NULL;
+	dyn_type* dynType = NULL;
+	dyn_message_type *dynMsg = (dyn_message_type*)msgSerializer->handle;
+	dynMessage_getMessageType(dynMsg, &dynType);
+
+	if (jsonSerializer_deserialize(dynType, (const char*)input, &msg) != 0) {
+		status = CELIX_BUNDLE_EXCEPTION;
+	}
+	else{
+		*out = msg;
+	}
+
 	return status;
 }
 
-void pubsubMsgSerializer_freeMsg(pubsub_msg_serializer_impl_t* impl, void *msg) {
-    dyn_type* dynType = NULL;
-    dynMessage_getMessageType(impl->dynMsg, &dynType);
-    if (dynType != NULL) {
-        dynType_free(dynType, msg);
-    }
+void pubsubMsgSerializer_freeMsg(pubsub_msg_serializer_t* msgSerializer, void *msg) {
+	dyn_type* dynType = NULL;
+	dyn_message_type *dynMsg = (dyn_message_type*)msgSerializer->handle;
+	dynMessage_getMessageType(dynMsg, &dynType);
+	if (dynType != NULL) {
+		dynType_free(dynType, msg);
+	}
 }
 
 
 static void pubsubSerializer_fillMsgSerializerMap(hash_map_pt msgSerializers, bundle_pt bundle) {
-    char* root = NULL;
-    char* metaInfPath = NULL;
+	char* root = NULL;
+	char* metaInfPath = NULL;
 
-    root = pubsubSerializer_getMsgDescriptionDir(bundle);
+	root = pubsubSerializer_getMsgDescriptionDir(bundle);
 
-    if(root != NULL){
-        asprintf(&metaInfPath, "%s/META-INF/descriptors/messages", root);
+	if(root != NULL){
+		asprintf(&metaInfPath, "%s/META-INF/descriptors", root);
 
-        pubsubSerializer_addMsgSerializerFromBundle(root, bundle, msgSerializers);
-        pubsubSerializer_addMsgSerializerFromBundle(metaInfPath, bundle, msgSerializers);
+		pubsubSerializer_addMsgSerializerFromBundle(root, bundle, msgSerializers);
+		pubsubSerializer_addMsgSerializerFromBundle(metaInfPath, bundle, msgSerializers);
 
-        free(metaInfPath);
-        free(root);
-    }
+		free(metaInfPath);
+		free(root);
+	}
 }
 
 static char* pubsubSerializer_getMsgDescriptionDir(bundle_pt bundle)
 {
-    char *root = NULL;
+	char *root = NULL;
 
-    bool isSystemBundle = false;
-    bundle_isSystemBundle(bundle, &isSystemBundle);
+	bool isSystemBundle = false;
+	bundle_isSystemBundle(bundle, &isSystemBundle);
 
-    if(isSystemBundle == true) {
-        bundle_context_pt context;
-        bundle_getContext(bundle, &context);
+	if(isSystemBundle == true) {
+		bundle_context_pt context;
+		bundle_getContext(bundle, &context);
 
-        const char *prop = NULL;
+		const char *prop = NULL;
 
-        bundleContext_getProperty(context, SYSTEM_BUNDLE_ARCHIVE_PATH, &prop);
+		bundleContext_getProperty(context, SYSTEM_BUNDLE_ARCHIVE_PATH, &prop);
 
-        if(prop != NULL) {
-            root = strdup(prop);
-        } else {
-            root = getcwd(NULL, 0);
-        }
-    } else {
-        bundle_getEntry(bundle, ".", &root);
-    }
+		if(prop != NULL) {
+			root = strdup(prop);
+		} else {
+			root = getcwd(NULL, 0);
+		}
+	} else {
+		bundle_getEntry(bundle, ".", &root);
+	}
 
-    return root;
+	return root;
 }
 
 
 static void pubsubSerializer_addMsgSerializerFromBundle(const char *root, bundle_pt bundle, hash_map_pt msgSerializers)
 {
-    char path[128];
-    struct dirent *entry = NULL;
-    DIR *dir = opendir(root);
-
-    if(dir) {
-        entry = readdir(dir);
-    }
-
-    while (entry != NULL) {
-
-        if (strstr(entry->d_name, ".descriptor") != NULL) {
-
-            printf("DMU: Parsing entry '%s'\n", entry->d_name);
-
-            memset(path,0,128);
-            snprintf(path, 128, "%s/%s", root, entry->d_name);
-            FILE *stream = fopen(path,"r");
-
-            if (stream != NULL){
-                dyn_message_type* msgType = NULL;
-
-                int rc = dynMessage_parse(stream, &msgType);
-                if (rc == 0 && msgType != NULL) {
-
-                    char* msgName = NULL;
-                    rc += dynMessage_getName(msgType,&msgName);
-
-                    version_pt msgVersion = NULL;
-                    rc += dynMessage_getVersion(msgType, &msgVersion);
-
-                    if(rc == 0 && msgName != NULL && msgVersion != NULL){
-
-                    	unsigned int msgId = utils_stringHash(msgName);
-
-                    	pubsub_msg_serializer_impl_t* impl = calloc(1, sizeof(*impl));
-                    	impl->dynMsg = msgType;
-                    	impl->msgSerializer.handle = impl;
-                    	impl->msgSerializer.msgId = msgId;
-                    	impl->msgSerializer.msgName = msgName;
-                    	impl->msgSerializer.msgVersion = msgVersion;
-                    	impl->msgSerializer.serialize = (void*) pubsubMsgSerializer_serialize;
-                    	impl->msgSerializer.deserialize = (void*) pubsubMsgSerializer_deserialize;
-                    	impl->msgSerializer.freeMsg = (void*) pubsubMsgSerializer_freeMsg;
-
-                    	bool clash = hashMap_containsKey(msgSerializers, (void*)(uintptr_t)msgId);
-                    	if (clash) {
-                    		printf("Cannot add msg %s. clash in msg id %d!!\n", msgName, msgId);
-                    		free(impl);
-                    		dynMessage_destroy(msgType);
-                    	} else if (msgId != 0) {
-                    		hashMap_put(msgSerializers, (void*)(uintptr_t)msgId, &impl->msgSerializer);
-                    	} else {
-                    		printf("Error creating msg serializer\n");
-                    		free(impl);
-                    		dynMessage_destroy(msgType);
-                    	}
-                    }
-                    else{
-                    	printf("Cannot retrieve name and/or version from msg\n");
-                    }
-
-                } else{
-                    printf("DMU: cannot parse message from descriptor %s\n.",path);
-                }
-                fclose(stream);
-            }else{
-                printf("DMU: cannot open descriptor file %s\n.",path);
-            }
-
-        }
-        entry = readdir(dir);
-    }
-
-    if(dir) {
-        closedir(dir);
-    }
+	char path[128];
+	struct dirent *entry = NULL;
+	DIR *dir = opendir(root);
+
+	if(dir) {
+		entry = readdir(dir);
+	}
+
+	while (entry != NULL) {
+
+		if (strstr(entry->d_name, ".descriptor") != NULL) {
+
+			printf("DMU: Parsing entry '%s'\n", entry->d_name);
+
+			memset(path,0,128);
+			snprintf(path, 128, "%s/%s", root, entry->d_name);
+			FILE *stream = fopen(path,"r");
+
+			if (stream != NULL){
+				dyn_message_type* msgType = NULL;
+
+				int rc = dynMessage_parse(stream, &msgType);
+				if (rc == 0 && msgType != NULL) {
+
+					char* msgName = NULL;
+					rc += dynMessage_getName(msgType,&msgName);
+
+					version_pt msgVersion = NULL;
+					rc += dynMessage_getVersion(msgType, &msgVersion);
+
+					if(rc == 0 && msgName != NULL && msgVersion != NULL){
+
+						unsigned int msgId = utils_stringHash(msgName);
+
+						pubsub_msg_serializer_t *msgSerializer = calloc(1,sizeof(pubsub_msg_serializer_t));
+
+						msgSerializer->handle = msgType;
+						msgSerializer->msgId = msgId;
+						msgSerializer->msgName = msgName;
+						msgSerializer->msgVersion = msgVersion;
+						msgSerializer->serialize = (void*) pubsubMsgSerializer_serialize;
+						msgSerializer->deserialize = (void*) pubsubMsgSerializer_deserialize;
+						msgSerializer->freeMsg = (void*) pubsubMsgSerializer_freeMsg;
+
+						bool clash = hashMap_containsKey(msgSerializers, (void*)(uintptr_t)msgId);
+						if (clash){
+							printf("Cannot add msg %s. clash in msg id %d!!\n", msgName, msgId);
+							free(msgSerializer);
+							dynMessage_destroy(msgType);
+						}
+						else if (msgId != 0){
+							printf("Adding %u : %s\n", msgId, msgName);
+							hashMap_put(msgSerializers, (void*)(uintptr_t)msgId, msgSerializer);
+						}
+						else{
+							printf("Error creating msg serializer\n");
+							free(msgSerializer);
+							dynMessage_destroy(msgType);
+						}
+
+					}
+					else{
+						printf("Cannot retrieve name and/or version from msg\n");
+					}
+
+				} else{
+					printf("DMU: cannot parse message from descriptor %s\n.",path);
+				}
+				fclose(stream);
+			}else{
+				printf("DMU: cannot open descriptor file %s\n.",path);
+			}
+
+		}
+		entry = readdir(dir);
+	}
+
+	if(dir) {
+		closedir(dir);
+	}
 }

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_topology_manager/private/include/pubsub_topology_manager.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_topology_manager/private/include/pubsub_topology_manager.h b/pubsub/pubsub_topology_manager/private/include/pubsub_topology_manager.h
index c7cb100..7614e0c 100644
--- a/pubsub/pubsub_topology_manager/private/include/pubsub_topology_manager.h
+++ b/pubsub/pubsub_topology_manager/private/include/pubsub_topology_manager.h
@@ -41,9 +41,6 @@
 struct pubsub_topology_manager {
 	bundle_context_pt context;
 
-	celix_thread_mutex_t serializerListLock;
-	array_list_pt serializerList;
-
 	celix_thread_mutex_t psaListLock;
 	array_list_pt psaList;
 
@@ -65,22 +62,14 @@ celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_help
 celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_pt manager);
 celix_status_t pubsub_topologyManager_closeImports(pubsub_topology_manager_pt manager);
 
-celix_status_t pubsub_topologyManager_pubsubSerializerAdding(void *handle, service_reference_pt reference, void **service);
-celix_status_t pubsub_topologyManager_pubsubSerializerAdded(void *handle, service_reference_pt reference, void *service);
-celix_status_t pubsub_topologyManager_pubsubSerializerModified(void *handle, service_reference_pt reference, void *service);
-celix_status_t pubsub_topologyManager_pubsubSerializerRemoved(void *handle, service_reference_pt reference, void *service);
-
-celix_status_t pubsub_topologyManager_psaAdding(void *handle, service_reference_pt reference, void **service);
 celix_status_t pubsub_topologyManager_psaAdded(void *handle, service_reference_pt reference, void *service);
 celix_status_t pubsub_topologyManager_psaModified(void *handle, service_reference_pt reference, void *service);
 celix_status_t pubsub_topologyManager_psaRemoved(void *handle, service_reference_pt reference, void *service);
 
-celix_status_t pubsub_topologyManager_pubsubDiscoveryAdding(void* handle, service_reference_pt reference, void** service);
 celix_status_t pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, service_reference_pt reference, void* service);
 celix_status_t pubsub_topologyManager_pubsubDiscoveryModified(void * handle, service_reference_pt reference, void* service);
 celix_status_t pubsub_topologyManager_pubsubDiscoveryRemoved(void * handle, service_reference_pt reference, void* service);
 
-celix_status_t pubsub_topologyManager_subscriberAdding(void * handle, service_reference_pt reference, void **service);
 celix_status_t pubsub_topologyManager_subscriberAdded(void * handle, service_reference_pt reference, void * service);
 celix_status_t pubsub_topologyManager_subscriberModified(void * handle, service_reference_pt reference, void * service);
 celix_status_t pubsub_topologyManager_subscriberRemoved(void * handle, service_reference_pt reference, void * service);

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_topology_manager/private/src/pstm_activator.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_topology_manager/private/src/pstm_activator.c b/pubsub/pubsub_topology_manager/private/src/pstm_activator.c
index c202e7a..0ce2571 100644
--- a/pubsub/pubsub_topology_manager/private/src/pstm_activator.c
+++ b/pubsub/pubsub_topology_manager/private/src/pstm_activator.c
@@ -48,7 +48,6 @@ struct activator {
 
 	pubsub_topology_manager_pt manager;
 
-	service_tracker_pt pubsubSerializerTracker;
 	service_tracker_pt pubsubDiscoveryTracker;
 	service_tracker_pt pubsubAdminTracker;
 	service_tracker_pt pubsubSubscribersTracker;
@@ -62,38 +61,19 @@ struct activator {
 	log_helper_pt loghelper;
 };
 
-static celix_status_t bundleActivator_createPSSTracker(struct activator *activator, service_tracker_pt *tracker);
+
 static celix_status_t bundleActivator_createPSDTracker(struct activator *activator, service_tracker_pt *tracker);
 static celix_status_t bundleActivator_createPSATracker(struct activator *activator, service_tracker_pt *tracker);
 static celix_status_t bundleActivator_createPSSubTracker(struct activator *activator, service_tracker_pt *tracker);
 
 
-static celix_status_t bundleActivator_createPSSTracker(struct activator *activator, service_tracker_pt *tracker) {
-	celix_status_t status;
-
-	service_tracker_customizer_pt customizer = NULL;
-
-	status = serviceTrackerCustomizer_create(activator->manager,
-			pubsub_topologyManager_pubsubSerializerAdding,
-			pubsub_topologyManager_pubsubSerializerAdded,
-			pubsub_topologyManager_pubsubSerializerModified,
-			pubsub_topologyManager_pubsubSerializerRemoved,
-			&customizer);
-
-	if (status == CELIX_SUCCESS) {
-		status = serviceTracker_create(activator->context, (char *) PUBSUB_SERIALIZER_SERVICE, customizer, tracker);
-	}
-
-	return status;
-}
-
 static celix_status_t bundleActivator_createPSDTracker(struct activator *activator, service_tracker_pt *tracker) {
 	celix_status_t status;
 
 	service_tracker_customizer_pt customizer = NULL;
 
 	status = serviceTrackerCustomizer_create(activator->manager,
-			pubsub_topologyManager_pubsubDiscoveryAdding,
+			NULL,
 			pubsub_topologyManager_pubsubDiscoveryAdded,
 			pubsub_topologyManager_pubsubDiscoveryModified,
 			pubsub_topologyManager_pubsubDiscoveryRemoved,
@@ -112,7 +92,7 @@ static celix_status_t bundleActivator_createPSATracker(struct activator *activat
 	service_tracker_customizer_pt customizer = NULL;
 
 	status = serviceTrackerCustomizer_create(activator->manager,
-			pubsub_topologyManager_psaAdding,
+			NULL,
 			pubsub_topologyManager_psaAdded,
 			pubsub_topologyManager_psaModified,
 			pubsub_topologyManager_psaRemoved,
@@ -131,7 +111,7 @@ static celix_status_t bundleActivator_createPSSubTracker(struct activator *activ
 	service_tracker_customizer_pt customizer = NULL;
 
 	status = serviceTrackerCustomizer_create(activator->manager,
-			pubsub_topologyManager_subscriberAdding,
+			NULL,
 			pubsub_topologyManager_subscriberAdded,
 			pubsub_topologyManager_subscriberModified,
 			pubsub_topologyManager_subscriberRemoved,
@@ -163,36 +143,15 @@ celix_status_t bundleActivator_create(bundle_context_pt context, void **userData
 	if (status == CELIX_SUCCESS) {
 		status = bundleActivator_createPSDTracker(activator, &activator->pubsubDiscoveryTracker);
 		if (status == CELIX_SUCCESS) {
-			status = bundleActivator_createPSSTracker(activator, &activator->pubsubSerializerTracker);
-			if (status == CELIX_SUCCESS){
-				status = bundleActivator_createPSATracker(activator, &activator->pubsubAdminTracker);
+			status = bundleActivator_createPSATracker(activator, &activator->pubsubAdminTracker);
+			if (status == CELIX_SUCCESS) {
+				status = bundleActivator_createPSSubTracker(activator, &activator->pubsubSubscribersTracker);
 				if (status == CELIX_SUCCESS) {
-					status = bundleActivator_createPSSubTracker(activator, &activator->pubsubSubscribersTracker);
-					if (status == CELIX_SUCCESS) {
-						*userData = activator;
-					}
-					if (status != CELIX_SUCCESS){
-						serviceTracker_destroy(activator->pubsubAdminTracker);
-					}
-				}
-				if (status != CELIX_SUCCESS){
-					serviceTracker_destroy(activator->pubsubSerializerTracker);
+					*userData = activator;
 				}
 			}
-			if (status != CELIX_SUCCESS){
-				serviceTracker_destroy(activator->pubsubDiscoveryTracker);
-			}
-		}
-		if (status != CELIX_SUCCESS){
-			pubsub_topologyManager_destroy(activator->manager);
 		}
 	}
-	if (status != CELIX_SUCCESS){ // an exception occurred so free allocated memory
-		logHelper_stop(activator->loghelper);
-		logHelper_destroy(&activator->loghelper);
-		free(activator);
-
-	}
 
 	return status;
 }
@@ -224,12 +183,13 @@ celix_status_t bundleActivator_start(void * userData, bundle_context_pt context)
 	properties_set(props, (char *) OSGI_RSA_SERVICE_EXPORTED_INTERFACES, (char *) PUBSUB_TOPIC_INFO_SERVICE);
 	status += bundleContext_registerService(context, (char *) PUBSUB_TOPIC_INFO_SERVICE, activator->topicInfo, props, &activator->topicInfoService);
 	*/
-
-	status += serviceTracker_open(activator->pubsubSerializerTracker);
 	status += serviceTracker_open(activator->pubsubAdminTracker);
+
 	status += serviceTracker_open(activator->pubsubDiscoveryTracker);
+
 	status += serviceTracker_open(activator->pubsubSubscribersTracker);
 
+
 	return status;
 }
 
@@ -239,7 +199,6 @@ celix_status_t bundleActivator_stop(void * userData, bundle_context_pt context)
 
 	serviceTracker_close(activator->pubsubSubscribersTracker);
 	serviceTracker_close(activator->pubsubDiscoveryTracker);
-	serviceTracker_close(activator->pubsubSerializerTracker);
 	serviceTracker_close(activator->pubsubAdminTracker);
 
 	serviceRegistration_unregister(activator->publisherEPDiscoverService);
@@ -261,7 +220,6 @@ celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt contex
 
 		serviceTracker_destroy(activator->pubsubSubscribersTracker);
 		serviceTracker_destroy(activator->pubsubDiscoveryTracker);
-		serviceTracker_destroy(activator->pubsubSerializerTracker);
 		serviceTracker_destroy(activator->pubsubAdminTracker);
 
 		logHelper_stop(activator->loghelper);

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c b/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
index 0e7923b..b4e8f46 100644
--- a/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
+++ b/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
@@ -23,7 +23,6 @@
  *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
  *  \copyright	Apache License, Version 2.0
  */
-
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
@@ -66,15 +65,13 @@ celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_help
 	celixThreadMutexAttr_create(&psaAttr);
 	celixThreadMutexAttr_settype(&psaAttr, CELIX_THREAD_MUTEX_RECURSIVE);
 	status = celixThreadMutex_create(&(*manager)->psaListLock, &psaAttr);
-    celixThreadMutexAttr_destroy(&psaAttr);
+	celixThreadMutexAttr_destroy(&psaAttr);
 
-    status = celixThreadMutex_create(&(*manager)->publicationsLock, NULL);
+	status = celixThreadMutex_create(&(*manager)->publicationsLock, NULL);
 	status = celixThreadMutex_create(&(*manager)->subscriptionsLock, NULL);
 	status = celixThreadMutex_create(&(*manager)->discoveryListLock, NULL);
-	status = celixThreadMutex_create(&(*manager)->serializerListLock, NULL);
 
 	arrayList_create(&(*manager)->psaList);
-	arrayList_create(&(*manager)->serializerList);
 
 	(*manager)->discoveryList = hashMap_create(serviceReference_hashCode, NULL, serviceReference_equals2, NULL);
 	(*manager)->publications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
@@ -98,11 +95,6 @@ celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_pt manager
 	celixThreadMutex_unlock(&manager->psaListLock);
 	celixThreadMutex_destroy(&manager->psaListLock);
 
-	celixThreadMutex_lock(&manager->serializerListLock);
-	arrayList_destroy(manager->serializerList);
-	celixThreadMutex_unlock(&manager->serializerListLock);
-	celixThreadMutex_destroy(&manager->serializerListLock);
-
 	celixThreadMutex_lock(&manager->publicationsLock);
 	hash_map_iterator_pt pubit = hashMapIterator_create(manager->publications);
 	while(hashMapIterator_hasNext(pubit)){
@@ -138,24 +130,18 @@ celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_pt manager
 	return status;
 }
 
-
-celix_status_t pubsub_topologyManager_psaAdding(void * handle, service_reference_pt reference, void **service) {
-	celix_status_t status = CELIX_SUCCESS;
-	pubsub_topology_manager_pt manager = handle;
-
-	status = bundleContext_getService(manager->context, reference, service);
-
-	return status;
-}
-
 celix_status_t pubsub_topologyManager_psaAdded(void * handle, service_reference_pt reference, void * service) {
 	celix_status_t status = CELIX_SUCCESS;
 	pubsub_topology_manager_pt manager = handle;
-	int i, j;
+	int i;
 
-	pubsub_admin_service_pt new_psa = (pubsub_admin_service_pt) service;
+	pubsub_admin_service_pt psa = (pubsub_admin_service_pt) service;
 	logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "PSTM: Added PSA");
 
+	celixThreadMutex_lock(&manager->psaListLock);
+	arrayList_add(manager->psaList, psa);
+	celixThreadMutex_unlock(&manager->psaListLock);
+
 	// Add already detected subscriptions to new PSA
 	celixThreadMutex_lock(&manager->subscriptionsLock);
 	hash_map_iterator_pt subscriptionsIterator = hashMapIterator_create(manager->subscriptions);
@@ -163,27 +149,7 @@ celix_status_t pubsub_topologyManager_psaAdded(void * handle, service_reference_
 	while (hashMapIterator_hasNext(subscriptionsIterator)) {
 		array_list_pt sub_ep_list = hashMapIterator_nextValue(subscriptionsIterator);
 		for(i=0;i<arrayList_size(sub_ep_list);i++){
-			pubsub_endpoint_pt sub = (pubsub_endpoint_pt)arrayList_get(sub_ep_list,i);
-			double new_psa_score;
-			new_psa->matchSubscriber(new_psa->admin, sub, &new_psa_score);
-			pubsub_admin_service_pt best_psa = NULL;
-			double highest_score = 0;
-
-			for(j=0;j<arrayList_size(manager->psaList);j++){
-				pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j);
-				double score;
-				psa->matchSubscriber(psa->admin, sub, &score);
-				if (score > highest_score){
-					highest_score = score;
-					best_psa = psa;
-				}
-			}
-			if (best_psa != NULL && (new_psa_score > highest_score)){
-				best_psa->removeSubscription(best_psa->admin, sub);
-			}
-			if (new_psa_score > highest_score){
-				status += new_psa->addSubscription(new_psa->admin, sub);
-			}
+			status += psa->addSubscription(psa->admin, (pubsub_endpoint_pt)arrayList_get(sub_ep_list,i));
 		}
 	}
 
@@ -198,27 +164,7 @@ celix_status_t pubsub_topologyManager_psaAdded(void * handle, service_reference_
 	while (hashMapIterator_hasNext(publicationsIterator)) {
 		array_list_pt pub_ep_list = hashMapIterator_nextValue(publicationsIterator);
 		for(i=0;i<arrayList_size(pub_ep_list);i++){
-			pubsub_endpoint_pt pub = (pubsub_endpoint_pt)arrayList_get(pub_ep_list,i);
-			double new_psa_score;
-			new_psa->matchPublisher(new_psa->admin, pub, &new_psa_score);
-			pubsub_admin_service_pt best_psa = NULL;
-			double highest_score = 0;
-
-			for(j=0;j<arrayList_size(manager->psaList);j++){
-				pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j);
-				double score;
-				psa->matchPublisher(psa->admin, pub, &score);
-				if (score > highest_score){
-					highest_score = score;
-					best_psa = psa;
-				}
-			}
-			if (best_psa != NULL && (new_psa_score > highest_score)){
-				best_psa->removePublication(best_psa->admin, pub);
-			}
-			if (new_psa_score > highest_score){
-				status += new_psa->addPublication(new_psa->admin, pub);
-			}
+			status += psa->addPublication(psa->admin, (pubsub_endpoint_pt)arrayList_get(pub_ep_list,i));
 		}
 	}
 
@@ -226,20 +172,6 @@ celix_status_t pubsub_topologyManager_psaAdded(void * handle, service_reference_
 
 	celixThreadMutex_unlock(&manager->publicationsLock);
 
-
-	celixThreadMutex_lock(&manager->serializerListLock);
-	unsigned int size = arrayList_size(manager->serializerList);
-	if (size > 0) {
-		pubsub_serializer_service_t* ser = arrayList_get(manager->serializerList, (size-1)); //last, same as result of add/remove serializer
-		new_psa->setSerializer(new_psa->admin, ser);
-	}
-	celixThreadMutex_unlock(&manager->serializerListLock);
-
-	celixThreadMutex_lock(&manager->psaListLock);
-	arrayList_add(manager->psaList, new_psa);
-	celixThreadMutex_unlock(&manager->psaListLock);
-
-
 	return status;
 }
 
@@ -325,97 +257,13 @@ celix_status_t pubsub_topologyManager_psaRemoved(void * handle, service_referenc
 	return status;
 }
 
-celix_status_t pubsub_topologyManager_pubsubSerializerAdding(void* handle, service_reference_pt reference, void** service) {
-	celix_status_t status = CELIX_SUCCESS;
-	pubsub_topology_manager_pt manager = handle;
-
-	bundleContext_getService(manager->context, reference, service);
-
-	return status;
-}
-
-celix_status_t pubsub_topologyManager_pubsubSerializerAdded(void* handle, service_reference_pt reference, void* service) {
-	celix_status_t status = CELIX_SUCCESS;
-
-	pubsub_topology_manager_pt manager = handle;
-	pubsub_serializer_service_t* new_serializer = (pubsub_serializer_service_t*) service;
-
-	celixThreadMutex_lock(&manager->serializerListLock);
-
-	logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "PSTM: Added pubsub serializer");
-
-	int i;
-	for(i=0; i<arrayList_size(manager->psaList); i++){
-		pubsub_admin_service_pt psa = (pubsub_admin_service_pt) arrayList_get(manager->psaList,i);
-		psa->setSerializer(psa->admin, new_serializer);
-	}
-
-	arrayList_add(manager->serializerList, new_serializer);
-
-	celixThreadMutex_unlock(&manager->serializerListLock);
-
-	return status;
-}
-
-celix_status_t pubsub_topologyManager_pubsubSerializerModified(void * handle, service_reference_pt reference, void * service) {
-	celix_status_t status = CELIX_SUCCESS;
-
-	// Nop...
-
-	return status;
-}
-
-celix_status_t pubsub_topologyManager_pubsubSerializerRemoved(void * handle, service_reference_pt reference, void * service) {
-	celix_status_t status = CELIX_SUCCESS;
-
-	pubsub_topology_manager_pt manager = handle;
-	pubsub_serializer_service_t* new_serializer = (pubsub_serializer_service_t*) service;
-
-	celixThreadMutex_lock(&manager->serializerListLock);
-
-	logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "PSTM: Removed pubsub serializer");
-
-	int i, j;
-
-	for(i=0; i<arrayList_size(manager->psaList); i++){
-		pubsub_admin_service_pt psa = (pubsub_admin_service_pt) arrayList_get(manager->psaList,i);
-		psa->removeSerializer(psa->admin, new_serializer);
-	}
-
-	arrayList_removeElement(manager->serializerList, new_serializer);
-
-	if (arrayList_size(manager->serializerList) > 0){
-		//there is another serializer available, change the admin so it is using another serializer
-		pubsub_serializer_service_t* replacing_serializer = (pubsub_serializer_service_t*) arrayList_get(manager->serializerList,0);
-
-		for(j=0; j<arrayList_size(manager->psaList); j++){
-			pubsub_admin_service_pt psa = (pubsub_admin_service_pt) arrayList_get(manager->psaList,j);
-			psa->setSerializer(psa->admin, replacing_serializer);
-		}
-	}
-
-	celixThreadMutex_unlock(&manager->serializerListLock);
-
-
-	return status;
-}
-
-celix_status_t pubsub_topologyManager_subscriberAdding(void * handle, service_reference_pt reference, void **service) {
-	celix_status_t status = CELIX_SUCCESS;
-	pubsub_topology_manager_pt manager = handle;
-
-	status = bundleContext_getService(manager->context, reference, service);
-
-	return status;
-}
-
 celix_status_t pubsub_topologyManager_subscriberAdded(void * handle, service_reference_pt reference, void * service) {
 	celix_status_t status = CELIX_SUCCESS;
 	pubsub_topology_manager_pt manager = handle;
 	//subscriber_service_pt subscriber = (subscriber_service_pt)service;
 
 	pubsub_endpoint_pt sub = NULL;
-	if(pubsubEndpoint_createFromServiceReference(reference,&sub) == CELIX_SUCCESS){
+	if(pubsubEndpoint_createFromServiceReference(reference,&sub,false) == CELIX_SUCCESS){
 		celixThreadMutex_lock(&manager->subscriptionsLock);
 		char *sub_key = createScopeTopicKey(sub->scope, sub->topic);
 
@@ -430,40 +278,40 @@ celix_status_t pubsub_topologyManager_subscriberAdded(void * handle, service_ref
 		celixThreadMutex_unlock(&manager->subscriptionsLock);
 
 		int j;
-		celixThreadMutex_lock(&manager->psaListLock);
-		double highest_score = -1;
+		double score = 0;
+		double best_score = 0;
 		pubsub_admin_service_pt best_psa = NULL;
-
+		celixThreadMutex_lock(&manager->psaListLock);
 		for(j=0;j<arrayList_size(manager->psaList);j++){
 			pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j);
-			double score;
-			psa->matchSubscriber(psa->admin, sub, &score);
-			if (score > highest_score){
-				highest_score = score;
+			psa->matchEndpoint(psa->admin,sub,&score);
+			if(score>best_score){ /* We have a new winner! */
+				best_score = score;
 				best_psa = psa;
 			}
 		}
-		if (best_psa != NULL){
+
+		if(best_psa != NULL && best_score>0){
 			best_psa->addSubscription(best_psa->admin,sub);
 		}
 
 		// Inform discoveries for interest in the topic
-        celixThreadMutex_lock(&manager->discoveryListLock);
+		celixThreadMutex_lock(&manager->discoveryListLock);
 		hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
-        while(hashMapIterator_hasNext(iter)){
-            service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter);
-            publisher_endpoint_announce_pt disc = NULL;
-            bundleContext_getService(manager->context, disc_sr, (void**) &disc);
-            disc->interestedInTopic(disc->handle, sub->scope, sub->topic);
-            bundleContext_ungetService(manager->context, disc_sr, NULL);
-        }
-        hashMapIterator_destroy(iter);
-        celixThreadMutex_unlock(&manager->discoveryListLock);
+		while(hashMapIterator_hasNext(iter)){
+			service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter);
+			publisher_endpoint_announce_pt disc = NULL;
+			bundleContext_getService(manager->context, disc_sr, (void**) &disc);
+			disc->interestedInTopic(disc->handle, sub->scope, sub->topic);
+			bundleContext_ungetService(manager->context, disc_sr, NULL);
+		}
+		hashMapIterator_destroy(iter);
+		celixThreadMutex_unlock(&manager->discoveryListLock);
 
 		celixThreadMutex_unlock(&manager->psaListLock);
 	}
 	else{
-		status = CELIX_INVALID_BUNDLE_CONTEXT;
+		status=CELIX_INVALID_BUNDLE_CONTEXT;
 	}
 
 	return status;
@@ -482,25 +330,25 @@ celix_status_t pubsub_topologyManager_subscriberRemoved(void * handle, service_r
 	pubsub_topology_manager_pt manager = handle;
 
 	pubsub_endpoint_pt subcmp = NULL;
-	if(pubsubEndpoint_createFromServiceReference(reference,&subcmp) == CELIX_SUCCESS){
+	if(pubsubEndpoint_createFromServiceReference(reference,&subcmp,false) == CELIX_SUCCESS){
 
 		int j,k;
 
 		// Inform discoveries that we not interested in the topic any more
-        celixThreadMutex_lock(&manager->discoveryListLock);
-        hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
-        while(hashMapIterator_hasNext(iter)){
-            service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter);
-            publisher_endpoint_announce_pt disc = NULL;
-            bundleContext_getService(manager->context, disc_sr, (void**) &disc);
-            disc->uninterestedInTopic(disc->handle, subcmp->scope, subcmp->topic);
-            bundleContext_ungetService(manager->context, disc_sr, NULL);
-        }
-        hashMapIterator_destroy(iter);
-        celixThreadMutex_unlock(&manager->discoveryListLock);
-
-        celixThreadMutex_lock(&manager->subscriptionsLock);
-        celixThreadMutex_lock(&manager->psaListLock);
+		celixThreadMutex_lock(&manager->discoveryListLock);
+		hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
+		while(hashMapIterator_hasNext(iter)){
+			service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter);
+			publisher_endpoint_announce_pt disc = NULL;
+			bundleContext_getService(manager->context, disc_sr, (void**) &disc);
+			disc->uninterestedInTopic(disc->handle, subcmp->scope, subcmp->topic);
+			bundleContext_ungetService(manager->context, disc_sr, NULL);
+		}
+		hashMapIterator_destroy(iter);
+		celixThreadMutex_unlock(&manager->discoveryListLock);
+
+		celixThreadMutex_lock(&manager->subscriptionsLock);
+		celixThreadMutex_lock(&manager->psaListLock);
 
 		char *sub_key = createScopeTopicKey(subcmp->scope,subcmp->topic);
 		array_list_pt sub_list_by_topic = hashMap_get(manager->subscriptions,sub_key);
@@ -509,19 +357,10 @@ celix_status_t pubsub_topologyManager_subscriberRemoved(void * handle, service_r
 			for(j=0;j<arrayList_size(sub_list_by_topic);j++){
 				pubsub_endpoint_pt sub = arrayList_get(sub_list_by_topic,j);
 				if(pubsubEndpoint_equals(sub,subcmp)){
-					double highest_score = -1;
-					pubsub_admin_service_pt best_psa = NULL;
 					for(k=0;k<arrayList_size(manager->psaList);k++){
+						/* No problem with invoking removal on all psa's, only the one that manage this topic will do something */
 						pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
-						double score;
-						psa->matchSubscriber(psa->admin, sub, &score);
-						if (score > highest_score){
-							highest_score = score;
-							best_psa = psa;
-						}
-					}
-					if (best_psa != NULL){
-						best_psa->removeSubscription(best_psa->admin,sub);
+						psa->removeSubscription(psa->admin,sub);
 					}
 
 				}
@@ -547,22 +386,13 @@ celix_status_t pubsub_topologyManager_subscriberRemoved(void * handle, service_r
 
 	}
 	else{
-		status = CELIX_INVALID_BUNDLE_CONTEXT;
+		status=CELIX_INVALID_BUNDLE_CONTEXT;
 	}
 
 	return status;
 
 }
 
-celix_status_t pubsub_topologyManager_pubsubDiscoveryAdding(void* handle, service_reference_pt reference, void** service) {
-	celix_status_t status = CELIX_SUCCESS;
-	pubsub_topology_manager_pt manager = handle;
-
-	bundleContext_getService(manager->context, reference, service);
-
-	return status;
-}
-
 celix_status_t pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, service_reference_pt reference, void* service) {
 	celix_status_t status = CELIX_SUCCESS;
 	pubsub_topology_manager_pt manager = (pubsub_topology_manager_pt)handle;
@@ -600,16 +430,16 @@ celix_status_t pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, service
 	iter = hashMapIterator_create(manager->subscriptions);
 
 	while(hashMapIterator_hasNext(iter)) {
-	    array_list_pt l = (array_list_pt)hashMapIterator_nextValue(iter);
-	    int i;
-	    for(i=0;i<arrayList_size(l);i++){
-	        pubsub_endpoint_pt subEp = (pubsub_endpoint_pt)arrayList_get(l,i);
+		array_list_pt l = (array_list_pt)hashMapIterator_nextValue(iter);
+		int i;
+		for(i=0;i<arrayList_size(l);i++){
+			pubsub_endpoint_pt subEp = (pubsub_endpoint_pt)arrayList_get(l,i);
 
-	        disc->interestedInTopic(disc->handle, subEp->scope, subEp->topic);
-	    }
+			disc->interestedInTopic(disc->handle, subEp->scope, subEp->topic);
+		}
 	}
 	hashMapIterator_destroy(iter);
-    celixThreadMutex_unlock(&manager->subscriptionsLock);
+	celixThreadMutex_unlock(&manager->subscriptionsLock);
 
 	return status;
 }
@@ -654,81 +484,57 @@ celix_status_t pubsub_topologyManager_publisherTrackerAdded(void *handle, array_
 
 		listener_hook_info_pt info = arrayList_get(listeners, l_index);
 
-		const char* fwUUID=NULL;
-		bundleContext_getProperty(info->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
+		pubsub_endpoint_pt pub = NULL;
+		if(pubsubEndpoint_createFromListenerHookInfo(info, &pub, true) == CELIX_SUCCESS){
 
-		char* scope = pubsub_getScopeFromFilter(info->filter);
-		char* topic = pubsub_getTopicFromFilter(info->filter);
-		if(scope == NULL) {
-			scope = strdup(PUBSUB_PUBLISHER_SCOPE_DEFAULT);
-		}
-
-		//TODO: Can we use a better serviceID??
-		bundle_pt bundle = NULL;
-		long bundleId = -1;
-		bundleContext_getBundle(info->context,&bundle);
-		bundle_getBundleId(bundle,&bundleId);
+			celixThreadMutex_lock(&manager->publicationsLock);
+			char *pub_key = createScopeTopicKey(pub->scope, pub->topic);
+			array_list_pt pub_list_by_topic = hashMap_get(manager->publications, pub_key);
+			if(pub_list_by_topic==NULL){
+				arrayList_create(&pub_list_by_topic);
+				hashMap_put(manager->publications,strdup(pub_key),pub_list_by_topic);
+			}
+			free(pub_key);
+			arrayList_add(pub_list_by_topic,pub);
 
-		if(fwUUID !=NULL && topic !=NULL){
+			celixThreadMutex_unlock(&manager->publicationsLock);
 
-			pubsub_endpoint_pt pub = NULL;
-			if(pubsubEndpoint_create(fwUUID, scope, topic,bundleId,NULL,&pub) == CELIX_SUCCESS){
+			int j;
+			double score = 0;
+			double best_score = 0;
+			pubsub_admin_service_pt best_psa = NULL;
+			celixThreadMutex_lock(&manager->psaListLock);
 
-				celixThreadMutex_lock(&manager->publicationsLock);
-				char *pub_key = createScopeTopicKey(scope, topic);
-				array_list_pt pub_list_by_topic = hashMap_get(manager->publications, pub_key);
-				if(pub_list_by_topic==NULL){
-					arrayList_create(&pub_list_by_topic);
-					hashMap_put(manager->publications,strdup(pub_key),pub_list_by_topic);
+			for(j=0;j<arrayList_size(manager->psaList);j++){
+				pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j);
+				psa->matchEndpoint(psa->admin,pub,&score);
+				if(score>best_score){ /* We have a new winner! */
+					best_score = score;
+					best_psa = psa;
 				}
-				free(pub_key);
-				arrayList_add(pub_list_by_topic,pub);
-
-				celixThreadMutex_unlock(&manager->publicationsLock);
-
-				int j;
-				celixThreadMutex_lock(&manager->psaListLock);
-
-				double highest_score = -1;
-				pubsub_admin_service_pt best_psa = NULL;
+			}
 
-				for(j=0;j<arrayList_size(manager->psaList);j++){
-					pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j);
-					double score;
-					psa->matchPublisher(psa->admin, pub, &score);
-					if (score > highest_score){
-						highest_score = score;
-						best_psa = psa;
+			if(best_psa != NULL && best_score>0){
+				status = best_psa->addPublication(best_psa->admin,pub);
+				if(status==CELIX_SUCCESS){
+					celixThreadMutex_lock(&manager->discoveryListLock);
+					hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
+					while(hashMapIterator_hasNext(iter)){
+						service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter);
+						publisher_endpoint_announce_pt disc = NULL;
+						bundleContext_getService(manager->context, disc_sr, (void**) &disc);
+						disc->announcePublisher(disc->handle,pub);
+						bundleContext_ungetService(manager->context, disc_sr, NULL);
 					}
+					hashMapIterator_destroy(iter);
+					celixThreadMutex_unlock(&manager->discoveryListLock);
 				}
-				if (best_psa != NULL){
-					status = best_psa->addPublication(best_psa->admin,pub);
-					if(status==CELIX_SUCCESS){
-						celixThreadMutex_lock(&manager->discoveryListLock);
-						hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
-						while(hashMapIterator_hasNext(iter)){
-							service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter);
-							publisher_endpoint_announce_pt disc = NULL;
-							bundleContext_getService(manager->context, disc_sr, (void**) &disc);
-							disc->announcePublisher(disc->handle,pub);
-							bundleContext_ungetService(manager->context, disc_sr, NULL);
-						}
-						hashMapIterator_destroy(iter);
-						celixThreadMutex_unlock(&manager->discoveryListLock);
-					}
-				}
-
-				celixThreadMutex_unlock(&manager->psaListLock);
-
 			}
 
-		}
-		else{
-			status=CELIX_INVALID_BUNDLE_CONTEXT;
+			celixThreadMutex_unlock(&manager->psaListLock);
+
 		}
 
-		free(topic);
-        free(scope);
 	}
 
 	return status;
@@ -746,62 +552,41 @@ celix_status_t pubsub_topologyManager_publisherTrackerRemoved(void *handle, arra
 
 		listener_hook_info_pt info = arrayList_get(listeners, l_index);
 
-		char* pub_scope = pubsub_getScopeFromFilter(info->filter);
-		char* pub_topic = pubsub_getTopicFromFilter(info->filter);
-
-		const char* fwUUID=NULL;
-		bundleContext_getProperty(info->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
-
-		//TODO: Can we use a better serviceID??
-		bundle_pt bundle = NULL;
-		long bundleId = -1;
-		bundleContext_getBundle(info->context,&bundle);
-		bundle_getBundleId(bundle,&bundleId);
-
-		if(bundle !=NULL && pub_topic !=NULL && bundleId>0){
-
-			pubsub_endpoint_pt pubcmp = NULL;
-			if(pubsubEndpoint_create(fwUUID, pub_scope, pub_topic,bundleId,NULL,&pubcmp) == CELIX_SUCCESS){
-
-				int j,k;
-                celixThreadMutex_lock(&manager->psaListLock);
-                celixThreadMutex_lock(&manager->publicationsLock);
-
-                char *pub_key = createScopeTopicKey(pub_scope, pub_topic);
-				array_list_pt pub_list_by_topic = hashMap_get(manager->publications,pub_key);
-				if(pub_list_by_topic!=NULL){
-					for(j=0;j<arrayList_size(pub_list_by_topic);j++){
-						pubsub_endpoint_pt pub = arrayList_get(pub_list_by_topic,j);
-						if(pubsubEndpoint_equals(pub,pubcmp)){
-							double highest_score = -1;
-							pubsub_admin_service_pt best_psa = NULL;
-
-							for(k=0;k<arrayList_size(manager->psaList);k++){
-								pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
-								double score;
-								psa->matchPublisher(psa->admin, pub, &score);
-								if (score > highest_score){
-									highest_score = score;
-									best_psa = psa;
+		pubsub_endpoint_pt pubcmp = NULL;
+		if(pubsubEndpoint_createFromListenerHookInfo(info,&pubcmp,true) == CELIX_SUCCESS){
+
+
+			int j,k;
+			celixThreadMutex_lock(&manager->psaListLock);
+			celixThreadMutex_lock(&manager->publicationsLock);
+
+			char *pub_key = createScopeTopicKey(pubcmp->scope, pubcmp->topic);
+			array_list_pt pub_list_by_topic = hashMap_get(manager->publications,pub_key);
+			if(pub_list_by_topic!=NULL){
+				for(j=0;j<arrayList_size(pub_list_by_topic);j++){
+					pubsub_endpoint_pt pub = arrayList_get(pub_list_by_topic,j);
+					if(pubsubEndpoint_equals(pub,pubcmp)){
+						for(k=0;k<arrayList_size(manager->psaList);k++){
+							pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
+							status = psa->removePublication(psa->admin,pub);
+							if(status==CELIX_SUCCESS){ /* We found the one that manages this endpoint */
+								celixThreadMutex_lock(&manager->discoveryListLock);
+								hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
+								while(hashMapIterator_hasNext(iter)){
+									service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter);
+									publisher_endpoint_announce_pt disc = NULL;
+									bundleContext_getService(manager->context, disc_sr, (void**) &disc);
+									disc->removePublisher(disc->handle,pub);
+									bundleContext_ungetService(manager->context, disc_sr, NULL);
 								}
+								hashMapIterator_destroy(iter);
+								celixThreadMutex_unlock(&manager->discoveryListLock);
 							}
-							if (best_psa != NULL){
-								status = best_psa->removePublication(best_psa->admin,pub);
-								if(status==CELIX_SUCCESS){
-									celixThreadMutex_lock(&manager->discoveryListLock);
-									hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
-									while(hashMapIterator_hasNext(iter)){
-										service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter);
-										publisher_endpoint_announce_pt disc = NULL;
-										bundleContext_getService(manager->context, disc_sr, (void**) &disc);
-										disc->removePublisher(disc->handle,pub);
-										bundleContext_ungetService(manager->context, disc_sr, NULL);
-									}
-									hashMapIterator_destroy(iter);
-									celixThreadMutex_unlock(&manager->discoveryListLock);
-								}
+							else if(status ==  CELIX_ILLEGAL_ARGUMENT){ /* Not a real error, just saying this psa does not handle this endpoint */
+								status = CELIX_SUCCESS;
 							}
 						}
+						//}
 						arrayList_remove(pub_list_by_topic,j);
 
 						/* If it was the last publisher for this topic, tell PSA to close the ZMQ socket and then inform the discovery */
@@ -813,26 +598,20 @@ celix_status_t pubsub_topologyManager_publisherTrackerRemoved(void *handle, arra
 						}
 
 						pubsubEndpoint_destroy(pub);
-
 					}
-				}
 
-				celixThreadMutex_unlock(&manager->publicationsLock);
-				celixThreadMutex_unlock(&manager->psaListLock);
+				}
+			}
 
-				pubsubEndpoint_destroy(pubcmp);
+			celixThreadMutex_unlock(&manager->publicationsLock);
+			celixThreadMutex_unlock(&manager->psaListLock);
 
-				free(pub_key);
+			free(pub_key);
 
-			}
+			pubsubEndpoint_destroy(pubcmp);
 
 		}
-		else{
-			status=CELIX_INVALID_BUNDLE_CONTEXT;
-		}
 
-		free(pub_scope);
-		free(pub_topic);
 	}
 
 	return status;
@@ -846,8 +625,6 @@ celix_status_t pubsub_topologyManager_announcePublisher(void *handle, pubsub_end
 	celixThreadMutex_lock(&manager->psaListLock);
 	celixThreadMutex_lock(&manager->publicationsLock);
 
-	int i;
-
 	char *pub_key = createScopeTopicKey(pubEP->scope, pubEP->topic);
 
 	array_list_pt pub_list_by_topic = hashMap_get(manager->publications,pub_key);
@@ -859,23 +636,28 @@ celix_status_t pubsub_topologyManager_announcePublisher(void *handle, pubsub_end
 
 	/* Shouldn't be any other duplicate, since it's filtered out by the discovery */
 	pubsub_endpoint_pt p = NULL;
-	pubsubEndpoint_create(pubEP->frameworkUUID,pubEP->scope,pubEP->topic,pubEP->serviceID,pubEP->endpoint,&p);
+	pubsubEndpoint_clone(pubEP, &p);
 	arrayList_add(pub_list_by_topic,p);
 
-	double highest_score = -1;
+	int j;
+	double score = 0;
+	double best_score = 0;
 	pubsub_admin_service_pt best_psa = NULL;
 
-	for(i=0;i<arrayList_size(manager->psaList);i++){
-		pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,i);
-		double score;
-		psa->matchPublisher(psa->admin, p, &score);
-		if (score > highest_score){
-			highest_score = score;
+	for(j=0;j<arrayList_size(manager->psaList);j++){
+		pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j);
+		psa->matchEndpoint(psa->admin,p,&score);
+		if(score>best_score){ /* We have a new winner! */
+			best_score = score;
 			best_psa = psa;
 		}
 	}
-	if (best_psa != NULL){
-		status += best_psa->addPublication(best_psa->admin,p);
+
+	if(best_psa != NULL && best_score>0){
+		best_psa->addPublication(best_psa->admin,p);
+	}
+	else{
+		status = CELIX_ILLEGAL_STATE;
 	}
 
 	celixThreadMutex_unlock(&manager->publicationsLock);
@@ -911,20 +693,10 @@ celix_status_t pubsub_topologyManager_removePublisher(void *handle, pubsub_endpo
 
 		if(found && p !=NULL){
 
-			double highest_score = -1;
-			pubsub_admin_service_pt best_psa = NULL;
-
 			for(i=0;i<arrayList_size(manager->psaList);i++){
 				pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,i);
-				double score;
-				psa->matchPublisher(psa->admin, p, &score);
-				if (score > highest_score){
-					highest_score = score;
-					best_psa = psa;
-				}
-			}
-			if (best_psa != NULL){
-				status += best_psa->removePublication(best_psa->admin,p);
+				/* No problem with invoking removal on all psa's, only the one that manage this topic will do something */
+				psa->removePublication(psa->admin,p);
 			}
 
 			arrayList_removeElement(pub_list_by_topic,p);
@@ -934,7 +706,7 @@ celix_status_t pubsub_topologyManager_removePublisher(void *handle, pubsub_endpo
 
 				for(i=0;i<arrayList_size(manager->psaList);i++){
 					pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,i);
-					status += psa->closeAllPublications(psa->admin,p->scope, p->topic);
+					psa->closeAllPublications(psa->admin,p->scope, p->topic);
 				}
 			}
 


Mime
View raw message