celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pnol...@apache.org
Subject [2/3] celix git commit: Refactors the pubsub spi and fixes an issue with pubsub endpoint matching.
Date Tue, 20 Feb 2018 12:39:26 GMT
Refactors the pubsub spi and fixes an issue with pubsub endpoint matching.


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/b8f13870
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/b8f13870
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/b8f13870

Branch: refs/heads/develop
Commit: b8f138708b71850a4802396e6793257bc95f5314
Parents: c314016
Author: Pepijn Noltes <pepijnnoltes@gmail.com>
Authored: Tue Feb 20 13:30:36 2018 +0100
Committer: Pepijn Noltes <pepijnnoltes@gmail.com>
Committed: Tue Feb 20 13:30:36 2018 +0100

----------------------------------------------------------------------
 framework/include/service_reference.h           |   3 +
 framework/src/service_reference.c               |   9 +-
 pubsub/examples/CMakeLists.txt                  |  27 +-
 .../pubsub_admin_udp_mc/src/pubsub_admin_impl.c | 246 ++++++++----
 .../pubsub_admin_udp_mc/src/pubsub_admin_impl.h |  10 +-
 .../src/pubsub_psa_udpmc_constants.h            |  39 ++
 .../pubsub_admin_udp_mc/src/topic_publication.c |  51 ++-
 .../pubsub_admin_udp_mc/src/topic_publication.h |   4 +-
 pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c | 279 ++++++++++----
 pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.h |  18 +-
 .../src/pubsub_psa_zmq_constants.h              |  48 +++
 pubsub/pubsub_admin_zmq/src/topic_publication.c |  37 +-
 pubsub/pubsub_admin_zmq/src/topic_publication.h |   2 +-
 .../pubsub_admin_zmq/src/topic_subscription.c   |  17 +-
 .../pubsub_admin_zmq/src/topic_subscription.h   |   2 +-
 pubsub/pubsub_api/include/pubsub/publisher.h    |   7 +-
 pubsub/pubsub_api/include/pubsub/subscriber.h   |   5 +-
 pubsub/pubsub_discovery/src/etcd_watcher.c      |  58 +--
 pubsub/pubsub_discovery/src/etcd_writer.c       |  54 ++-
 pubsub/pubsub_discovery/src/psd_activator.c     |   1 -
 .../src/pubsub_discovery_impl.c                 | 109 +++++-
 .../src/pubsub_discovery_impl.h                 |  11 +-
 .../pubsub_serializer_json/src/ps_activator.c   |   3 +-
 .../include/publisher_endpoint_announce.h       |   4 +
 pubsub/pubsub_spi/include/pubsub_admin.h        |  26 +-
 pubsub/pubsub_spi/include/pubsub_admin_match.h  |  17 +-
 pubsub/pubsub_spi/include/pubsub_constants.h    |  30 ++
 pubsub/pubsub_spi/include/pubsub_endpoint.h     |  46 ++-
 pubsub/pubsub_spi/include/pubsub_serializer.h   |   2 -
 pubsub/pubsub_spi/include/pubsub_utils.h        |   4 +-
 pubsub/pubsub_spi/src/pubsub_admin_match.c      | 383 ++++++-------------
 pubsub/pubsub_spi/src/pubsub_endpoint.c         | 204 +++++++---
 pubsub/pubsub_spi/src/pubsub_utils.c            | 100 ++---
 .../src/pubsub_topology_manager.c               |  93 +++--
 .../src/pubsub_topology_manager.h               |   5 +
 35 files changed, 1159 insertions(+), 795 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/framework/include/service_reference.h
----------------------------------------------------------------------
diff --git a/framework/include/service_reference.h b/framework/include/service_reference.h
index bb263f5..3d84526 100644
--- a/framework/include/service_reference.h
+++ b/framework/include/service_reference.h
@@ -48,6 +48,9 @@ FRAMEWORK_EXPORT celix_status_t
 serviceReference_getProperty(service_reference_pt reference, const char *key, const char **value);
 
 FRAMEWORK_EXPORT celix_status_t
+serviceReference_getPropertyWithDefault(service_reference_pt reference, const char *key, const char* def, const char **value);
+
+FRAMEWORK_EXPORT celix_status_t
 serviceReference_getPropertyKeys(service_reference_pt reference, char **keys[], unsigned int *size);
 
 FRAMEWORK_EXPORT celix_status_t

http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/framework/src/service_reference.c
----------------------------------------------------------------------
diff --git a/framework/src/service_reference.c b/framework/src/service_reference.c
index 545426d..e39e912 100644
--- a/framework/src/service_reference.c
+++ b/framework/src/service_reference.c
@@ -203,14 +203,15 @@ celix_status_t serviceReference_getServiceRegistration(service_reference_pt ref,
     }
 }
 
-celix_status_t serviceReference_getProperty(service_reference_pt ref, const char* key, const char** value) {
+FRAMEWORK_EXPORT celix_status_t
+serviceReference_getPropertyWithDefault(service_reference_pt ref, const char *key, const char* def, const char **value) {
     celix_status_t status = CELIX_SUCCESS;
     properties_pt props = NULL;
     celixThreadRwlock_readLock(&ref->lock);
     if (ref->registration != NULL) {
         status = serviceRegistration_getProperties(ref->registration, &props);
         if (status == CELIX_SUCCESS) {
-            *value = (char*) properties_get(props, key);
+            *value = (char*) properties_getWithDefault(props, key, def);
         }
     } else {
         *value = NULL;
@@ -219,6 +220,10 @@ celix_status_t serviceReference_getProperty(service_reference_pt ref, const char
     return status;
 }
 
+celix_status_t serviceReference_getProperty(service_reference_pt ref, const char* key, const char** value) {
+    return serviceReference_getPropertyWithDefault(ref, key, NULL, value);
+}
+
 FRAMEWORK_EXPORT celix_status_t serviceReference_getPropertyKeys(service_reference_pt ref, char **keys[], unsigned int *size) {
     celix_status_t status = CELIX_SUCCESS;
     properties_pt props = NULL;

http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/examples/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/examples/CMakeLists.txt b/pubsub/examples/CMakeLists.txt
index 6703324..91f2efa 100644
--- a/pubsub/examples/CMakeLists.txt
+++ b/pubsub/examples/CMakeLists.txt
@@ -21,6 +21,12 @@ add_subdirectory(mp_pubsub)
 find_program(ETCD_CMD NAMES etcd)
 find_program(XTERM_CMD NAMES xterm)
 
+find_package(ZMQ REQUIRED)
+find_package(CZMQ REQUIRED)
+find_package(Jansson REQUIRED)
+
+set(PUBSUB_CONTAINER_LIBS ${JANSSON_LIBRARY} ${ZMQ_LIBRARIES} ${CZMQ_LIBRARIES} ${OPENSSL_CRYPTO_LIBRARY} Celix::dfi)
+
 # UDP Multicast
 add_celix_container(pubsub_publisher_udp_mc
         GROUP pubsub
@@ -34,6 +40,7 @@ add_celix_container(pubsub_publisher_udp_mc
         celix_pubsub_poi_publisher
         celix_pubsub_poi_publisher2
         )
+target_link_libraries(pubsub_publisher_udp_mc PRIVATE ${PUBSUB_CONTAINER_LIBS})
 
 add_celix_container("pubsub_subscriber_udp_mc"
         GROUP "pubsub"
@@ -46,7 +53,7 @@ add_celix_container("pubsub_subscriber_udp_mc"
         Celix::pubsub_admin_udp_multicast
         celix_pubsub_poi_subscriber
         )
-
+target_link_libraries(pubsub_subscriber_udp_mc PRIVATE ${PUBSUB_CONTAINER_LIBS})
 add_celix_container("pubsub_subscriber2_udp_mc"
         GROUP "pubsub"
         BUNDLES
@@ -58,7 +65,7 @@ add_celix_container("pubsub_subscriber2_udp_mc"
         Celix::pubsub_admin_udp_multicast
         celix_pubsub_poi_subscriber
         )
-
+target_link_libraries(pubsub_subscriber2_udp_mc PRIVATE ${PUBSUB_CONTAINER_LIBS})
 if (ETCD_CMD AND XTERM_CMD)
     #Runtime starting a publish and subscriber for udp mc
     add_runtime(pubsub_rt_upd_mc
@@ -78,8 +85,8 @@ if (BUILD_PUBSUB_PSA_ZMQ)
 
     # Dynamic ZMQ / UDP admin
     add_celix_container("pubsub_publisher"
-            GROUP "pubsub"
-            BUNDLES
+        GROUP "pubsub"
+        BUNDLES
             Celix::shell
             Celix::shell_tui
             Celix::pubsub_serializer_json
@@ -89,10 +96,11 @@ if (BUILD_PUBSUB_PSA_ZMQ)
             Celix::pubsub_admin_udp_multicast
             celix_pubsub_poi_publisher
             celix_pubsub_poi_publisher2
-            PROPERTIES
+        PROPERTIES
             poi1.psa=zmq
             poi2.psa=udp
-            )
+    )
+    target_link_libraries(pubsub_publisher PRIVATE ${PUBSUB_CONTAINER_LIBS})
 
     add_celix_container("pubsub_subscriber"
             GROUP "pubsub"
@@ -109,6 +117,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
             poi1.psa=zmq
             poi2.psa=udp
             )
+    target_link_libraries(pubsub_subscriber PRIVATE ${PUBSUB_CONTAINER_LIBS})
 
     # ZMQ
     add_celix_container("pubsub_zmq"
@@ -123,6 +132,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
             celix_pubsub_poi_publisher
             celix_pubsub_poi_subscriber
             )
+    target_link_libraries(pubsub_zmq PRIVATE ${PUBSUB_CONTAINER_LIBS})
 
     add_celix_container("pubsub_publisher_zmq"
             GROUP "pubsub"
@@ -138,6 +148,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
             PROPERTIES
             pubsub.scope=my_small_scope
             )
+    target_link_libraries(pubsub_publisher_zmq PRIVATE ${PUBSUB_CONTAINER_LIBS})
 
     add_celix_container("pubsub_subscriber_zmq"
             GROUP "pubsub"
@@ -150,6 +161,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
             Celix::pubsub_admin_zmq
             celix_pubsub_poi_subscriber
             )
+    target_link_libraries(pubsub_subscriber_zmq PRIVATE ${PUBSUB_CONTAINER_LIBS})
 
     add_celix_container("pubsub_subscriber2_zmq"
             GROUP "pubsub"
@@ -163,6 +175,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
             celix_pubsub_poi_subscriber
 
             )
+    target_link_libraries(pubsub_subscriber2_zmq PRIVATE ${PUBSUB_CONTAINER_LIBS})
 
     # ZMQ Multipart
     add_celix_container("pubsub_mp_subscriber_zmq"
@@ -176,6 +189,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
             Celix::pubsub_admin_zmq
             org.apache.celix.pubsub_subscriber.MpSubscriber
             )
+    target_link_libraries(pubsub_mp_subscriber_zmq PRIVATE ${PUBSUB_CONTAINER_LIBS})
 
     add_celix_container("pubsub_mp_publisher_zmq"
             GROUP "pubsub"
@@ -188,6 +202,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
             Celix::pubsub_admin_zmq
             org.apache.celix.pubsub_publisher.MpPublisher
             )
+    target_link_libraries(pubsub_mp_publisher_zmq PRIVATE ${PUBSUB_CONTAINER_LIBS})
 
     if (ETCD_CMD AND XTERM_CMD)
         #Runtime starting two bundles using both zmq and upd mc pubsub

http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.c b/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.c
index a71344a..1e3cef0 100644
--- a/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.c
+++ b/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.c
@@ -41,6 +41,7 @@
 #include <netinet/in.h>
 #include <arpa/inet.h>
 #include <netdb.h>
+#include <assert.h>
 
 #include "constants.h"
 #include "utils.h"
@@ -69,7 +70,7 @@ static celix_status_t pubsubAdmin_getIpAddress(const char* interface, char** ip)
 static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
 static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
 
-static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin,pubsub_endpoint_pt ep, pubsub_serializer_service_t **serSvc);
+static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin, pubsub_endpoint_pt ep, pubsub_serializer_service_t **out, const char **serType);
 static void connectTopicPubSubToSerializer(pubsub_admin_pt admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool isPublication);
 static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void *topicPubSub,bool isPublication);
 
@@ -206,6 +207,34 @@ celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *ad
 		(*admin)->mcIpAddress = strdup(DEFAULT_MC_IP);
 	}
 
+	(*admin)->defaultScore = PSA_UDPMC_DEFAULT_SCORE;
+	(*admin)->qosSampleScore = PSA_UDPMC_DEFAULT_QOS_SAMPLE_SCORE;
+	(*admin)->qosControlScore = PSA_UDPMC_DEFAULT_QOS_CONTROL_SCORE;
+
+	const char *defaultScoreStr = NULL;
+	const char *sampleScoreStr = NULL;
+	const char *controlScoreStr = NULL;
+	bundleContext_getProperty(context, PSA_UDPMC_DEFAULT_SCORE_KEY, &defaultScoreStr);
+	bundleContext_getProperty(context, PSA_UDPMC_QOS_SAMPLE_SCORE_KEY, &sampleScoreStr);
+	bundleContext_getProperty(context, PSA_UDPMC_QOS_CONTROL_SCORE_KEY, &controlScoreStr);
+
+	if (defaultScoreStr != NULL) {
+		(*admin)->defaultScore = strtof(defaultScoreStr, NULL);
+	}
+	if (sampleScoreStr != NULL) {
+		(*admin)->qosSampleScore = strtof(sampleScoreStr, NULL);
+	}
+	if (controlScoreStr != NULL) {
+		(*admin)->qosControlScore = strtof(controlScoreStr, NULL);
+	}
+
+	(*admin)->verbose = PSA_UDPMC_DEFAULT_VERBOSE;
+	const char *verboseStr = NULL;
+	bundleContext_getProperty(context, PSA_UDPMC_VERBOSE_KEY, &verboseStr);
+	if (verboseStr != NULL) {
+		(*admin)->verbose = strncasecmp("true", verboseStr, strlen("true")) == 0;
+	}
+
 	return status;
 }
 
@@ -307,12 +336,14 @@ static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu
 
 		int i;
 		pubsub_serializer_service_t *best_serializer = NULL;
-		if( (status=pubsubAdmin_getBestSerializer(admin, subEP, &best_serializer)) == CELIX_SUCCESS){
+		if( (status=pubsubAdmin_getBestSerializer(admin, subEP, &best_serializer, NULL)) == CELIX_SUCCESS){
 			status = pubsub_topicSubscriptionCreate(admin->bundle_context, admin->ifIpAddress, PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, PUBSUB_ANY_SUB_TOPIC, best_serializer, &any_sub);
 		}
 		else{
-			printf("PSA_UDP_MC: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n",
-				   properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+			if (admin->verbose) {
+				printf("PSA_UDP_MC: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n",
+					   properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+			}
 
 			celixThreadMutex_lock(&admin->noSerializerPendingsLock);
 			arrayList_add(admin->noSerializerSubscriptions,subEP);
@@ -381,13 +412,7 @@ static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu
 celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
 	celix_status_t status = CELIX_SUCCESS;
 
-	printf("PSA_UDP_MC: Received subscription [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n",
-		   properties_get(subEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),
-		   subEP->serviceID,
-		   properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),
-		   properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
-
-	if(strcmp(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),PUBSUB_ANY_SUB_TOPIC)==0){
+	if(strcmp(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME),PUBSUB_ANY_SUB_TOPIC)==0){
 		return pubsubAdmin_addAnySubscription(admin,subEP);
 	}
 
@@ -397,26 +422,27 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
 	celixThreadMutex_lock(&admin->localPublicationsLock);
 	celixThreadMutex_lock(&admin->externalPublicationsLock);
 
-	char* scope_topic = createScopeTopicKey(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+	char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
 
 	service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
 	array_list_pt ext_pub_list = (array_list_pt)hashMap_get(admin->externalPublications,scope_topic);
 
-	if(factory==NULL && ext_pub_list==NULL){ //No (local or external) publishers yet for this topic
+	if (factory==NULL && ext_pub_list==NULL) { //No (local or external) publishers yet for this topic
 		pubsubAdmin_addSubscriptionToPendingList(admin,subEP);
-	}
-	else{
+	} else {
 		int i;
 		topic_subscription_pt subscription = hashMap_get(admin->subscriptions, scope_topic);
 
-		if(subscription == NULL) {
+		if (subscription == NULL) {
 			pubsub_serializer_service_t *best_serializer = NULL;
-			if( (status=pubsubAdmin_getBestSerializer(admin, subEP, &best_serializer)) == CELIX_SUCCESS){
-				status += pubsub_topicSubscriptionCreate(admin->bundle_context,admin->ifIpAddress, (char*) properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), (char*) properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC), best_serializer, &subscription);
-			}
-			else{
-				printf("PSA_UDP_MC: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n",
-					   properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+            const char *serType = NULL;
+			if( (status=pubsubAdmin_getBestSerializer(admin, subEP, &best_serializer, &serType)) == CELIX_SUCCESS){
+				status += pubsub_topicSubscriptionCreate(admin->bundle_context,admin->ifIpAddress, (char*) properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), (char*) properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME), best_serializer, &subscription);
+			} else {
+				if (admin->verbose) {
+					printf("PSA_UDP_MC: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n",
+						   properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+				}
 
 				celixThreadMutex_lock(&admin->noSerializerPendingsLock);
 				arrayList_add(admin->noSerializerSubscriptions,subEP);
@@ -477,6 +503,19 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
 	celixThreadMutex_unlock(&admin->subscriptionsLock);
 	celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
 
+    if (admin->verbose) {
+        printf("PSA_UDPMC: Added subscription [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
+               properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+               properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_UUID),
+               properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+               properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+        printf("PSA_UDPMC: \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n",
+               properties_get(subEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY),
+               properties_get(subEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY),
+               properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TYPE));
+        printf("PSA_UDPMC: \t [endpoint url = %s]\n", properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+    }
+
 	return status;
 
 }
@@ -484,13 +523,20 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
 celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
 	celix_status_t status = CELIX_SUCCESS;
 
-	printf("PSA_UDP_MC: Removing subscription [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n",
-		   properties_get(subEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),
-		   subEP->serviceID,
-		   properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),
-		   properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
-
-	char* scope_topic = createScopeTopicKey(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+    if (admin->verbose) {
+        printf("PSA_UDPMC: Removing subscription [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
+               properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+               properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_UUID),
+               properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+               properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+        printf("PSA_UDPMC: \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n",
+               properties_get(subEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY),
+               properties_get(subEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY),
+               properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TYPE));
+        printf("PSA_UDPMC: \t [endpoint url = %s]\n", properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+    }
+
+	char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
 
 	celixThreadMutex_lock(&admin->subscriptionsLock);
 	topic_subscription_pt sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic);
@@ -522,22 +568,29 @@ celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpo
 celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP){
 	celix_status_t status = CELIX_SUCCESS;
 
-	printf("PSA_UDP_MC: Received publication [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n",
-		   properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),
-		   pubEP->serviceID,
-		   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),
-		   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+    const char* fwUUID = NULL;
+    bundleContext_getProperty(admin->bundle_context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
+    if(fwUUID==NULL){
+        printf("PSA_UDP_MC: Cannot retrieve fwUUID.\n");
+        return CELIX_INVALID_BUNDLE_CONTEXT;
+    }
 
-	const char* fwUUID = NULL;
+    const char *epFwUUID = properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID);
+    bool isOwn = strncmp(fwUUID, epFwUUID, 128) == 0;
 
-	bundleContext_getProperty(admin->bundle_context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
-	if(fwUUID==NULL){
-		printf("PSA_UDP_MC: Cannot retrieve fwUUID.\n");
-		return CELIX_INVALID_BUNDLE_CONTEXT;
-	}
-	char* scope_topic = createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+    if (isOwn) {
+        //should be null, willl be set in this call
+        assert(properties_get(pubEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY) == NULL);
+        assert(properties_get(pubEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY) == NULL);
+    }
+
+    if (isOwn) {
+        properties_set(pubEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY, PSA_UDPMC_PUBSUB_ADMIN_TYPE);
+    }
+
+	char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
 
-	if ((strcmp(properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID), fwUUID) == 0) && (properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) == NULL)) {
+	if ((strcmp(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID), fwUUID) == 0) && (properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) == NULL)) {
 
 		celixThreadMutex_lock(&admin->localPublicationsLock);
 
@@ -546,12 +599,15 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_
 		if (factory == NULL) {
 			topic_publication_pt pub = NULL;
 			pubsub_serializer_service_t *best_serializer = NULL;
-			if( (status=pubsubAdmin_getBestSerializer(admin, pubEP, &best_serializer)) == CELIX_SUCCESS){
-				status = pubsub_topicPublicationCreate(admin->sendSocket, pubEP, best_serializer, admin->mcIpAddress, &pub);
-			}
-			else{
+			const char* serType = NULL;
+			if( (status=pubsubAdmin_getBestSerializer(admin, pubEP, &best_serializer, &serType)) == CELIX_SUCCESS){
+				status = pubsub_topicPublicationCreate(admin->sendSocket, pubEP, best_serializer, serType, admin->mcIpAddress, &pub);
+                if (isOwn) {
+                    properties_set(pubEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY, serType);
+                }
+			} else {
 				printf("PSA_UDP_MC: Cannot find a serializer for publishing topic %s. Adding it to pending list.\n",
-					   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+					   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
 
 				celixThreadMutex_lock(&admin->noSerializerPendingsLock);
 				arrayList_add(admin->noSerializerPublications,pubEP);
@@ -565,10 +621,10 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_
 					connectTopicPubSubToSerializer(admin, best_serializer, pub, true);
 				}
 			} else {
-				printf("PSA_UDP_MC: Cannot create a topicPublication for scope=%s, topic=%s (bundle %ld).\n",
-					   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),
-					   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),
-					   pubEP->serviceID);
+				printf("PSA_UDP_MC: Cannot create a topicPublication for scope=%s, topic=%s (bundle %s).\n",
+					   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+					   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME),
+					   properties_get(pubEP->endpoint_props, PUBSUB_BUNDLE_ID));
 			}
 		} else {
 			//just add the new EP to the list
@@ -577,8 +633,7 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_
 		}
 
 		celixThreadMutex_unlock(&admin->localPublicationsLock);
-	}
-	else{
+	} else {
 
 		celixThreadMutex_lock(&admin->externalPublicationsLock);
 		array_list_pt ext_pub_list = (array_list_pt) hashMap_get(admin->externalPublications, scope_topic);
@@ -630,6 +685,21 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_
 
 	celixThreadMutex_unlock(&admin->subscriptionsLock);
 
+    if (admin->verbose) {
+        printf("PSA_UDPMC: Added publication [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
+               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_UUID),
+               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+        printf("PSA_UDPMC: \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n",
+               properties_get(pubEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY),
+               properties_get(pubEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY),
+               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TYPE));
+        printf("PSA_UDPMC: \t [endpoint url = %s, own = %i]\n",
+               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL),
+               isOwn);
+    }
+
 	return status;
 
 }
@@ -638,11 +708,18 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
 	celix_status_t status = CELIX_SUCCESS;
 	int count = 0;
 
-	printf("PSA_UDP_MC: Removing publication [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n",
-		   properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),
-		   pubEP->serviceID,
-		   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),
-		   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+    if (admin->verbose) {
+        printf("PSA_UDPMC: Adding publication [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
+               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_UUID),
+               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+        printf("PSA_UDPMC: \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n",
+               properties_get(pubEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY),
+               properties_get(pubEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY),
+               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TYPE));
+        printf("PSA_UDPMC: \t [endpoint url = %s]\n", properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+    }
 
 	const char* fwUUID = NULL;
 
@@ -651,9 +728,9 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
 		printf("PSA_UDP_MC: Cannot retrieve fwUUID.\n");
 		return CELIX_INVALID_BUNDLE_CONTEXT;
 	}
-	char *scope_topic = createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+	char *scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
 
-	if(strcmp(properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),fwUUID)==0){
+	if(strcmp(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),fwUUID)==0){
 
 		celixThreadMutex_lock(&admin->localPublicationsLock);
 		service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
@@ -735,7 +812,7 @@ celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin,char *scop
 	printf("PSA_UDP_MC: Closing all publications for scope=%s,topic=%s\n", scope, topic);
 
 	celixThreadMutex_lock(&admin->localPublicationsLock);
-	char* scope_topic =createScopeTopicKey(scope, topic);
+	char* scope_topic = pubsubEndpoint_createScopeTopicKey(scope, topic);
 	hash_map_entry_pt pubsvc_entry = (hash_map_entry_pt)hashMap_getEntry(admin->localPublications,scope_topic);
 	if(pubsvc_entry!=NULL){
 		char* key = (char*)hashMapEntry_getKey(pubsvc_entry);
@@ -761,7 +838,7 @@ celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char *sco
 	printf("PSA_UDP_MC: Closing all subscriptions\n");
 
 	celixThreadMutex_lock(&admin->subscriptionsLock);
-	char* scope_topic =createScopeTopicKey(scope, topic);
+	char* scope_topic = pubsubEndpoint_createScopeTopicKey(scope, topic);
 	hash_map_entry_pt sub_entry = (hash_map_entry_pt)hashMap_getEntry(admin->subscriptions,scope_topic);
 	if(sub_entry!=NULL){
 		char* topic = (char*)hashMapEntry_getKey(sub_entry);
@@ -818,7 +895,7 @@ static celix_status_t pubsubAdmin_getIpAddress(const char* interface, char** ip)
 static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
 	celix_status_t status = CELIX_SUCCESS;
 
-	char* scope_topic =createScopeTopicKey(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+	char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
 	array_list_pt pendingListPerTopic = hashMap_get(admin->pendingSubscriptions,scope_topic);
 	if(pendingListPerTopic==NULL){
 		arrayList_create(&pendingListPerTopic);
@@ -841,7 +918,7 @@ celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt r
 	const char *serType = NULL;
 	serviceReference_getProperty(reference, PUBSUB_SERIALIZER_TYPE_KEY,&serType);
 	if(serType == NULL){
-		printf("Serializer serviceReference %p has no pubsub_serializer.type property specified\n",reference);
+		printf("PSA_UDPMC: Serializer serviceReference %p has no %s property specified\n",reference, PUBSUB_SERIALIZER_TYPE_KEY);
 		return CELIX_SERVICE_EXCEPTION;
 	}
 
@@ -856,7 +933,7 @@ celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt r
 	for(i=0;i<arrayList_size(admin->noSerializerSubscriptions);i++){
 		pubsub_endpoint_pt ep = (pubsub_endpoint_pt)arrayList_get(admin->noSerializerSubscriptions,i);
 		pubsub_serializer_service_t *best_serializer = NULL;
-		pubsubAdmin_getBestSerializer(admin, ep, &best_serializer);
+		pubsubAdmin_getBestSerializer(admin, ep, &best_serializer, NULL);
 		if(best_serializer != NULL){ /* Finally we have a valid serializer! */
 			pubsubAdmin_addSubscription(admin, ep);
 		}
@@ -865,7 +942,7 @@ celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt r
 	for(i=0;i<arrayList_size(admin->noSerializerPublications);i++){
 		pubsub_endpoint_pt ep = (pubsub_endpoint_pt)arrayList_get(admin->noSerializerPublications,i);
 		pubsub_serializer_service_t *best_serializer = NULL;
-		pubsubAdmin_getBestSerializer(admin, ep, &best_serializer);
+		pubsubAdmin_getBestSerializer(admin, ep, &best_serializer, NULL);
 		if(best_serializer != NULL){ /* Finally we have a valid serializer! */
 			pubsubAdmin_addPublication(admin, ep);
 		}
@@ -873,7 +950,9 @@ celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt r
 
 	celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
 
-	printf("PSA_UDP_MC: %s serializer added\n",serType);
+	if (admin->verbose) {
+		printf("PSA_UDP_MC: %s serializer added\n", serType);
+	}
 
 	return status;
 }
@@ -886,7 +965,7 @@ celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt
 
 	serviceReference_getProperty(reference, PUBSUB_SERIALIZER_TYPE_KEY,&serType);
 	if(serType == NULL){
-		printf("Serializer serviceReference %p has no pubsub_serializer.type property specified\n",reference);
+		printf("Serializer serviceReference %p has no %s property specified\n",reference, PUBSUB_SERIALIZER_TYPE_KEY);
 		return CELIX_SERVICE_EXCEPTION;
 	}
 
@@ -998,7 +1077,9 @@ celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt
 		arrayList_destroy(topicSubList);
 	}
 
-	printf("PSA_UDP_MC: %s serializer removed\n",serType);
+	if (admin->verbose) {
+		printf("PSA_UDP_MC: %s serializer removed\n", serType);
+	}
 
 
 	return CELIX_SUCCESS;
@@ -1007,24 +1088,41 @@ celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt
 celix_status_t pubsubAdmin_matchEndpoint(pubsub_admin_pt admin, pubsub_endpoint_pt endpoint, double* score){
 	celix_status_t status = CELIX_SUCCESS;
 
+    const char *fwUuid = NULL;
+    bundleContext_getProperty(admin->bundle_context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUuid);
+    if (fwUuid == NULL) {
+        return CELIX_ILLEGAL_STATE;
+    }
+
 	celixThreadMutex_lock(&admin->serializerListLock);
-	status = pubsub_admin_match(endpoint->topic_props,PUBSUB_ADMIN_TYPE,admin->serializerList,score);
+	status = pubsub_admin_match(endpoint, PSA_UDPMC_PUBSUB_ADMIN_TYPE, fwUuid, admin->qosSampleScore, admin->qosControlScore, admin->defaultScore, admin->serializerList,score);
 	celixThreadMutex_unlock(&admin->serializerListLock);
 
 	return status;
 }
 
 /* This one recall the same logic as in the match function */
-static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin,pubsub_endpoint_pt ep, pubsub_serializer_service_t **serSvc){
-
+static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin, pubsub_endpoint_pt ep, pubsub_serializer_service_t **out, const char **serType){
 	celix_status_t status = CELIX_SUCCESS;
 
+	pubsub_serializer_service_t *serSvc = NULL;
+	service_reference_pt svcRef = NULL;
+
 	celixThreadMutex_lock(&admin->serializerListLock);
-	status = pubsub_admin_get_best_serializer(ep->topic_props, admin->serializerList, serSvc);
+	status = pubsub_admin_get_best_serializer(ep->topic_props, admin->serializerList, &svcRef);
 	celixThreadMutex_unlock(&admin->serializerListLock);
 
-	return status;
+	if (svcRef != NULL) {
+		bundleContext_getService(admin->bundle_context, svcRef, (void**)&serSvc);
+		bundleContext_ungetService(admin->bundle_context, svcRef, NULL); //TODO, FIXME this should not be done this way. only unget if the service is not used any more
+        if (serType != NULL) {
+            serviceReference_getProperty(svcRef, PUBSUB_SERIALIZER_TYPE_KEY, serType);
+        }
+	}
 
+	*out = serSvc;
+
+	return status;
 }
 
 static void connectTopicPubSubToSerializer(pubsub_admin_pt admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool isPublication){

http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.h b/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.h
index de4b813..3529a8f 100644
--- a/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.h
+++ b/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.h
@@ -27,11 +27,10 @@
 #ifndef PUBSUB_ADMIN_UDP_MC_IMPL_H_
 #define PUBSUB_ADMIN_UDP_MC_IMPL_H_
 
+#include "pubsub_psa_udpmc_constants.h"
 #include "pubsub_admin.h"
 #include "log_helper.h"
 
-#define PUBSUB_ADMIN_TYPE	"udp_mc"
-
 struct pubsub_admin {
 
 	bundle_context_pt bundle_context;
@@ -68,8 +67,13 @@ struct pubsub_admin {
 	char* mcIpAddress; // The multicast IP address
 
 	int sendSocket;
-	void* zmq_context; // to be removed
 
+
+	double qosSampleScore;
+	double qosControlScore;
+	double defaultScore;
+
+	bool verbose;
 };
 
 celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *admin);

http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_admin_udp_mc/src/pubsub_psa_udpmc_constants.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/src/pubsub_psa_udpmc_constants.h b/pubsub/pubsub_admin_udp_mc/src/pubsub_psa_udpmc_constants.h
new file mode 100644
index 0000000..2a02da8
--- /dev/null
+++ b/pubsub/pubsub_admin_udp_mc/src/pubsub_psa_udpmc_constants.h
@@ -0,0 +1,39 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#ifndef PUBSUB_PSA_UDPMC_CONSTANTS_H_
+#define PUBSUB_PSA_UDPMC_CONSTANTS_H_
+
+
+#define PSA_UDPMC_PUBSUB_ADMIN_TYPE	            "udp_mc"
+
+#define PSA_UDPMC_DEFAULT_QOS_SAMPLE_SCORE 		70
+#define PSA_UDPMC_DEFAULT_QOS_CONTROL_SCORE 	30
+#define PSA_UDPMC_DEFAULT_SCORE 				50
+
+#define PSA_UDPMC_QOS_SAMPLE_SCORE_KEY 			"PSA_UDPMC_QOS_SAMPLE_SCORE"
+#define PSA_UDPMC_QOS_CONTROL_SCORE_KEY 		"PSA_UDPMC_QOS_CONTROL_SCORE"
+#define PSA_UDPMC_DEFAULT_SCORE_KEY 			"PSA_UDPMC_DEFAULT_SCORE"
+
+#define PSA_UDPMC_DEFAULT_VERBOSE 				false
+
+#define PSA_UDPMC_VERBOSE_KEY    			    "PSA_UDPMC_VERBOSE"
+
+
+#endif /* PUBSUB_PSA_UDPMC_CONSTANTS_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_admin_udp_mc/src/topic_publication.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/src/topic_publication.c b/pubsub/pubsub_admin_udp_mc/src/topic_publication.c
index 3aa2c30..7e9bdbb 100644
--- a/pubsub/pubsub_admin_udp_mc/src/topic_publication.c
+++ b/pubsub/pubsub_admin_udp_mc/src/topic_publication.c
@@ -40,6 +40,7 @@
 #include "large_udp.h"
 
 #include "pubsub_serializer.h"
+#include "pubsub_psa_udpmc_constants.h"
 
 #define EP_ADDRESS_LEN		32
 
@@ -52,7 +53,10 @@ struct topic_publication {
 	array_list_pt pub_ep_list; //List<pubsub_endpoint>
 	hash_map_pt boundServices; //<bundle_pt,bound_service>
 	celix_thread_mutex_t tp_lock;
-	pubsub_serializer_service_t *serializer;
+	struct {
+		const char* type;
+		pubsub_serializer_service_t* svc;
+	} serializer;
 	struct sockaddr_in destAddr;
 };
 
@@ -92,11 +96,14 @@ static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsig
 static void delay_first_send_for_late_joiners(void);
 
 
-celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, char* bindIP, topic_publication_pt *out){
+celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, const char* best_serializer_type, char* bindIP, topic_publication_pt *out){
 
 	char* ep = malloc(EP_ADDRESS_LEN);
 	memset(ep,0,EP_ADDRESS_LEN);
-	unsigned int port = pubEP->serviceID + rand_range(UDP_BASE_PORT+pubEP->serviceID+3, UDP_MAX_PORT);
+
+	long serviceId =strtol(properties_getWithDefault(pubEP->endpoint_props, PUBSUB_ENDPOINT_SERVICE_ID, "0"), NULL, 10);
+
+	unsigned int port = serviceId + rand_range(UDP_BASE_PORT+serviceId+3, UDP_MAX_PORT);
 	snprintf(ep,EP_ADDRESS_LEN,"udp://%s:%u",bindIP,port);
 
 
@@ -112,9 +119,10 @@ celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt
 	pub->destAddr.sin_addr.s_addr = inet_addr(bindIP);
 	pub->destAddr.sin_port = htons(port);
 
-	pub->serializer = best_serializer;
+	pub->serializer.type = best_serializer_type;
+	pub->serializer.svc = best_serializer;
 
-	pubsub_topicPublicationAddPublisherEP(pub,pubEP);
+	pubsub_topicPublicationAddPublisherEP(pub, pubEP);
 
 	*out = pub;
 
@@ -138,7 +146,8 @@ celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub){
 	hashMap_destroy(pub->boundServices,false,false);
 
 	pub->svcFactoryReg = NULL;
-	pub->serializer = NULL;
+	pub->serializer.svc= NULL;
+	pub->serializer.type= NULL;
 
 	if(close(pub->sendSocket) != 0){
 		status = CELIX_FILE_IO_EXCEPTION;
@@ -167,8 +176,8 @@ celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,top
 		factory->ungetService = pubsub_topicPublicationUngetService;
 
 		properties_pt props = properties_create();
-		properties_set(props,PUBSUB_PUBLISHER_SCOPE,properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE));
-		properties_set(props,PUBSUB_PUBLISHER_TOPIC,properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+		properties_set(props,PUBSUB_PUBLISHER_SCOPE,properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE));
+		properties_set(props,PUBSUB_PUBLISHER_TOPIC,properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
                 properties_set(props,"service.version", PUBSUB_PUBLISHER_SERVICE_VERSION);
 
 
@@ -176,10 +185,10 @@ celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,top
 
 		if(status != CELIX_SUCCESS){
 			properties_destroy(props);
-			printf("PSA_UDP_MC_PSA_UDP_MC_TP: Cannot register ServiceFactory for topic %s, topic %s (bundle %ld).\n",
-				   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),
-				   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),
-				   pubEP->serviceID);
+			printf("PSA_UDP_MC_PSA_UDP_MC_TP: Cannot register ServiceFactory for topic %s, topic %s (bundle %s).\n",
+				   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+				   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME),
+				   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_BUNDLE_ID));
 		}
 		else{
 			*svcFactory = factory;
@@ -197,17 +206,19 @@ celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub){
 	return serviceRegistration_unregister(pub->svcFactoryReg);
 }
 
-celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep){
+celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub, pubsub_endpoint_pt ep) {
 
 	celixThreadMutex_lock(&(pub->tp_lock));
 	pubsubEndpoint_setField(ep, PUBSUB_ENDPOINT_URL, pub->endpoint);
+	pubsubEndpoint_setField(ep, PUBSUB_ADMIN_TYPE_KEY, PSA_UDPMC_PUBSUB_ADMIN_TYPE);
+	pubsubEndpoint_setField(ep, PUBSUB_SERIALIZER_TYPE_KEY, pub->serializer.type);
 	arrayList_add(pub->pub_ep_list,ep);
 	celixThreadMutex_unlock(&(pub->tp_lock));
 
 	return CELIX_SUCCESS;
 }
 
-celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep){
+celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep) {
 
 	celixThreadMutex_lock(&(pub->tp_lock));
 	arrayList_removeElement(pub->pub_ep_list,ep);
@@ -386,13 +397,13 @@ static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(to
 		bound->getCount = 1;
 		celixThreadMutex_create(&bound->mp_lock,NULL);
 
-		if(tp->serializer != NULL){
-			tp->serializer->createSerializerMap(tp->serializer->handle,bundle,&bound->msgTypes);
+		if (tp->serializer.svc != NULL){
+			tp->serializer.svc->createSerializerMap(tp->serializer.svc->handle,bundle,&bound->msgTypes);
 		}
 
 		pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(bound->parent->pub_ep_list,0);
-		bound->scope=strdup(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE));
-		bound->topic=strdup(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+		bound->scope=strdup(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE));
+		bound->topic=strdup(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
 		bound->largeUdpHandle = largeUdp_create(1);
 
 		bound->service.handle = bound;
@@ -409,8 +420,8 @@ static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service
 
 	celixThreadMutex_lock(&boundSvc->mp_lock);
 
-	if(boundSvc->parent->serializer != NULL && boundSvc->msgTypes != NULL){
-		boundSvc->parent->serializer->destroySerializerMap(boundSvc->parent->serializer->handle, boundSvc->msgTypes);
+	if(boundSvc->parent->serializer.svc != NULL && boundSvc->msgTypes != NULL){
+		boundSvc->parent->serializer.svc->destroySerializerMap(boundSvc->parent->serializer.svc->handle, boundSvc->msgTypes);
 	}
 
 	if(boundSvc->scope!=NULL){

http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_admin_udp_mc/src/topic_publication.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/src/topic_publication.h b/pubsub/pubsub_admin_udp_mc/src/topic_publication.h
index 8f47deb..e0a5698 100644
--- a/pubsub/pubsub_admin_udp_mc/src/topic_publication.h
+++ b/pubsub/pubsub_admin_udp_mc/src/topic_publication.h
@@ -43,10 +43,10 @@ typedef struct pubsub_udp_msg {
 } pubsub_udp_msg_t;
 
 typedef struct topic_publication *topic_publication_pt;
-celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, char* bindIP, topic_publication_pt *out);
+celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, const char* best_serializer_type, char* bindIP, topic_publication_pt *out);
 celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub);
 
-celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);
+celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub, pubsub_endpoint_pt ep);
 celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);
 
 celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory);

http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c b/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c
index 9929437..1451d92 100644
--- a/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c
+++ b/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c
@@ -70,7 +70,7 @@ static celix_status_t pubsubAdmin_getIpAdress(const char* interface, char** ip);
 static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
 static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
 
-static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin,pubsub_endpoint_pt ep, pubsub_serializer_service_t **serSvc);
+static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin,pubsub_endpoint_pt ep, pubsub_serializer_service_t **svcOut, const char **serTypeOut);
 static void connectTopicPubSubToSerializer(pubsub_admin_pt admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool isPublication);
 static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void *topicPubSub,bool isPublication);
 
@@ -165,8 +165,6 @@ celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *ad
 			(*admin)->maxPort = PSA_ZMQ_DEFAULT_MAX_PORT;
 		}
 
-		printf("PSA Using base port %u to max port %u\n", (*admin)->basePort, (*admin)->maxPort);
-
 		// Disable Signal Handling by CZMQ
 		setenv("ZSYS_SIGHANDLER", "false", true);
 
@@ -201,6 +199,39 @@ celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *ad
 
 	}
 
+
+    (*admin)->defaultScore = PSA_ZMQ_DEFAULT_SCORE;
+    (*admin)->qosSampleScore = PSA_ZMQ_DEFAULT_QOS_SAMPLE_SCORE;
+    (*admin)->qosControlScore = PSA_ZMQ_DEFAULT_QOS_CONTROL_SCORE;
+
+    const char *defaultScoreStr = NULL;
+    const char *sampleScoreStr = NULL;
+    const char *controlScoreStr = NULL;
+    bundleContext_getProperty(context, PSA_ZMQ_DEFAULT_SCORE_KEY, &defaultScoreStr);
+    bundleContext_getProperty(context, PSA_ZMQ_QOS_SAMPLE_SCORE_KEY, &sampleScoreStr);
+    bundleContext_getProperty(context, PSA_ZMQ_QOS_CONTROL_SCORE_KEY, &controlScoreStr);
+
+    if (defaultScoreStr != NULL) {
+        (*admin)->defaultScore = strtof(defaultScoreStr, NULL);
+    }
+    if (sampleScoreStr != NULL) {
+        (*admin)->qosSampleScore = strtof(sampleScoreStr, NULL);
+    }
+    if (controlScoreStr != NULL) {
+        (*admin)->qosControlScore = strtof(controlScoreStr, NULL);
+    }
+
+    (*admin)->verbose = PSA_ZMQ_DEFAULT_VERBOSE;
+    const char *verboseStr = NULL;
+    bundleContext_getProperty(context, PSA_ZMQ_VERBOSE_KEY, &verboseStr);
+    if (verboseStr != NULL) {
+        (*admin)->verbose = strncasecmp("true", verboseStr, strlen("true")) == 0;
+    }
+
+    if ((*admin)->verbose) {
+        printf("PSA ZMQ Using base port %u to max port %u\n", (*admin)->basePort, (*admin)->maxPort);
+    }
+
 	return status;
 }
 
@@ -302,18 +333,19 @@ static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu
 
 	celixThreadMutex_lock(&admin->subscriptionsLock);
 
-	topic_subscription_pt any_sub = hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC);
+	topic_subscription_pt any_sub = hashMap_get(admin->subscriptions, PUBSUB_ANY_SUB_TOPIC);
 
 	if(any_sub==NULL){
 
 		int i;
 		pubsub_serializer_service_t *best_serializer = NULL;
-		if( (status=pubsubAdmin_getBestSerializer(admin, subEP, &best_serializer)) == CELIX_SUCCESS){
-			status = pubsub_topicSubscriptionCreate(admin->bundle_context, PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, PUBSUB_ANY_SUB_TOPIC, best_serializer, &any_sub);
+        const char *serType = NULL;
+		if( (status=pubsubAdmin_getBestSerializer(admin, subEP, &best_serializer, &serType)) == CELIX_SUCCESS){
+			status = pubsub_topicSubscriptionCreate(admin->bundle_context, PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, PUBSUB_ANY_SUB_TOPIC, best_serializer, serType, &any_sub);
 		}
 		else{
 			printf("PSA_ZMQ: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n",
-				   properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+				   properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
 			celixThreadMutex_lock(&admin->noSerializerPendingsLock);
 			arrayList_add(admin->noSerializerSubscriptions,subEP);
 			celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
@@ -378,16 +410,15 @@ static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu
 	return status;
 }
 
+/**
+ * A subcriber service is registered and this PSA had won the match
+ * (based on qos or other meta data)
+ * Will update the pubsub endpoint with the chosen pubsub admin and serializer type
+ */
 celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
 	celix_status_t status = CELIX_SUCCESS;
 
-	printf("PSA_ZMQ: Received subscription [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n",
-		   properties_get(subEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),
-		   subEP->serviceID,
-		   properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),
-		   properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
-
-	if(strcmp(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),PUBSUB_ANY_SUB_TOPIC)==0){
+	if(strcmp(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME), PUBSUB_ANY_SUB_TOPIC)==0) {
 		return pubsubAdmin_addAnySubscription(admin,subEP);
 	}
 
@@ -397,7 +428,7 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
 	celixThreadMutex_lock(&admin->localPublicationsLock);
 	celixThreadMutex_lock(&admin->externalPublicationsLock);
 
-	char* scope_topic = createScopeTopicKey(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+	char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
 
 	service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
 	array_list_pt ext_pub_list = (array_list_pt)hashMap_get(admin->externalPublications,scope_topic);
@@ -411,12 +442,13 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
 
 		if(subscription == NULL) {
 			pubsub_serializer_service_t *best_serializer = NULL;
-			if( (status=pubsubAdmin_getBestSerializer(admin, subEP, &best_serializer)) == CELIX_SUCCESS){
-				status += pubsub_topicSubscriptionCreate(admin->bundle_context, (char*) properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), (char*) properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC), best_serializer, &subscription);
+            const char *serType = NULL;
+			if( (status=pubsubAdmin_getBestSerializer(admin, subEP, &best_serializer, &serType)) == CELIX_SUCCESS){
+				status += pubsub_topicSubscriptionCreate(admin->bundle_context, (char*) properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), (char*) properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME), best_serializer, serType, &subscription);
 			}
 			else{
 				printf("PSA_ZMQ: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n",
-					   properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+					   properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
 				celixThreadMutex_lock(&admin->noSerializerPendingsLock);
 				arrayList_add(admin->noSerializerSubscriptions,subEP);
 				celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
@@ -476,6 +508,20 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
 	celixThreadMutex_unlock(&admin->subscriptionsLock);
 	celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
 
+
+    if (admin->verbose) {
+        printf("PSA_ZMQ: Added subscription [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
+               properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+               properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_UUID),
+               properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+               properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+        printf("PSA_ZMQ: \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n",
+               properties_get(subEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY),
+               properties_get(subEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY),
+               properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TYPE));
+        printf("PSA_ZMQ: \t [endpoint url = %s]\n", properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+    }
+
 	return status;
 
 }
@@ -483,12 +529,20 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
 celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
 	celix_status_t status = CELIX_SUCCESS;
 
-	printf("PSA_ZMQ: Removing subscription [FWUUID=%s bundleID=%ld topic=%s]\n",
-		   properties_get(subEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),
-		   subEP->serviceID,
-		   properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
-
-	char* scope_topic = createScopeTopicKey(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+    if (admin->verbose) {
+        printf("PSA_ZMQ: Removing subscription [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
+               properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+               properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_UUID),
+               properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+               properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+        printf("PSA_ZMQ: \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n",
+               properties_get(subEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY),
+               properties_get(subEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY),
+               properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TYPE));
+        printf("PSA_ZMQ: \t [endpoint url = %s]\n", properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+    }
+
+	char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
 
 	celixThreadMutex_lock(&admin->subscriptionsLock);
 	topic_subscription_pt sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic);
@@ -517,26 +571,45 @@ celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpo
 
 }
 
+
+/**
+ * A bundle has shown interested in a publisher service and this PSA had won the match
+ * based on filter or embedded topic.properties (extender pattern)
+ * OR !!
+ * A remote publication has been discovered and forwarded to this call
+ * Will update the pubsub endpoint with the chosen pubsub admin and serializer type
+ */
 celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP) {
-	celix_status_t status = CELIX_SUCCESS;
+    celix_status_t status = CELIX_SUCCESS;
 
-	printf("PSA_ZMQ: Received publication [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n",
-		   properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),
-		   pubEP->serviceID,
-		   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),
-		   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+    const char *fwUUID = NULL;
+    bundleContext_getProperty(admin->bundle_context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID);
+    if (fwUUID == NULL) {
+        printf("PSA_ZMQ: Cannot retrieve fwUUID.\n");
+        return CELIX_INVALID_BUNDLE_CONTEXT;
+    }
 
-	const char* fwUUID = NULL;
+    const char *epFwUUID = properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID);
+    bool isOwn = strncmp(fwUUID, epFwUUID, 128) == 0;
 
-	bundleContext_getProperty(admin->bundle_context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID);
-	if (fwUUID == NULL) {
-		printf("PSA_ZMQ: Cannot retrieve fwUUID.\n");
-		return CELIX_INVALID_BUNDLE_CONTEXT;
-	}
+    if (isOwn) {
+        //should be null, willl be set in this call
+        assert(properties_get(pubEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY) == NULL);
+        assert(properties_get(pubEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY) == NULL);
+    } else {
+        //inverse ADMIN_TYPE_KEY and SERIALIZER_TYPE shoudl not be null
+        assert(properties_get(pubEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY) != NULL);
+        assert(properties_get(pubEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY) != NULL);
+    }
+
+    if (isOwn) {
+        properties_set(pubEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY, PSA_ZMQ_PUBSUB_ADMIN_TYPE);
+    }
 
-	char *scope_topic = createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 
-	if ((strcmp(properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID), fwUUID) == 0) &&
+	char *scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+
+	if ((strcmp(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID), fwUUID) == 0) &&
 			(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) == NULL)) {
 
 		celixThreadMutex_lock(&admin->localPublicationsLock);
@@ -546,12 +619,16 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin, pubsub_endpoint
 		if (factory == NULL) {
 			topic_publication_pt pub = NULL;
 			pubsub_serializer_service_t *best_serializer = NULL;
-			if( (status=pubsubAdmin_getBestSerializer(admin, pubEP, &best_serializer)) == CELIX_SUCCESS){
-				status = pubsub_topicPublicationCreate(admin->bundle_context, pubEP, best_serializer, admin->ipAddress, admin->basePort, admin->maxPort, &pub);
+            const char *serType = NULL;
+			if( (status=pubsubAdmin_getBestSerializer(admin, pubEP, &best_serializer, &serType)) == CELIX_SUCCESS){
+				status = pubsub_topicPublicationCreate(admin->bundle_context, pubEP, best_serializer, serType, admin->ipAddress, admin->basePort, admin->maxPort, &pub);
+                if (isOwn) {
+                    properties_set(pubEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY, serType);
+                }
 			}
-			else{
+			else {
 				printf("PSA_ZMQ: Cannot find a serializer for publishing topic %s. Adding it to pending list.\n",
-					   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+					   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
 				celixThreadMutex_lock(&admin->noSerializerPendingsLock);
 				arrayList_add(admin->noSerializerPublications,pubEP);
 				celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
@@ -564,10 +641,10 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin, pubsub_endpoint
 					connectTopicPubSubToSerializer(admin, best_serializer, pub, true);
 				}
 			} else {
-				printf("PSA_ZMQ: Cannot create a topicPublication for scope=%s, topic=%s (bundle %ld).\n",
-					   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),
-					   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),
-					   pubEP->serviceID);
+				printf("PSA_ZMQ: Cannot create a topicPublication for scope=%s, topic=%s (bundle %s).\n",
+					   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+					   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME),
+					   properties_get(pubEP->endpoint_props, PUBSUB_BUNDLE_ID));
 			}
 		} else {
 			//just add the new EP to the list
@@ -629,7 +706,24 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin, pubsub_endpoint
 
 	celixThreadMutex_unlock(&admin->subscriptionsLock);
 
-	return status;
+
+    if (admin->verbose) {
+        printf("PSA_ZMQ: Added publication [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
+               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_UUID),
+               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+        printf("PSA_ZMQ: \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n",
+               properties_get(pubEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY),
+               properties_get(pubEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY),
+               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TYPE));
+        printf("PSA_UDPMC: \t [endpoint url = %s, own = %i]\n",
+               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL),
+               isOwn);
+    }
+
+
+    return status;
 
 }
 
@@ -637,21 +731,29 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
 	celix_status_t status = CELIX_SUCCESS;
 	int count = 0;
 
-	printf("PSA_ZMQ: Removing publication [FWUUID=%s bundleID=%ld topic=%s]\n",
-		   properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),
-		   pubEP->serviceID,
-		   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+    if (admin->verbose) {
+        printf("PSA_ZMQ: Removing publication [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
+               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_UUID),
+               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+        printf("PSA_ZMQ: \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n",
+               properties_get(pubEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY),
+               properties_get(pubEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY),
+               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TYPE));
+        printf("PSA_ZMQ: \t [endpoint url = %s]\n", properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+    }
 
 	const char* fwUUID = NULL;
 
 	bundleContext_getProperty(admin->bundle_context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
 	if(fwUUID==NULL){
-		printf("PSA_ZMQ: Cannot retrieve fwUUID.\n");
+		fprintf(stderr, "ERROR PSA_ZMQ: Cannot retrieve fwUUID.\n");
 		return CELIX_INVALID_BUNDLE_CONTEXT;
 	}
-	char *scope_topic = createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+	char *scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
 
-	if(strcmp(properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),fwUUID)==0){
+	if(strcmp(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),fwUUID)==0){
 
 		celixThreadMutex_lock(&admin->localPublicationsLock);
 		service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
@@ -729,10 +831,12 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
 celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin, char *scope, char* topic){
 	celix_status_t status = CELIX_SUCCESS;
 
-	printf("PSA_ZMQ: Closing all publications\n");
+    if (admin->verbose) {
+        printf("PSA_ZMQ: Closing all publications\n");
+    }
 
 	celixThreadMutex_lock(&admin->localPublicationsLock);
-	char *scope_topic = createScopeTopicKey(scope, topic);
+	char *scope_topic = pubsubEndpoint_createScopeTopicKey(scope, topic);
 	hash_map_entry_pt pubsvc_entry = (hash_map_entry_pt)hashMap_getEntry(admin->localPublications,scope_topic);
 	if(pubsvc_entry!=NULL){
 		char* key = (char*)hashMapEntry_getKey(pubsvc_entry);
@@ -755,10 +859,12 @@ celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin, char *sco
 celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* scope,char* topic){
 	celix_status_t status = CELIX_SUCCESS;
 
-	printf("PSA_ZMQ: Closing all subscriptions\n");
+    if (admin->verbose) {
+        printf("PSA_ZMQ: Closing all subscriptions\n");
+    }
 
 	celixThreadMutex_lock(&admin->subscriptionsLock);
-	char *scope_topic = createScopeTopicKey(scope, topic);
+	char *scope_topic = pubsubEndpoint_createScopeTopicKey(scope, topic);
 	hash_map_entry_pt sub_entry = (hash_map_entry_pt)hashMap_getEntry(admin->subscriptions,scope_topic);
 	if(sub_entry!=NULL){
 		char* topic = (char*)hashMapEntry_getKey(sub_entry);
@@ -814,7 +920,7 @@ static celix_status_t pubsubAdmin_getIpAdress(const char* interface, char** ip)
 
 static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
 	celix_status_t status = CELIX_SUCCESS;
-	char* scope_topic = createScopeTopicKey(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+	char* scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
 	array_list_pt pendingListPerTopic = hashMap_get(admin->pendingSubscriptions,scope_topic);
 	if(pendingListPerTopic==NULL){
 		arrayList_create(&pendingListPerTopic);
@@ -834,8 +940,8 @@ celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt r
 
 	const char *serType = NULL;
 	serviceReference_getProperty(reference, PUBSUB_SERIALIZER_TYPE_KEY,&serType);
-	if(serType == NULL){
-		printf("Serializer serviceReference %p has no pubsub_serializer.type property specified\n",reference);
+	if (serType == NULL) {
+		fprintf(stderr, "WARNING PSA ZMQ: Serializer serviceReference %p has no %s property specified\n", reference, PUBSUB_SERIALIZER_TYPE_KEY);
 		return CELIX_SERVICE_EXCEPTION;
 	}
 
@@ -850,7 +956,7 @@ celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt r
 	for(i=0;i<arrayList_size(admin->noSerializerSubscriptions);i++){
 		pubsub_endpoint_pt ep = (pubsub_endpoint_pt)arrayList_get(admin->noSerializerSubscriptions,i);
 		pubsub_serializer_service_t *best_serializer = NULL;
-		pubsubAdmin_getBestSerializer(admin, ep, &best_serializer);
+		pubsubAdmin_getBestSerializer(admin, ep, &best_serializer, NULL);
 		if(best_serializer != NULL){ /* Finally we have a valid serializer! */
 			pubsubAdmin_addSubscription(admin, ep);
 		}
@@ -859,7 +965,7 @@ celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt r
 	for(i=0;i<arrayList_size(admin->noSerializerPublications);i++){
 		pubsub_endpoint_pt ep = (pubsub_endpoint_pt)arrayList_get(admin->noSerializerPublications,i);
 		pubsub_serializer_service_t *best_serializer = NULL;
-		pubsubAdmin_getBestSerializer(admin, ep, &best_serializer);
+		pubsubAdmin_getBestSerializer(admin, ep, &best_serializer, NULL);
 		if(best_serializer != NULL){ /* Finally we have a valid serializer! */
 			pubsubAdmin_addPublication(admin, ep);
 		}
@@ -867,7 +973,9 @@ celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt r
 
 	celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
 
-	printf("PSA_ZMQ: %s serializer added\n",serType);
+    if (admin->verbose) {
+        printf("PSA_ZMQ: %s serializer added\n", serType);
+    }
 
 	return status;
 }
@@ -879,8 +987,8 @@ celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt
 	const char *serType = NULL;
 
 	serviceReference_getProperty(reference, PUBSUB_SERIALIZER_TYPE_KEY,&serType);
-	if(serType == NULL){
-		printf("Serializer serviceReference %p has no pubsub_serializer.type property specified\n",reference);
+	if (serType == NULL) {
+		printf("WARNING PSA ZMQ: Serializer serviceReference %p has no %s property specified\n", reference, PUBSUB_SERIALIZER_TYPE_KEY);
 		return CELIX_SERVICE_EXCEPTION;
 	}
 
@@ -994,9 +1102,9 @@ celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt
 	}
 
 
-
-	printf("PSA_ZMQ: %s serializer removed\n",serType);
-
+    if (admin->verbose) {
+        printf("PSA_ZMQ: %s serializer removed\n", serType);
+    }
 
 	return CELIX_SUCCESS;
 }
@@ -1004,24 +1112,42 @@ celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt
 celix_status_t pubsubAdmin_matchEndpoint(pubsub_admin_pt admin, pubsub_endpoint_pt endpoint, double* score){
 	celix_status_t status = CELIX_SUCCESS;
 
+    const char *fwUuid = NULL;
+    bundleContext_getProperty(admin->bundle_context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUuid);
+    if (fwUuid == NULL) {
+        return CELIX_ILLEGAL_STATE;
+    }
+
 	celixThreadMutex_lock(&admin->serializerListLock);
-	status = pubsub_admin_match(endpoint->topic_props,PUBSUB_ADMIN_TYPE,admin->serializerList,score);
+	status = pubsub_admin_match(endpoint, PSA_ZMQ_PUBSUB_ADMIN_TYPE, fwUuid, admin->qosSampleScore, admin->qosControlScore, admin->defaultScore, admin->serializerList, score);
 	celixThreadMutex_unlock(&admin->serializerListLock);
 
 	return status;
 }
 
 /* This one recall the same logic as in the match function */
-static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin,pubsub_endpoint_pt ep, pubsub_serializer_service_t **serSvc){
+static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin,pubsub_endpoint_pt ep, pubsub_serializer_service_t **svcOut, const char **serTypeOut) {
+    celix_status_t status = CELIX_SUCCESS;
 
-	celix_status_t status = CELIX_SUCCESS;
+    pubsub_serializer_service_t *serSvc = NULL;
+    service_reference_pt svcRef = NULL;
 
-	celixThreadMutex_lock(&admin->serializerListLock);
-	status = pubsub_admin_get_best_serializer(ep->topic_props, admin->serializerList, serSvc);
-	celixThreadMutex_unlock(&admin->serializerListLock);
+    celixThreadMutex_lock(&admin->serializerListLock);
+    status = pubsub_admin_get_best_serializer(ep->topic_props, admin->serializerList, &svcRef);
+    celixThreadMutex_unlock(&admin->serializerListLock);
+
+    if (svcRef != NULL) {
+        bundleContext_getService(admin->bundle_context, svcRef, (void**)&serSvc);
+        bundleContext_ungetService(admin->bundle_context, svcRef, NULL); //TODO, FIXME this should not be done this way. only unget if the service is not used any more
+        if (serTypeOut != NULL) {
+            serviceReference_getProperty(svcRef, PUBSUB_SERIALIZER_TYPE_KEY, serTypeOut);
+        }
+    }
 
-	return status;
 
+    *svcOut = serSvc;
+
+    return status;
 }
 
 static void connectTopicPubSubToSerializer(pubsub_admin_pt admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool isPublication){
@@ -1057,3 +1183,4 @@ static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void *topi
 	celixThreadMutex_unlock(&admin->usedSerializersLock);
 
 }
+

http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.h b/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.h
index 040a0d3..c788382 100644
--- a/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.h
+++ b/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.h
@@ -37,18 +37,12 @@
 #undef LOG_INFO
 #undef LOG_WARNING
 
+#include "pubsub_psa_zmq_constants.h"
 #include "pubsub_admin.h"
 #include "pubsub_admin_match.h"
 #include "log_helper.h"
 #include "command.h"
 
-#define PSA_ZMQ_BASE_PORT "PSA_ZMQ_BASE_PORT"
-#define PSA_ZMQ_MAX_PORT "PSA_ZMQ_MAX_PORT"
-
-#define PSA_ZMQ_DEFAULT_BASE_PORT 5501
-#define PSA_ZMQ_DEFAULT_MAX_PORT 6000
-
-#define PUBSUB_ADMIN_TYPE	"zmq"
 
 struct pubsub_admin {
 
@@ -56,8 +50,8 @@ struct pubsub_admin {
 	log_helper_pt loghelper;
 
 	/* List of the available serializers */
-	celix_thread_mutex_t serializerListLock; // List<serializers>
-	array_list_pt serializerList;
+	celix_thread_mutex_t serializerListLock;
+	array_list_pt serializerList; // List<serializers service references>
 
 	celix_thread_mutex_t localPublicationsLock;
 	hash_map_pt localPublications;//<topic(string),service_factory_pt>
@@ -91,6 +85,12 @@ struct pubsub_admin {
 
 	command_service_t shellCmdService;
 	service_registration_pt  shellCmdReg;
+
+	double qosSampleScore;
+	double qosControlScore;
+	double defaultScore;
+
+	bool verbose;
 };
 
 celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *admin);

http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_admin_zmq/src/pubsub_psa_zmq_constants.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/src/pubsub_psa_zmq_constants.h b/pubsub/pubsub_admin_zmq/src/pubsub_psa_zmq_constants.h
new file mode 100644
index 0000000..211439e
--- /dev/null
+++ b/pubsub/pubsub_admin_zmq/src/pubsub_psa_zmq_constants.h
@@ -0,0 +1,48 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+
+
+#ifndef PUBSUB_PSA_ZMQ_CONSTANTS_H_
+#define PUBSUB_PSA_ZMQ_CONSTANTS_H_
+
+
+
+#define PSA_ZMQ_PUBSUB_ADMIN_TYPE	            "zmq"
+
+#define PSA_ZMQ_BASE_PORT                       "PSA_ZMQ_BASE_PORT"
+#define PSA_ZMQ_MAX_PORT                        "PSA_ZMQ_MAX_PORT"
+
+#define PSA_ZMQ_DEFAULT_BASE_PORT               5501
+#define PSA_ZMQ_DEFAULT_MAX_PORT                6000
+
+#define PSA_ZMQ_DEFAULT_QOS_SAMPLE_SCORE 	    30
+#define PSA_ZMQ_DEFAULT_QOS_CONTROL_SCORE 	    70
+#define PSA_ZMQ_DEFAULT_SCORE 				    30
+
+#define PSA_ZMQ_QOS_SAMPLE_SCORE_KEY 		    "PSA_ZMQ_QOS_SAMPLE_SCORE"
+#define PSA_ZMQ_QOS_CONTROL_SCORE_KEY 		    "PSA_ZMQ_QOS_CONTROL_SCORE"
+#define PSA_ZMQ_DEFAULT_SCORE_KEY 			    "PSA_ZMQ_DEFAULT_SCORE"
+
+#define PSA_ZMQ_DEFAULT_VERBOSE 			    false
+#define PSA_ZMQ_VERBOSE_KEY		 			    "PSA_ZMQ_VERBOSE"
+
+
+
+#endif /* PUBSUB_PSA_ZMQ_CONSTANTS_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_admin_zmq/src/topic_publication.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/src/topic_publication.c b/pubsub/pubsub_admin_zmq/src/topic_publication.c
index 873cec2..c81107d 100644
--- a/pubsub/pubsub_admin_zmq/src/topic_publication.c
+++ b/pubsub/pubsub_admin_zmq/src/topic_publication.c
@@ -45,6 +45,7 @@
 #include "topic_publication.h"
 
 #include "pubsub_serializer.h"
+#include "pubsub_psa_zmq_constants.h"
 
 #ifdef BUILD_WITH_ZMQ_SECURITY
 	#include "zmq_crypto.h"
@@ -65,7 +66,10 @@ struct topic_publication {
 	service_registration_pt svcFactoryReg;
 	array_list_pt pub_ep_list; //List<pubsub_endpoint>
 	hash_map_pt boundServices; //<bundle_pt,bound_service>
-	pubsub_serializer_service_t *serializer;
+	struct {
+		const char* type;
+		pubsub_serializer_service_t *svc;
+	} serializer;
 	celix_thread_mutex_t tp_lock;
 };
 
@@ -109,7 +113,7 @@ static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsig
 
 static void delay_first_send_for_late_joiners(void);
 
-celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out){
+celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, const char* serType, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out){
 	celix_status_t status = CELIX_SUCCESS;
 
 #ifdef BUILD_WITH_ZMQ_SECURITY
@@ -214,7 +218,8 @@ celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, p
 
 	pub->endpoint = ep;
 	pub->zmq_socket = socket;
-	pub->serializer = best_serializer;
+	pub->serializer.svc = best_serializer;
+	pub->serializer.type = serType;
 
 	celixThreadMutex_create(&(pub->socket_lock),NULL);
 
@@ -248,7 +253,8 @@ celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub){
 	hashMap_destroy(pub->boundServices,false,false);
 
 	pub->svcFactoryReg = NULL;
-	pub->serializer = NULL;
+	pub->serializer.svc = NULL;
+	pub->serializer.type = NULL;
 #ifdef BUILD_WITH_ZMQ_SECURITY
 	zcert_destroy(&(pub->zmq_cert));
 #endif
@@ -282,16 +288,17 @@ celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,top
 		factory->ungetService = pubsub_topicPublicationUngetService;
 
 		properties_pt props = properties_create();
-		properties_set(props,PUBSUB_PUBLISHER_TOPIC,properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
-		properties_set(props,PUBSUB_PUBLISHER_SCOPE,properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE));
+		properties_set(props,PUBSUB_PUBLISHER_TOPIC,properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+		properties_set(props,PUBSUB_PUBLISHER_SCOPE,properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE));
 		properties_set(props,"service.version", PUBSUB_PUBLISHER_SERVICE_VERSION);
 
 		status = bundleContext_registerServiceFactory(bundle_context,PUBSUB_PUBLISHER_SERVICE_NAME,factory,props,&(pub->svcFactoryReg));
 
 		if(status != CELIX_SUCCESS){
 			properties_destroy(props);
-			printf("PSA_ZMQ_PSA_ZMQ_TP: Cannot register ServiceFactory for topic %s (bundle %ld).\n",
-				   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),pubEP->serviceID);
+			printf("PSA_ZMQ_PSA_ZMQ_TP: Cannot register ServiceFactory for topic %s (bundle %s).\n",
+				   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME),
+				   properties_get(pubEP->endpoint_props, PUBSUB_BUNDLE_ID));
 		}
 		else{
 			*svcFactory = factory;
@@ -309,9 +316,11 @@ celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub){
 	return serviceRegistration_unregister(pub->svcFactoryReg);
 }
 
-celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep){
+celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub, pubsub_endpoint_pt ep) {
 
 	celixThreadMutex_lock(&(pub->tp_lock));
+	pubsubEndpoint_setField(ep, PUBSUB_ADMIN_TYPE_KEY, PSA_ZMQ_PUBSUB_ADMIN_TYPE);
+	pubsubEndpoint_setField(ep, PUBSUB_SERIALIZER_TYPE_KEY, pub->serializer.type);
     pubsubEndpoint_setField(ep, PUBSUB_ENDPOINT_URL, pub->endpoint);
 	arrayList_add(pub->pub_ep_list,ep);
 	celixThreadMutex_unlock(&(pub->tp_lock));
@@ -574,14 +583,14 @@ static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(to
 		bound->mp_send_in_progress = false;
 		celixThreadMutex_create(&bound->mp_lock,NULL);
 
-		if(tp->serializer != NULL){
-			tp->serializer->createSerializerMap(tp->serializer->handle,bundle,&bound->msgTypes);
+		if(tp->serializer.svc != NULL){
+			tp->serializer.svc->createSerializerMap(tp->serializer.svc->handle,bundle,&bound->msgTypes);
 		}
 
 		arrayList_create(&bound->mp_parts);
 
 		pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(bound->parent->pub_ep_list,0);
-		bound->topic=strdup(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
+		bound->topic=strdup(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
 
 		bound->service.handle = bound;
 		bound->service.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForUUID;
@@ -600,8 +609,8 @@ static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service
 	celixThreadMutex_lock(&boundSvc->mp_lock);
 
 
-	if(boundSvc->parent->serializer != NULL && boundSvc->msgTypes != NULL){
-		boundSvc->parent->serializer->destroySerializerMap(boundSvc->parent->serializer->handle, boundSvc->msgTypes);
+	if(boundSvc->parent->serializer.svc != NULL && boundSvc->msgTypes != NULL){
+		boundSvc->parent->serializer.svc->destroySerializerMap(boundSvc->parent->serializer.svc->handle, boundSvc->msgTypes);
 	}
 
 	if(boundSvc->mp_parts!=NULL){

http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_admin_zmq/src/topic_publication.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/src/topic_publication.h b/pubsub/pubsub_admin_zmq/src/topic_publication.h
index 65df0e3..20e4a8e 100644
--- a/pubsub/pubsub_admin_zmq/src/topic_publication.h
+++ b/pubsub/pubsub_admin_zmq/src/topic_publication.h
@@ -35,7 +35,7 @@
 
 typedef struct topic_publication *topic_publication_pt;
 
-celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context,pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out);
+celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context,pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, const char *serType, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out);
 celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub);
 
 celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);

http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_admin_zmq/src/topic_subscription.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/src/topic_subscription.c b/pubsub/pubsub_admin_zmq/src/topic_subscription.c
index 8ff94d0..46a1688 100644
--- a/pubsub/pubsub_admin_zmq/src/topic_subscription.c
+++ b/pubsub/pubsub_admin_zmq/src/topic_subscription.c
@@ -75,7 +75,10 @@ struct topic_subscription{
 	celix_thread_mutex_t ts_lock;
 	bundle_context_pt context;
 
-	pubsub_serializer_service_t *serializer;
+	struct {
+		const char* type;
+		pubsub_serializer_service_t *svc;
+	} serializer;
 
 	hash_map_pt servicesMap; // key = service, value = msg types map
 
@@ -134,7 +137,7 @@ static unsigned int get_zmq_receive_timeout(bundle_context_pt context) {
 	return timeout;
 }
 
-celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, char* scope, char* topic, pubsub_serializer_service_t *best_serializer, topic_subscription_pt* out){
+celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, char* scope, char* topic, pubsub_serializer_service_t *best_serializer, const char *serType, topic_subscription_pt* out){
 	celix_status_t status = CELIX_SUCCESS;
 
 #ifdef BUILD_WITH_ZMQ_SECURITY
@@ -202,7 +205,7 @@ celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context,
 	ts->zmq_socket = zmq_s;
 	ts->running = false;
 	ts->nrSubscribers = 0;
-	ts->serializer = best_serializer;
+	ts->serializer.svc = best_serializer;
 	ts->zmqReceiveTimeout = get_zmq_receive_timeout(bundle_context);
 #ifdef BUILD_WITH_ZMQ_SECURITY
 	ts->zmq_cert = sub_cert;
@@ -419,8 +422,8 @@ static celix_status_t topicsub_subscriberTracked(void * handle, service_referenc
 
 		serviceReference_getBundle(reference, &bundle);
 
-		if(ts->serializer != NULL && bundle!=NULL){
-			ts->serializer->createSerializerMap(ts->serializer->handle,bundle,&msgTypes);
+		if(ts->serializer.svc != NULL && bundle!=NULL){
+			ts->serializer.svc->createSerializerMap(ts->serializer.svc->handle,bundle,&msgTypes);
 			if(msgTypes != NULL){
 				hashMap_put(ts->servicesMap, service, msgTypes);
 				printf("PSA_ZMQ_TS: New subscriber registered.\n");
@@ -443,8 +446,8 @@ static celix_status_t topicsub_subscriberUntracked(void * handle, service_refere
 	celixThreadMutex_lock(&ts->ts_lock);
 	if (hashMap_containsKey(ts->servicesMap, service)) {
 		hash_map_pt msgTypes = hashMap_remove(ts->servicesMap, service);
-		if(msgTypes!=NULL && ts->serializer!=NULL){
-			ts->serializer->destroySerializerMap(ts->serializer->handle,msgTypes);
+		if(msgTypes!=NULL && ts->serializer.svc!=NULL){
+			ts->serializer.svc->destroySerializerMap(ts->serializer.svc->handle,msgTypes);
 			printf("PSA_ZMQ_TS: Subscriber unregistered.\n");
 		}
 		else{

http://git-wip-us.apache.org/repos/asf/celix/blob/b8f13870/pubsub/pubsub_admin_zmq/src/topic_subscription.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/src/topic_subscription.h b/pubsub/pubsub_admin_zmq/src/topic_subscription.h
index 7267103..6dca4e5 100644
--- a/pubsub/pubsub_admin_zmq/src/topic_subscription.h
+++ b/pubsub/pubsub_admin_zmq/src/topic_subscription.h
@@ -38,7 +38,7 @@
 
 typedef struct topic_subscription* topic_subscription_pt;
 
-celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context,char* scope, char* topic, pubsub_serializer_service_t *best_serializer, topic_subscription_pt* out);
+celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context,char* scope, char* topic, pubsub_serializer_service_t *best_serializer, const char* serType, topic_subscription_pt* out);
 celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts);
 celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts);
 celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts);


Mime
View raw message