celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pnol...@apache.org
Subject [02/19] celix git commit: CELIX-397: Adds match pub/sub implementation
Date Mon, 06 Feb 2017 18:34:14 GMT
CELIX-397: Adds match pub/sub implementation


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/091a1126
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/091a1126
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/091a1126

Branch: refs/heads/develop
Commit: 091a1126d71b919b3235b5456667108f4e2ff42b
Parents: f32d071
Author: Roy Lenferink <lenferinkroy@gmail.com>
Authored: Mon Feb 6 17:08:24 2017 +0100
Committer: Roy Lenferink <lenferinkroy@gmail.com>
Committed: Mon Feb 6 17:08:24 2017 +0100

----------------------------------------------------------------------
 celix-pubsub/pubsub/deploy/CMakeLists.txt       |  34 ++++-
 .../private/include/pubsub_admin_impl.h         |   4 +-
 .../private/src/psa_activator.c                 |   3 +
 .../private/src/pubsub_admin_impl.c             |  32 ++++-
 .../private/src/topic_subscription.c            |   9 +-
 .../private/include/pubsub_admin_impl.h         |   3 +
 .../private/src/psa_activator.c                 |   3 +
 .../private/src/pubsub_admin_impl.c             |  37 ++++-
 .../pubsub_common/public/include/pubsub_admin.h |   5 +
 .../private/src/pubsub_topology_manager.c       | 136 ++++++++++++++++---
 10 files changed, 241 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/091a1126/celix-pubsub/pubsub/deploy/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/deploy/CMakeLists.txt b/celix-pubsub/pubsub/deploy/CMakeLists.txt
index 5a3838b..c1d0d44 100644
--- a/celix-pubsub/pubsub/deploy/CMakeLists.txt
+++ b/celix-pubsub/pubsub/deploy/CMakeLists.txt
@@ -15,6 +15,38 @@
 # specific language governing permissions and limitations
 # under the License.
 
+# Dynamic ZMQ / UDP admin
+add_deploy("pubsub_publisher" 
+    GROUP "pubsub"
+    BUNDLES
+       shell
+       shell_tui
+       org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
+       org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
+       org.apache.celix.pubsub_admin.PubSubAdmin
+       org.apache.celix.pubsub_admin.PubSubAdminUdpMc
+       org.apache.celix.pubsub_publisher.PoiPublisher
+       org.apache.celix.pubsub_publisher.PoiPublisher2
+    PROPERTIES
+       poi1.psa=zmq
+       poi2.psa=udp
+)
+
+add_deploy("pubsub_subscriber"
+    GROUP "pubsub"
+    BUNDLES
+       shell
+       shell_tui
+       org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
+       org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
+       org.apache.celix.pubsub_admin.PubSubAdmin
+       org.apache.celix.pubsub_admin.PubSubAdminUdpMc
+       org.apache.celix.pubsub_subscriber.PoiSubscriber
+    PROPERTIES
+       poi1.psa=zmq
+       poi2.psa=udp
+)
+
 # ZMQ
 add_deploy("pubsub_zmq"
     GROUP "pubsub"
@@ -40,7 +72,6 @@ add_deploy("pubsub_publisher_zmq"
        org.apache.celix.pubsub_publisher.PoiPublisher2
    	PROPERTIES
 	    pubsub.scope=my_small_scope
-   
 )
 
 add_deploy("pubsub_subscriber_zmq"
@@ -75,6 +106,7 @@ add_deploy("pubsub_publisher_udp_mc"
        org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
        org.apache.celix.pubsub_admin.PubSubAdminUdpMc
        org.apache.celix.pubsub_publisher.PoiPublisher
+       org.apache.celix.pubsub_publisher.PoiPublisher2
 )
 
 add_deploy("pubsub_subscriber_udp_mc" 

http://git-wip-us.apache.org/repos/asf/celix/blob/091a1126/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h
index 19a56a8..35fc164 100644
--- a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h
+++ b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h
@@ -51,7 +51,6 @@ struct pubsub_admin {
     char* mcIpAddress; // The multicast IP address
 
 	int sendSocket;
-    void* zmq_context; // to be removed
 
 };
 
@@ -68,4 +67,7 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
 celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin,char* scope, char* topic);
 celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* scope, char* topic);
 
+celix_status_t pubsubAdmin_matchPublisher(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP, double* score);
+celix_status_t pubsubAdmin_matchSubscriber(pubsub_admin_pt admin, pubsub_endpoint_pt subEP, double* score);
+
 #endif /* PUBSUB_ADMIN_IMPL_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/091a1126/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/psa_activator.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/psa_activator.c b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/psa_activator.c
index 053c757..24202dd 100644
--- a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/psa_activator.c
+++ b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/psa_activator.c
@@ -73,6 +73,9 @@ celix_status_t bundleActivator_start(void * userData, bundle_context_pt context)
 		pubsubAdminSvc->closeAllPublications = pubsubAdmin_closeAllPublications;
 		pubsubAdminSvc->closeAllSubscriptions = pubsubAdmin_closeAllSubscriptions;
 
+		pubsubAdminSvc->matchPublisher = pubsubAdmin_matchPublisher;
+		pubsubAdminSvc->matchSubscriber = pubsubAdmin_matchSubscriber;
+
 		activator->adminService = pubsubAdminSvc;
 
 		status = bundleContext_registerService(context, PUBSUB_ADMIN_SERVICE, pubsubAdminSvc, NULL, &activator->registration);

http://git-wip-us.apache.org/repos/asf/celix/blob/091a1126/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
index 722a7a4..9182e1b 100644
--- a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
+++ b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
@@ -67,7 +67,7 @@ static char *DEFAULT_MC_PREFIX = "224.100";
 static celix_status_t pubsubAdmin_getIpAddress(const char* interface, char** ip);
 static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
 static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
-
+static celix_status_t pubsubAdmin_match(pubsub_admin_pt admin, pubsub_endpoint_pt psEP, double* score);
 
 celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *admin) {
 	celix_status_t status = CELIX_SUCCESS;
@@ -617,6 +617,36 @@ celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char *sco
 
 }
 
+celix_status_t pubsubAdmin_matchPublisher(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP, double* score){
+	celix_status_t status = CELIX_SUCCESS;
+	status = pubsubAdmin_match(admin, pubEP, score);
+	return status;
+}
+
+celix_status_t pubsubAdmin_matchSubscriber(pubsub_admin_pt admin, pubsub_endpoint_pt subEP, double* score){
+	celix_status_t status = CELIX_SUCCESS;
+	status = pubsubAdmin_match(admin, subEP, score);
+	return status;
+}
+
+static celix_status_t pubsubAdmin_match(pubsub_admin_pt admin, pubsub_endpoint_pt psEP, double* score){
+	celix_status_t status = CELIX_SUCCESS;
+
+	char topic_psa_prop[1024];
+	snprintf(topic_psa_prop, 1024, "%s.psa", psEP->topic);
+
+	const char* psa_to_use = NULL;
+	bundleContext_getPropertyWithDefault(admin->bundle_context, topic_psa_prop, PSA_DEFAULT, &psa_to_use);
+
+	*score = 0;
+	if (strcmp(psa_to_use, "udp") == 0){
+		*score += 100;
+	}else{
+		*score += 1;
+	}
+
+	return status;
+}
 
 #ifndef ANDROID
 static celix_status_t pubsubAdmin_getIpAddress(const char* interface, char** ip) {

http://git-wip-us.apache.org/repos/asf/celix/blob/091a1126/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
index 12ec56d..e1cccca 100644
--- a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
+++ b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
@@ -398,8 +398,9 @@ static void process_msg(topic_subscription_pt sub,pubsub_udp_msg_pt msg){
 		hash_map_pt msgTypes = hashMapEntry_getValue(entry);
 
 		pubsub_message_type *msgType = hashMap_get(msgTypes,&(msg->header.type));
+
 		if (msgType == NULL) {
-			printf("TS: Primary message %d not supported. NOT sending any part of the whole message.\n",msg->header.type);
+			printf("TS: Primary message %d not supported. NOT receiving any part of the whole message.\n",msg->header.type);
 		}
 		else{
 			void *msgInst = NULL;
@@ -459,7 +460,11 @@ static void* udp_recv_thread_func(void * arg) {
                 	continue;
                 }
 
-                process_msg(sub, udpMsg);
+                if (udpMsg->header.type == 0){
+                	//Raw msg, since raw messages are not supported, don't do anything.
+                }else{
+                	process_msg(sub, udpMsg);
+                }
 
                 free(udpMsg);
             }

http://git-wip-us.apache.org/repos/asf/celix/blob/091a1126/celix-pubsub/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h b/celix-pubsub/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h
index cc28ccc..efe76c3 100644
--- a/celix-pubsub/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h
+++ b/celix-pubsub/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h
@@ -83,4 +83,7 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
 celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin,char* scope, char* topic);
 celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* scope,char* topic);
 
+celix_status_t pubsubAdmin_matchPublisher(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP, double* score);
+celix_status_t pubsubAdmin_matchSubscriber(pubsub_admin_pt admin, pubsub_endpoint_pt subEP, double* score);
+
 #endif /* PUBSUB_ADMIN_IMPL_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/091a1126/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/psa_activator.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/psa_activator.c b/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/psa_activator.c
index 50b28ec..20f6070 100644
--- a/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/psa_activator.c
+++ b/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/psa_activator.c
@@ -74,6 +74,9 @@ celix_status_t bundleActivator_start(void * userData, bundle_context_pt context)
 		pubsubAdminSvc->closeAllPublications = pubsubAdmin_closeAllPublications;
 		pubsubAdminSvc->closeAllSubscriptions = pubsubAdmin_closeAllSubscriptions;
 
+		pubsubAdminSvc->matchPublisher = pubsubAdmin_matchPublisher;
+		pubsubAdminSvc->matchSubscriber = pubsubAdmin_matchSubscriber;
+
 		activator->adminService = pubsubAdminSvc;
 
 		status = bundleContext_registerService(context, PUBSUB_ADMIN_SERVICE, pubsubAdminSvc, NULL, &activator->registration);

http://git-wip-us.apache.org/repos/asf/celix/blob/091a1126/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c b/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
index d83abf0..8e14800 100644
--- a/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
+++ b/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
@@ -66,9 +66,10 @@
 
 static const char *DEFAULT_IP = "127.0.0.1";
 
-static celix_status_t pubsubAdmin_getIpAdress(const char* interface, char** ip);
+static celix_status_t pubsubAdmin_getIpAddress(const char* interface, char** ip);
 static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
 static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
+static celix_status_t pubsubAdmin_match(pubsub_admin_pt admin, pubsub_endpoint_pt psEP, double* score);
 
 celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *admin) {
 	celix_status_t status = CELIX_SUCCESS;
@@ -111,7 +112,7 @@ celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *ad
 			const char *interface = NULL;
 
 			bundleContext_getProperty(context, PSA_ITF, &interface);
-			if (pubsubAdmin_getIpAdress(interface, &detectedIp) != CELIX_SUCCESS) {
+			if (pubsubAdmin_getIpAddress(interface, &detectedIp) != CELIX_SUCCESS) {
 				logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA: Could not retrieve IP adress for interface %s", interface);
 			}
 
@@ -651,9 +652,39 @@ celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* sco
 
 }
 
+celix_status_t pubsubAdmin_matchPublisher(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP, double* score){
+	celix_status_t status = CELIX_SUCCESS;
+	status = pubsubAdmin_match(admin, pubEP, score);
+	return status;
+}
+
+celix_status_t pubsubAdmin_matchSubscriber(pubsub_admin_pt admin, pubsub_endpoint_pt subEP, double* score){
+	celix_status_t status = CELIX_SUCCESS;
+	status = pubsubAdmin_match(admin, subEP, score);
+	return status;
+}
+
+static celix_status_t pubsubAdmin_match(pubsub_admin_pt admin, pubsub_endpoint_pt psEP, double* score){
+	celix_status_t status = CELIX_SUCCESS;
+
+	char topic_psa_prop[1024];
+	snprintf(topic_psa_prop, 1024, "%s.psa", psEP->topic);
+
+	const char* psa_to_use = NULL;
+	bundleContext_getPropertyWithDefault(admin->bundle_context, topic_psa_prop, PSA_DEFAULT, &psa_to_use);
+
+	*score = 0;
+	if (strcmp(psa_to_use, "zmq") == 0){
+		*score += 100;
+	}else{
+		*score += 1;
+	}
+
+	return status;
+}
 
 #ifndef ANDROID
-static celix_status_t pubsubAdmin_getIpAdress(const char* interface, char** ip) {
+static celix_status_t pubsubAdmin_getIpAddress(const char* interface, char** ip) {
 	celix_status_t status = CELIX_BUNDLE_EXCEPTION;
 
 	struct ifaddrs *ifaddr, *ifa;

http://git-wip-us.apache.org/repos/asf/celix/blob/091a1126/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_admin.h
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_admin.h b/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_admin.h
index 1670942..fc1cfbb 100644
--- a/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_admin.h
+++ b/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_admin.h
@@ -36,6 +36,8 @@
 #define PSA_ITF	"PSA_INTERFACE"
 #define PSA_MULTICAST_IP_PREFIX "PSA_MC_PREFIX"
 
+#define PSA_DEFAULT "zmq"
+
 typedef struct pubsub_admin *pubsub_admin_pt;
 
 struct pubsub_admin_service {
@@ -49,6 +51,9 @@ struct pubsub_admin_service {
 
 	celix_status_t (*closeAllPublications)(pubsub_admin_pt admin,char* scope, char* topic);
 	celix_status_t (*closeAllSubscriptions)(pubsub_admin_pt admin,char* scope, char* topic);
+
+	celix_status_t (*matchPublisher)(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP, double* score);
+	celix_status_t (*matchSubscriber)(pubsub_admin_pt admin, pubsub_endpoint_pt subEP, double* score);
 };
 
 typedef struct pubsub_admin_service *pubsub_admin_service_pt;

http://git-wip-us.apache.org/repos/asf/celix/blob/091a1126/celix-pubsub/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c b/celix-pubsub/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
index a485f37..5ba1315 100644
--- a/celix-pubsub/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
+++ b/celix-pubsub/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
@@ -144,15 +144,11 @@ celix_status_t pubsub_topologyManager_psaAdding(void * handle, service_reference
 celix_status_t pubsub_topologyManager_psaAdded(void * handle, service_reference_pt reference, void * service) {
 	celix_status_t status = CELIX_SUCCESS;
 	pubsub_topology_manager_pt manager = handle;
-	int i;
+	int i, j;
 
-	pubsub_admin_service_pt psa = (pubsub_admin_service_pt) service;
+	pubsub_admin_service_pt new_psa = (pubsub_admin_service_pt) service;
 	logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "PSTM: Added PSA");
 
-	celixThreadMutex_lock(&manager->psaListLock);
-	arrayList_add(manager->psaList, psa);
-	celixThreadMutex_unlock(&manager->psaListLock);
-
 	// Add already detected subscriptions to new PSA
 	celixThreadMutex_lock(&manager->subscriptionsLock);
 	hash_map_iterator_pt subscriptionsIterator = hashMapIterator_create(manager->subscriptions);
@@ -160,7 +156,27 @@ celix_status_t pubsub_topologyManager_psaAdded(void * handle, service_reference_
 	while (hashMapIterator_hasNext(subscriptionsIterator)) {
 		array_list_pt sub_ep_list = hashMapIterator_nextValue(subscriptionsIterator);
 		for(i=0;i<arrayList_size(sub_ep_list);i++){
-			status += psa->addSubscription(psa->admin, (pubsub_endpoint_pt)arrayList_get(sub_ep_list,i));
+			pubsub_endpoint_pt sub = (pubsub_endpoint_pt)arrayList_get(sub_ep_list,i);
+			double new_psa_score;
+			new_psa->matchSubscriber(new_psa->admin, sub, &new_psa_score);
+			pubsub_admin_service_pt best_psa = NULL;
+			double highest_score = 0;
+
+			for(j=0;j<arrayList_size(manager->psaList);j++){
+				pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j);
+				double score;
+				psa->matchSubscriber(psa->admin, sub, &score);
+				if (score > highest_score){
+					highest_score = score;
+					best_psa = psa;
+				}
+			}
+			if (best_psa != NULL && (new_psa_score > highest_score)){
+				best_psa->removeSubscription(best_psa->admin, sub);
+			}
+			if (new_psa_score > highest_score){
+				status += new_psa->addSubscription(new_psa->admin, sub);
+			}
 		}
 	}
 
@@ -175,7 +191,27 @@ celix_status_t pubsub_topologyManager_psaAdded(void * handle, service_reference_
 	while (hashMapIterator_hasNext(publicationsIterator)) {
 		array_list_pt pub_ep_list = hashMapIterator_nextValue(publicationsIterator);
 		for(i=0;i<arrayList_size(pub_ep_list);i++){
-			status += psa->addPublication(psa->admin, (pubsub_endpoint_pt)arrayList_get(pub_ep_list,i));
+			pubsub_endpoint_pt pub = (pubsub_endpoint_pt)arrayList_get(pub_ep_list,i);
+			double new_psa_score;
+			new_psa->matchPublisher(new_psa->admin, pub, &new_psa_score);
+			pubsub_admin_service_pt best_psa = NULL;
+			double highest_score = 0;
+
+			for(j=0;j<arrayList_size(manager->psaList);j++){
+				pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j);
+				double score;
+				psa->matchPublisher(psa->admin, pub, &score);
+				if (score > highest_score){
+					highest_score = score;
+					best_psa = psa;
+				}
+			}
+			if (best_psa != NULL && (new_psa_score > highest_score)){
+				best_psa->removePublication(best_psa->admin, pub);
+			}
+			if (new_psa_score > highest_score){
+				status += new_psa->addPublication(new_psa->admin, pub);
+			}
 		}
 	}
 
@@ -183,6 +219,10 @@ celix_status_t pubsub_topologyManager_psaAdded(void * handle, service_reference_
 
 	celixThreadMutex_unlock(&manager->publicationsLock);
 
+	celixThreadMutex_lock(&manager->psaListLock);
+	arrayList_add(manager->psaList, new_psa);
+	celixThreadMutex_unlock(&manager->psaListLock);
+
 	return status;
 }
 
@@ -300,10 +340,20 @@ celix_status_t pubsub_topologyManager_subscriberAdded(void * handle, service_ref
 
 		int j;
 		celixThreadMutex_lock(&manager->psaListLock);
-		for(j=0;j<arrayList_size(manager->psaList);j++){
+		double highest_score = -1;
+		pubsub_admin_service_pt best_psa = NULL;
 
+		for(j=0;j<arrayList_size(manager->psaList);j++){
 			pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j);
-			psa->addSubscription(psa->admin,sub);
+			double score;
+			psa->matchSubscriber(psa->admin, sub, &score);
+			if (score > highest_score){
+				highest_score = score;
+				best_psa = psa;
+			}
+		}
+		if (best_psa != NULL){
+			best_psa->addSubscription(best_psa->admin,sub);
 		}
 
 		// Inform discoveries for interest in the topic
@@ -367,10 +417,19 @@ celix_status_t pubsub_topologyManager_subscriberRemoved(void * handle, service_r
 				pubsub_endpoint_pt sub = arrayList_get(sub_list_by_topic,j);
 				if(pubsubEndpoint_equals(sub,subcmp)){
 					celixThreadMutex_lock(&manager->psaListLock);
+					double highest_score = -1;
+					pubsub_admin_service_pt best_psa = NULL;
 					for(k=0;k<arrayList_size(manager->psaList);k++){
-
 						pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
-						psa->removeSubscription(psa->admin,sub);
+						double score;
+						psa->matchSubscriber(psa->admin, sub, &score);
+						if (score > highest_score){
+							highest_score = score;
+							best_psa = psa;
+						}
+					}
+					if (best_psa != NULL){
+						best_psa->removeSubscription(best_psa->admin,sub);
 					}
 					celixThreadMutex_unlock(&manager->psaListLock);
 
@@ -540,10 +599,20 @@ celix_status_t pubsub_topologyManager_publisherTrackerAdded(void *handle, array_
 				int j;
 				celixThreadMutex_lock(&manager->psaListLock);
 
-				for(j=0;j<arrayList_size(manager->psaList);j++){
+				double highest_score = -1;
+				pubsub_admin_service_pt best_psa = NULL;
 
+				for(j=0;j<arrayList_size(manager->psaList);j++){
 					pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j);
-					status = psa->addPublication(psa->admin,pub);
+					double score;
+					psa->matchPublisher(psa->admin, pub, &score);
+					if (score > highest_score){
+						highest_score = score;
+						best_psa = psa;
+					}
+				}
+				if (best_psa != NULL){
+					status = best_psa->addPublication(best_psa->admin,pub);
 					if(status==CELIX_SUCCESS){
 						celixThreadMutex_lock(&manager->discoveryListLock);
 						hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
@@ -614,9 +683,20 @@ celix_status_t pubsub_topologyManager_publisherTrackerRemoved(void *handle, arra
 					for(j=0;j<arrayList_size(pub_list_by_topic);j++){
 						pubsub_endpoint_pt pub = arrayList_get(pub_list_by_topic,j);
 						if(pubsubEndpoint_equals(pub,pubcmp)){
+							double highest_score = -1;
+							pubsub_admin_service_pt best_psa = NULL;
+
 							for(k=0;k<arrayList_size(manager->psaList);k++){
 								pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
-								status = psa->removePublication(psa->admin,pub);
+								double score;
+								psa->matchPublisher(psa->admin, pub, &score);
+								if (score > highest_score){
+									highest_score = score;
+									best_psa = psa;
+								}
+							}
+							if (best_psa != NULL){
+								status = best_psa->removePublication(best_psa->admin,pub);
 								if(status==CELIX_SUCCESS){
 									celixThreadMutex_lock(&manager->discoveryListLock);
 									hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
@@ -689,9 +769,20 @@ celix_status_t pubsub_topologyManager_announcePublisher(void *handle, pubsub_end
 	pubsubEndpoint_create(pubEP->frameworkUUID,pubEP->scope,pubEP->topic,pubEP->serviceID,pubEP->endpoint,&p);
 	arrayList_add(pub_list_by_topic,p);
 
+	double highest_score = -1;
+	pubsub_admin_service_pt best_psa = NULL;
+
 	for(i=0;i<arrayList_size(manager->psaList);i++){
 		pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,i);
-		status += psa->addPublication(psa->admin,p);
+		double score;
+		psa->matchPublisher(psa->admin, p, &score);
+		if (score > highest_score){
+			highest_score = score;
+			best_psa = psa;
+		}
+	}
+	if (best_psa != NULL){
+		status += best_psa->addPublication(best_psa->admin,p);
 	}
 
 	celixThreadMutex_unlock(&manager->psaListLock);
@@ -727,9 +818,20 @@ celix_status_t pubsub_topologyManager_removePublisher(void *handle, pubsub_endpo
 
 		if(found && p !=NULL){
 
+			double highest_score = -1;
+			pubsub_admin_service_pt best_psa = NULL;
+
 			for(i=0;i<arrayList_size(manager->psaList);i++){
 				pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,i);
-				status += psa->removePublication(psa->admin,p);
+				double score;
+				psa->matchPublisher(psa->admin, p, &score);
+				if (score > highest_score){
+					highest_score = score;
+					best_psa = psa;
+				}
+			}
+			if (best_psa != NULL){
+				status += best_psa->removePublication(best_psa->admin,p);
 			}
 
 			arrayList_removeElement(pub_list_by_topic,p);


Mime
View raw message