celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pnol...@apache.org
Subject celix git commit: CELIX-408: Refactoring in the pubsub serializer usage. The topology manger does not have to started before the pubsub admins anymore
Date Wed, 12 Apr 2017 11:29:52 GMT
Repository: celix
Updated Branches:
  refs/heads/feature/CELIX-408_runtime 7efe4331a -> 2d9c77d53


CELIX-408: Refactoring in the pubsub serializer usage. The topology manger does not have to started before the pubsub admins anymore

Known issues:
 - The serializer how to be started before the admins, to prevent issue when using a admin without serializer. Note admin should only register itself if a serializer is available.
 - Still considerd instable


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/2d9c77d5
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/2d9c77d5
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/2d9c77d5

Branch: refs/heads/feature/CELIX-408_runtime
Commit: 2d9c77d530184d5d3d119cd3f92202e332996c78
Parents: 7efe433
Author: Pepijn Noltes <pepijnnoltes@gmail.com>
Authored: Wed Apr 12 13:27:09 2017 +0200
Committer: Pepijn Noltes <pepijnnoltes@gmail.com>
Committed: Wed Apr 12 13:27:09 2017 +0200

----------------------------------------------------------------------
 pubsub/deploy/CMakeLists.txt                    |  50 ++++----
 .../include/pubsub_publish_service_private.h    |   4 +-
 .../private/src/pubsub_admin_impl.c             |   5 +-
 .../private/src/topic_publication.c             | 104 +++++++++-------
 .../private/src/topic_subscription.c            |  57 ++++++---
 .../include/pubsub_publish_service_private.h    |   4 +-
 .../private/include/topic_subscription.h        |   2 +-
 .../private/src/pubsub_admin_impl.c             |   7 +-
 .../private/src/topic_publication.c             | 121 +++++++++++--------
 .../private/src/topic_subscription.c            | 102 ++++++++++------
 .../private/src/pubsub_serializer_impl.c        |   9 +-
 .../private/src/pubsub_topology_manager.c       |  11 +-
 pubsub/test/CMakeLists.txt                      |  14 +--
 pubsub/test/msg_descriptors/msg.descriptor      |   3 +-
 pubsub/test/test/msg.h                          |   4 +-
 pubsub/test/test/sut_activator.c                |   5 +-
 pubsub/test/test/tst_activator.cpp              |  13 +-
 17 files changed, 322 insertions(+), 193 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/deploy/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/deploy/CMakeLists.txt b/pubsub/deploy/CMakeLists.txt
index 52fecc9..85b49d2 100644
--- a/pubsub/deploy/CMakeLists.txt
+++ b/pubsub/deploy/CMakeLists.txt
@@ -24,12 +24,12 @@ add_deploy("pubsub_publisher_udp_mc"
     BUNDLES
        shell
        shell_tui
-       org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
+		org.apache.celix.pubsub_serializer.PubSubSerializerJson
+		org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
        org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
        org.apache.celix.pubsub_admin.PubSubAdminUdpMc
        org.apache.celix.pubsub_publisher.PoiPublisher
        org.apache.celix.pubsub_publisher.PoiPublisher2
-       org.apache.celix.pubsub_serializer.PubSubSerializerJson
 )
 
 add_deploy("pubsub_subscriber_udp_mc" 
@@ -37,11 +37,11 @@ add_deploy("pubsub_subscriber_udp_mc"
     BUNDLES
        shell
        shell_tui
-       org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
+		org.apache.celix.pubsub_serializer.PubSubSerializerJson
+		org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
        org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
        org.apache.celix.pubsub_admin.PubSubAdminUdpMc
        org.apache.celix.pubsub_subscriber.PoiSubscriber
-       org.apache.celix.pubsub_serializer.PubSubSerializerJson
 )
 
 add_deploy("pubsub_subscriber2_udp_mc" 
@@ -49,11 +49,11 @@ add_deploy("pubsub_subscriber2_udp_mc"
     BUNDLES
        shell
        shell_tui
-       org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
+		org.apache.celix.pubsub_serializer.PubSubSerializerJson
+		org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
        org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
        org.apache.celix.pubsub_admin.PubSubAdminUdpMc
        org.apache.celix.pubsub_subscriber.PoiSubscriber
-       org.apache.celix.pubsub_serializer.PubSubSerializerJson
 )
 
 if (ETCD_CMD AND XTERM_CMD)
@@ -81,13 +81,13 @@ if (BUILD_PUBSUB_PSA_ZMQ)
 	    BUNDLES
 	       shell
 	       shell_tui
-	       org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
+			org.apache.celix.pubsub_serializer.PubSubSerializerJson
+			org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
 	       org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
 	       org.apache.celix.pubsub_admin.PubSubAdminZmq
 	       org.apache.celix.pubsub_admin.PubSubAdminUdpMc
 	       org.apache.celix.pubsub_publisher.PoiPublisher
 	       org.apache.celix.pubsub_publisher.PoiPublisher2
-	       org.apache.celix.pubsub_serializer.PubSubSerializerJson
 	    PROPERTIES
 	       poi1.psa=zmq
 	       poi2.psa=udp
@@ -98,12 +98,12 @@ if (BUILD_PUBSUB_PSA_ZMQ)
 	    BUNDLES
 	       shell
 	       shell_tui
-	       org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
+			org.apache.celix.pubsub_serializer.PubSubSerializerJson
+			org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
 	       org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
 	       org.apache.celix.pubsub_admin.PubSubAdminZmq
 	       org.apache.celix.pubsub_admin.PubSubAdminUdpMc
 	       org.apache.celix.pubsub_subscriber.PoiSubscriber
-	       org.apache.celix.pubsub_serializer.PubSubSerializerJson
 	    PROPERTIES
 	       poi1.psa=zmq
 	       poi2.psa=udp
@@ -115,12 +115,12 @@ if (BUILD_PUBSUB_PSA_ZMQ)
 	    BUNDLES
 	       shell
 	       shell_tui
-	       org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
+			org.apache.celix.pubsub_serializer.PubSubSerializerJson
+			org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
 	       org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
 	       org.apache.celix.pubsub_admin.PubSubAdminZmq
 	       org.apache.celix.pubsub_publisher.PoiPublisher
 	       org.apache.celix.pubsub_subscriber.PoiSubscriber
-	       org.apache.celix.pubsub_serializer.PubSubSerializerJson
 	)
 
 	add_deploy("pubsub_publisher_zmq"
@@ -128,12 +128,12 @@ if (BUILD_PUBSUB_PSA_ZMQ)
 	    BUNDLES
 	       shell
 	       shell_tui
-	       org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
+			org.apache.celix.pubsub_serializer.PubSubSerializerJson
+			org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
 	       org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
 	       org.apache.celix.pubsub_admin.PubSubAdminZmq
 	       org.apache.celix.pubsub_publisher.PoiPublisher
 	       org.apache.celix.pubsub_publisher.PoiPublisher2
-	       org.apache.celix.pubsub_serializer.PubSubSerializerJson
 	   	PROPERTIES
 		    pubsub.scope=my_small_scope
 	)
@@ -143,11 +143,11 @@ if (BUILD_PUBSUB_PSA_ZMQ)
 	    BUNDLES
 	       shell
 	       shell_tui
+			org.apache.celix.pubsub_serializer.PubSubSerializerJson
 	       org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
 	       org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
 	       org.apache.celix.pubsub_admin.PubSubAdminZmq
 	       org.apache.celix.pubsub_subscriber.PoiSubscriber
-	       org.apache.celix.pubsub_serializer.PubSubSerializerJson
 	)
 
 	add_deploy("pubsub_subscriber2_zmq"
@@ -155,11 +155,12 @@ if (BUILD_PUBSUB_PSA_ZMQ)
 	    BUNDLES
 	       shell
 	       shell_tui
+			org.apache.celix.pubsub_serializer.PubSubSerializerJson
 	       org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
 	       org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
 	       org.apache.celix.pubsub_admin.PubSubAdminZmq
 	       org.apache.celix.pubsub_subscriber.PoiSubscriber
-	       org.apache.celix.pubsub_serializer.PubSubSerializerJson
+
 	)
 
 	# ZMQ Multipart
@@ -168,23 +169,23 @@ if (BUILD_PUBSUB_PSA_ZMQ)
 	    BUNDLES
 	       shell
 	       shell_tui
+			org.apache.celix.pubsub_serializer.PubSubSerializerJson
 	       org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
 	       org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
 	       org.apache.celix.pubsub_admin.PubSubAdminZmq
 	       org.apache.celix.pubsub_subscriber.MpSubscriber
-	       org.apache.celix.pubsub_serializer.PubSubSerializerJson
 	)
 
 	add_deploy("pubsub_mp_publisher_zmq"
 	    GROUP "pubsub"
 	    BUNDLES
-	       shell
-	       shell_tui
-	       org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
-	       org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
-	       org.apache.celix.pubsub_admin.PubSubAdminZmq
-	       org.apache.celix.pubsub_publisher.MpPublisher
-	       org.apache.celix.pubsub_serializer.PubSubSerializerJson
+	   		shell
+	   		shell_tui
+	   		org.apache.celix.pubsub_serializer.PubSubSerializerJson
+			org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
+	   		org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
+	   		org.apache.celix.pubsub_admin.PubSubAdminZmq
+	   		org.apache.celix.pubsub_publisher.MpPublisher
 	)
 
 	if (ETCD_CMD AND XTERM_CMD)
@@ -207,6 +208,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
 			GROUP pubsub
 			DEPLOYMENTS
 				pubsub_publisher
+				pubsub_subscriber_zmq
 				pubsub_subscriber2_zmq
 			COMMANDS
 				etcd

http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h b/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h
index 57d942a..b43fb08 100644
--- a/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h
+++ b/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h
@@ -42,13 +42,13 @@ typedef struct pubsub_udp_msg {
 } pubsub_udp_msg_t;
 
 typedef struct topic_publication *topic_publication_pt;
-celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t* serializer, char* bindIP, topic_publication_pt *out);
+celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, char* bindIP, topic_publication_pt *out);
 celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub);
 
 celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);
 celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);
 
-celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc);
+celix_status_t pubsub_topicPublicationSetSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc);
 celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc);
 
 celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory);

http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c b/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
index ebfe3e6..3693970 100644
--- a/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
+++ b/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
@@ -431,7 +431,8 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_
 
 		if (factory == NULL) {
 			topic_publication_pt pub = NULL;
-			status = pubsub_topicPublicationCreate(admin->sendSocket, pubEP, admin->serializerSvc, admin->mcIpAddress,&pub);
+			status = pubsub_topicPublicationCreate(admin->sendSocket, pubEP, admin->mcIpAddress,&pub);
+			pubsub_topicPublicationSetSerializer(pub, admin->serializerSvc);
 			if(status == CELIX_SUCCESS){
 				status = pubsub_topicPublicationStart(admin->bundle_context,pub,&factory);
 				if(status==CELIX_SUCCESS && factory !=NULL){
@@ -655,7 +656,7 @@ celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serialize
 	while(hashMapIterator_hasNext(lp_iter)){
 		service_factory_pt factory = (service_factory_pt) hashMapIterator_nextValue(lp_iter);
 		topic_publication_pt topic_pub = (topic_publication_pt) factory->handle;
-		pubsub_topicPublicationAddSerializer(topic_pub, admin->serializerSvc);
+		pubsub_topicPublicationSetSerializer(topic_pub, admin->serializerSvc);
 	}
 	hashMapIterator_destroy(lp_iter);
 	celixThreadMutex_unlock(&admin->localPublicationsLock);

http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c b/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
index be0a433..227761b 100644
--- a/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
+++ b/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
@@ -63,7 +63,7 @@ struct topic_publication {
 
 typedef struct publish_bundle_bound_service {
 	topic_publication_pt parent;
-	pubsub_publisher_pt service;
+	pubsub_publisher_t pubSvc;
 	bundle_pt bundle;
     char *scope;
 	char *topic;
@@ -97,7 +97,7 @@ static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsig
 static void delay_first_send_for_late_joiners(void);
 
 
-celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t* serializer, char* bindIP, topic_publication_pt *out){
+celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, char* bindIP, topic_publication_pt *out){
 
     char* ep = malloc(EP_ADDRESS_LEN);
     memset(ep,0,EP_ADDRESS_LEN);
@@ -116,7 +116,7 @@ celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt
 	pub->destAddr.sin_family = AF_INET;
 	pub->destAddr.sin_addr.s_addr = inet_addr(bindIP);
 	pub->destAddr.sin_port = htons(port);
-	pub->serializerSvc = serializer;
+	pub->serializerSvc = NULL;
 
 	pubsub_topicPublicationAddPublisherEP(pub,pubEP);
 
@@ -222,30 +222,36 @@ celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub
 	return CELIX_SUCCESS;
 }
 
-celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc){
+celix_status_t pubsub_topicPublicationSetSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc){
 	celix_status_t status = CELIX_SUCCESS;
 
 	celixThreadMutex_lock(&(pub->tp_lock));
 
     //clear old serializer
     if (pub->serializerSvc != NULL) {
-        hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices); //key = bundle , value = svc
+        hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices); //key = bundle_pt, publish_bundle_bound_service_t*
         while (hashMapIterator_hasNext(&iter)) {
             publish_bundle_bound_service_t* bound = hashMapIterator_nextValue(&iter);
+            celixThreadMutex_lock(&bound->mp_lock);
 			pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, bound->map);
+            celixThreadMutex_unlock(&bound->mp_lock);
 			bound->map = NULL;
         }
     }
 
     //setup new serializer
 	pub->serializerSvc = serializerSvc;
-	hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices);
-	while (hashMapIterator_hasNext(&iter)) {
-		hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
-		bundle_pt bundle = hashMapEntry_getKey(entry);
-		publish_bundle_bound_service_t* bound = hashMapEntry_getValue(entry);
-		pub->serializerSvc->createSerializerMap(pub->serializerSvc->handle, bundle, &bound->map);
-	}
+    if (pub->serializerSvc != NULL) {
+        hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices);
+        while (hashMapIterator_hasNext(&iter)) {
+            hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
+            bundle_pt bundle = hashMapEntry_getKey(entry);
+            publish_bundle_bound_service_t *bound = hashMapEntry_getValue(entry);
+            celixThreadMutex_lock(&bound->mp_lock);
+            pub->serializerSvc->createSerializerMap(pub->serializerSvc->handle, bundle, &bound->map);
+            celixThreadMutex_unlock(&bound->mp_lock);
+        }
+    }
 
 	celixThreadMutex_unlock(&(pub->tp_lock));
 
@@ -260,7 +266,9 @@ celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub,
         hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices);
         while (hashMapIterator_hasNext(&iter)) {
             publish_bundle_bound_service_t* bound = hashMapIterator_nextValue(&iter);
+            celixThreadMutex_lock(&bound->mp_lock);
             pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, bound->map);
+            celixThreadMutex_unlock(&bound->mp_lock);
 			bound->map = NULL;
         }
     }
@@ -294,7 +302,7 @@ static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt
 	}
 
 	if (bound != NULL) {
-		*service = bound->service;
+		*service = &bound->pubSvc;
 	}
 
 	celixThreadMutex_unlock(&(publish->tp_lock));
@@ -374,13 +382,21 @@ static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, con
 
     if (bound->map == NULL) {
         printf("TP: Serializer is not set!\n");
+        status = 1;
     } else if (msgSer == NULL ){
         printf("TP: No msg serializer available for msg type id %d\n", msgTypeId);
+        hash_map_iterator_t iter = hashMapIterator_construct(bound->map->serializers);
+        printf("Note supported messages:\n");
+        while (hashMapIterator_hasNext(&iter)) {
+            pubsub_msg_serializer_t *msgSer = hashMapIterator_nextValue(&iter);
+            printf("\tmsg %s with id %d\n", msgSer->msgName, msgSer->msgId);
+        }
+        status = 1;
     }
 
     int major=0, minor=0;
 
-    if (msgSer != NULL) {
+    if (status == 0 && msgSer != NULL) {
 		pubsub_msg_header_pt msg_hdr = calloc(1,sizeof(struct pubsub_msg_header));
 		strncpy(msg_hdr->topic,bound->topic,MAX_TOPIC_LEN-1);
 		msg_hdr->type = msgTypeId;
@@ -407,9 +423,6 @@ static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, con
 		free(msg);
 		free(serializedOutput);
 
-    } else {
-        printf("TP: Message %u not supported.\n",msgTypeId);
-        status=-1;
     }
 
     celixThreadMutex_unlock(&(bound->mp_lock));
@@ -418,9 +431,30 @@ static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, con
     return status;
 }
 
-static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* msgTypeId){
-	*msgTypeId = utils_stringHash(msgType);
-	return 0;
+static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* out){
+    publish_bundle_bound_service_t* bound = handle;
+    unsigned int msgTypeId = 0;
+
+    celixThreadMutex_lock(&bound->mp_lock);
+    if (bound->map != NULL) {
+        hash_map_iterator_t iter = hashMapIterator_construct(bound->map->serializers);
+        while (hashMapIterator_hasNext(&iter)) {
+            pubsub_msg_serializer_t* msgSer = hashMapIterator_nextValue(&iter);
+            if (strncmp(msgType, msgSer->msgName, 1024*1024) == 0) {
+                msgTypeId = msgSer->msgId;
+                break;
+            }
+        }
+    }
+    celixThreadMutex_unlock(&bound->mp_lock);
+
+    if (msgTypeId != 0) {
+        *out = msgTypeId;
+        return 0;
+    } else {
+        printf("TP: Cannot find msg type id for msg type %s\n", msgType);
+        return 1;
+    }
 }
 
 
@@ -432,15 +466,10 @@ static unsigned int rand_range(unsigned int min, unsigned int max){
 }
 
 static publish_bundle_bound_service_t* pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle) {
-
-	publish_bundle_bound_service_t* bound = calloc(1, sizeof(*bound));
+    //PRECOND lock on publish->tp_lock
+    publish_bundle_bound_service_t* bound = calloc(1, sizeof(*bound));
 
 	if (bound != NULL) {
-		bound->service = calloc(1, sizeof(*bound->service));
-	}
-
-	if (bound != NULL && bound->service != NULL) {
-
 		bound->parent = tp;
 		bound->bundle = bundle;
 		bound->getCount = 1;
@@ -452,21 +481,17 @@ static publish_bundle_bound_service_t* pubsub_createPublishBundleBoundService(to
 		bound->scope=strdup(pubEP->scope);
 		bound->topic=strdup(pubEP->topic);
 		bound->largeUdpHandle = largeUdp_create(1);
-		bound->service->handle = bound;
-		bound->service->localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForUUID;
-		bound->service->send = pubsub_topicPublicationSend;
-		bound->service->sendMultipart = NULL;  //Multipart not supported (jet) for UDP
+		bound->pubSvc.handle = bound;
+		bound->pubSvc.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForUUID;
+		bound->pubSvc.send = pubsub_topicPublicationSend;
+		bound->pubSvc.sendMultipart = NULL;  //Multipart not supported (jet) for UDP
 
-        //TODO check if lock on tp is needed? (e.g. is lock already done by caller?)
-		if (tp->serializerSvc != NULL) {
+    	if (tp->serializerSvc != NULL) {
             tp->serializerSvc->createSerializerMap(tp->serializerSvc->handle, bundle, &bound->map);
 		}
 	}
 	else
 	{
-		if (bound != NULL) {
-			free(bound->service);
-		}
 		free(bound);
 		return NULL;
 	}
@@ -475,14 +500,9 @@ static publish_bundle_bound_service_t* pubsub_createPublishBundleBoundService(to
 }
 
 static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_t* boundSvc) {
-
+    //PRECOND lock on publish->tp_lock
 	celixThreadMutex_lock(&boundSvc->mp_lock);
 
-	if (boundSvc->service != NULL) {
-		free(boundSvc->service);
-	}
-
-    //TODO check if lock on parent is needed, e.g. does the caller already lock?
     if (boundSvc->map != NULL) {
         if (boundSvc->parent->serializerSvc == NULL) {
             printf("TP: Cannot destroy pubsub msg serializer map. No serliazer service\n");

http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c b/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
index da23b21..a424112 100644
--- a/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
+++ b/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
@@ -29,7 +29,6 @@
 #include <unistd.h>
 #include <signal.h>
 
-#include <sys/types.h>
 #include <sys/socket.h>
 #include <netinet/in.h>
 #include <arpa/inet.h>
@@ -57,7 +56,7 @@
 #define UDP_BUFFER_SIZE         65535
 #define MAX_UDP_SESSIONS        16
 
-struct topic_subscription{
+struct topic_subscription {
 
 	char* ifIpAddress;
 	service_tracker_pt tracker;
@@ -70,7 +69,8 @@ struct topic_subscription{
 
     //NOTE. using a service ptr can be dangerous, because pointer can be reused.
     //ensuring that pointer are removed before new (refurbish) pionter comes along is crucial!
-	hash_map_pt msgSerializersMap; // key = service ptr, value = pubsub_msg_serializer_map_t*
+	hash_map_pt msgSerializerMapMap; // key = service ptr, value = pubsub_msg_serializer_map_t*
+    hash_map_pt bundleMap; //key = service ptr, value = bundle_pt
 
 	hash_map_pt socketMap; // key = URL, value = listen-socket
 	unsigned int nrSubscribers;
@@ -118,7 +118,8 @@ celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt bundl
 
 	celixThreadMutex_create(&ts->ts_lock,NULL);
 	arrayList_create(&ts->sub_ep_list);
-	ts->msgSerializersMap = hashMap_create(NULL, NULL, NULL, NULL);
+	ts->msgSerializerMapMap = hashMap_create(NULL, NULL, NULL, NULL);
+    ts->bundleMap = hashMap_create(NULL, NULL, NULL, NULL);
 	ts->socketMap =  hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 
 	ts->largeUdpHandle = largeUdp_create(MAX_UDP_SESSIONS);
@@ -164,7 +165,8 @@ celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){
 	serviceTracker_destroy(ts->tracker);
 	arrayList_clear(ts->sub_ep_list);
 	arrayList_destroy(ts->sub_ep_list);
-	hashMap_destroy(ts->msgSerializersMap,false,false);
+	hashMap_destroy(ts->msgSerializerMapMap, false, false);
+    hashMap_destroy(ts->bundleMap, false, false);
 
 	hashMap_destroy(ts->socketMap,false,false);
 	largeUdp_destroy(ts->largeUdpHandle);
@@ -394,13 +396,34 @@ unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt ts) {
 	return ts->nrSubscribers;
 }
 
-celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc){
+celix_status_t pubsub_topicSubscriptionSetSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc){
 	celix_status_t status = CELIX_SUCCESS;
 
 	celixThreadMutex_lock(&ts->ts_lock);
+    //clear old
+    if (ts->serializerSvc != NULL) {
+        hash_map_iterator_t iter = hashMapIterator_construct(ts->msgSerializerMapMap);
+        while (hashMapIterator_hasNext(&iter)) {
+            hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
+            pubsub_subscriber_t* subsvc = hashMapEntry_getKey(entry);
+            pubsub_msg_serializer_map_t* map = hashMapEntry_getValue(entry);
+            ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map);
+            hashMap_put(ts->msgSerializerMapMap, subsvc, NULL);
 
+        }
+    }
 	ts->serializerSvc = serializerSvc;
-
+    //init new
+    if (ts->serializerSvc != NULL) {
+        hash_map_iterator_t iter = hashMapIterator_construct(ts->msgSerializerMapMap);
+        while (hashMapIterator_hasNext(&iter)) {
+            pubsub_subscriber_t* subsvc = hashMapIterator_nextKey(&iter);
+            bundle_pt bundle = hashMap_get(ts->bundleMap, subsvc);
+            pubsub_msg_serializer_map_t* map = NULL;
+            ts->serializerSvc->createSerializerMap(ts->serializerSvc->handle, bundle, &map);
+            hashMap_put(ts->msgSerializerMapMap, subsvc, map);
+        }
+    }
 	celixThreadMutex_unlock(&ts->ts_lock);
 
 	return status;
@@ -411,10 +434,13 @@ celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts
 
 	celixThreadMutex_lock(&ts->ts_lock);
 	if (ts->serializerSvc == serializerSvc) { //only act if svc removed is services used
-		hash_map_iterator_t iter = hashMapIterator_construct(ts->msgSerializersMap);
+		hash_map_iterator_t iter = hashMapIterator_construct(ts->msgSerializerMapMap);
 		while (hashMapIterator_hasNext(&iter)) {
-            pubsub_msg_serializer_map_t* map = hashMapIterator_nextValue(&iter);
+            hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
+            pubsub_subscriber_t* subsvc = hashMapEntry_getKey(entry);
+            pubsub_msg_serializer_map_t* map = hashMapEntry_getValue(entry);
             ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map);
+            hashMap_put(ts->msgSerializerMapMap, subsvc, NULL);
 		}
 		ts->serializerSvc = NULL;
 	}
@@ -428,7 +454,7 @@ static celix_status_t topicsub_subscriberTracked(void * handle, service_referenc
 	topic_subscription_pt ts = handle;
 
 	celixThreadMutex_lock(&ts->ts_lock);
-	if (!hashMap_containsKey(ts->msgSerializersMap, svc)) {
+	if (!hashMap_containsKey(ts->msgSerializerMapMap, svc)) {
 		bundle_pt bundle = NULL;
 		serviceReference_getBundle(reference, &bundle);
 
@@ -436,7 +462,8 @@ static celix_status_t topicsub_subscriberTracked(void * handle, service_referenc
             pubsub_msg_serializer_map_t* map = NULL;
             ts->serializerSvc->createSerializerMap(ts->serializerSvc->handle, bundle, &map);
             if (map != NULL) {
-                hashMap_put(ts->msgSerializersMap, svc, map);
+                hashMap_put(ts->msgSerializerMapMap, svc, map);
+                hashMap_put(ts->bundleMap, svc, bundle);
             }
 		}
 	}
@@ -452,10 +479,12 @@ static celix_status_t topicsub_subscriberUntracked(void * handle, service_refere
 
 
     celixThreadMutex_lock(&ts->ts_lock);
-	if (hashMap_containsKey(ts->msgSerializersMap, svc)) {
-		pubsub_msg_serializer_map_t* map = hashMap_remove(ts->msgSerializersMap, svc);
+	if (hashMap_containsKey(ts->msgSerializerMapMap, svc)) {
+		pubsub_msg_serializer_map_t* map = hashMap_remove(ts->msgSerializerMapMap, svc);
 		if (ts->serializerSvc != NULL){
 			ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map);
+            hashMap_remove(ts->bundleMap, svc);
+            hashMap_remove(ts->msgSerializerMapMap, svc);
 		}
 	}
 	celixThreadMutex_unlock(&ts->ts_lock);
@@ -467,7 +496,7 @@ static celix_status_t topicsub_subscriberUntracked(void * handle, service_refere
 
 static void process_msg(topic_subscription_pt sub, pubsub_udp_msg_t* msg){
 
-	hash_map_iterator_t iter = hashMapIterator_construct(sub->msgSerializersMap);
+	hash_map_iterator_t iter = hashMapIterator_construct(sub->msgSerializerMapMap);
 	celixThreadMutex_lock(&sub->ts_lock);
 	while (hashMapIterator_hasNext(&iter)) {
 		hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);

http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h b/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h
index 158dfe7..dbd2ff1 100644
--- a/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h
+++ b/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h
@@ -34,13 +34,13 @@
 
 typedef struct topic_publication *topic_publication_pt;
 
-celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context,pubsub_endpoint_pt pubEP, pubsub_serializer_service_t* serializer, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out);
+celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context,pubsub_endpoint_pt pubEP, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out);
 celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub);
 
 celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);
 celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);
 
-celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc);
+celix_status_t pubsub_topicPublicationSetSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc);
 celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc);
 
 celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory);

http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h b/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h
index 1fbbaaf..c1e78c3 100644
--- a/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h
+++ b/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h
@@ -51,7 +51,7 @@ celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt
 celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP);
 celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP);
 
-celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc);
+celix_status_t pubsub_topicSubscriptionSetSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc);
 celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc);
 
 celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt subscription);

http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c b/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
index 5c9a5d5..6095d8a 100644
--- a/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
+++ b/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
@@ -444,7 +444,8 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin, pubsub_endpoint
 
         if (factory == NULL) {
             topic_publication_pt pub = NULL;
-            status = pubsub_topicPublicationCreate(admin->bundle_context, pubEP, admin->serializerSvc, admin->ipAddress, admin->basePort, admin->maxPort, &pub);
+            status = pubsub_topicPublicationCreate(admin->bundle_context, pubEP, admin->ipAddress, admin->basePort, admin->maxPort, &pub);
+			pubsub_topicPublicationSetSerializer(pub, admin->serializerSvc);
             if (status == CELIX_SUCCESS) {
                 status = pubsub_topicPublicationStart(admin->bundle_context, pub, &factory);
                 if (status == CELIX_SUCCESS && factory != NULL) {
@@ -676,7 +677,7 @@ celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serialize
 	while(hashMapIterator_hasNext(lp_iter)){
 		service_factory_pt factory = (service_factory_pt) hashMapIterator_nextValue(lp_iter);
 		topic_publication_pt topic_pub = (topic_publication_pt) factory->handle;
-		pubsub_topicPublicationAddSerializer(topic_pub, admin->serializerSvc);
+		pubsub_topicPublicationSetSerializer(topic_pub, admin->serializerSvc);
 	}
 	hashMapIterator_destroy(lp_iter);
 	celixThreadMutex_unlock(&admin->localPublicationsLock);
@@ -686,7 +687,7 @@ celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serialize
 	hash_map_iterator_pt subs_iter = hashMapIterator_create(admin->subscriptions);
 	while(hashMapIterator_hasNext(subs_iter)){
 		topic_subscription_pt topic_sub = (topic_subscription_pt) hashMapIterator_nextValue(subs_iter);
-		pubsub_topicSubscriptionAddSerializer(topic_sub, admin->serializerSvc);
+		pubsub_topicSubscriptionSetSerializer(topic_sub, admin->serializerSvc);
 	}
 	hashMapIterator_destroy(subs_iter);
 	celixThreadMutex_unlock(&admin->subscriptionsLock);

http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/src/topic_publication.c b/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
index 2e95874..28bf56e 100644
--- a/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
+++ b/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
@@ -74,7 +74,7 @@ struct topic_publication {
 
 typedef struct publish_bundle_bound_service {
 	topic_publication_pt parent;
-	pubsub_publisher_pt service;
+	pubsub_publisher_t pubSvc;
 	bundle_pt bundle;
 	char *topic;
 	pubsub_msg_serializer_map_t* map;
@@ -104,7 +104,7 @@ static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsig
 
 static void delay_first_send_for_late_joiners(void);
 
-celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t* serializer, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out){
+celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt pubEP, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out){
 	celix_status_t status = CELIX_SUCCESS;
 
 #ifdef BUILD_WITH_ZMQ_SECURITY
@@ -209,7 +209,7 @@ celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, p
 
 	pub->endpoint = ep;
 	pub->zmq_socket = socket;
-	pub->serializerSvc = serializer;
+	pub->serializerSvc = NULL;
 
 #ifdef BUILD_WITH_ZMQ_SECURITY
 	if (pubEP->is_secure){
@@ -298,9 +298,7 @@ celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub){
 	celix_status_t status = CELIX_SUCCESS;
 
 	//celixThreadMutex_lock(&(pub->tp_lock));
-
 	status = serviceRegistration_unregister(pub->svcFactoryReg);
-
 	//celixThreadMutex_unlock(&(pub->tp_lock));
 
 	return status;
@@ -331,7 +329,7 @@ celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub
 	return CELIX_SUCCESS;
 }
 
-celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc){
+celix_status_t pubsub_topicPublicationSetSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc){
 	celix_status_t status = CELIX_SUCCESS;
 
 	celixThreadMutex_lock(&(pub->tp_lock));
@@ -341,19 +339,25 @@ celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pu
 		hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices);
 		while (hashMapIterator_hasNext(&iter)) {
 			publish_bundle_bound_service_t* bound = hashMapIterator_nextValue(&iter);
-			pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, bound->map);
+            celixThreadMutex_lock(&bound->mp_lock);
+            pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, bound->map);
+            celixThreadMutex_unlock(&bound->mp_lock);
 			bound->map = NULL;
 		}
 	}
 
 	pub->serializerSvc = serializerSvc;
-	hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices);
-	while (hashMapIterator_hasNext(&iter)) {
-		hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
-		bundle_pt bundle = hashMapEntry_getKey(entry);
-		publish_bundle_bound_service_t* boundSvc = hashMapEntry_getValue(entry);
-		pub->serializerSvc->createSerializerMap(pub->serializerSvc->handle, bundle, &boundSvc->map);
-	}
+    if (pub->serializerSvc != NULL) {
+        hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices);
+        while (hashMapIterator_hasNext(&iter)) {
+            hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
+            bundle_pt bundle = hashMapEntry_getKey(entry);
+            publish_bundle_bound_service_t* bound = hashMapEntry_getValue(entry);
+            celixThreadMutex_lock(&bound->mp_lock);
+            pub->serializerSvc->createSerializerMap(pub->serializerSvc->handle, bundle, &bound->map);
+            celixThreadMutex_unlock(&bound->mp_lock);
+        }
+    }
 
 	celixThreadMutex_unlock(&(pub->tp_lock));
 
@@ -367,9 +371,11 @@ celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub,
 	if (pub->serializerSvc == svc) {
 		hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices);
 		while (hashMapIterator_hasNext(&iter)) {
-			publish_bundle_bound_service_t *boundSvc = hashMapIterator_nextValue(&iter);
-			pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, boundSvc->map);
-			boundSvc->map = NULL;
+			publish_bundle_bound_service_t* bound = hashMapIterator_nextValue(&iter);
+            celixThreadMutex_lock(&bound->mp_lock);
+			pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, bound->map);
+            celixThreadMutex_unlock(&bound->mp_lock);
+			bound->map = NULL;
 		}
 		pub->serializerSvc = NULL;
 	}
@@ -402,7 +408,7 @@ static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt
 	}
 
 	if(bound!=NULL){
-		*service = bound->service;
+		*service = &bound->pubSvc;
 	}
 
 	celixThreadMutex_unlock(&(publish->tp_lock));
@@ -489,29 +495,41 @@ static bool send_pubsub_mp_msg(zsock_t* zmq_socket, array_list_pt mp_msg_parts){
 }
 
 static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *msg) {
-
 	return pubsub_topicPublicationSendMultipart(handle,msgTypeId,msg, PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG);
-
 }
 
 static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTypeId, const void *msg, int flags){
 	int status = 0;
 	publish_bundle_bound_service_t* bound = handle;
-
 	celixThreadMutex_lock(&(bound->mp_lock));
+
 	if( (flags & PUBSUB_PUBLISHER_FIRST_MSG) && !(flags & PUBSUB_PUBLISHER_LAST_MSG) && bound->mp_send_in_progress){ //means a real mp_msg
 		printf("TP: Multipart send already in progress. Cannot process a new one.\n");
 		celixThreadMutex_unlock(&(bound->mp_lock));
 		return -3;
-	}
+    }
 
 	pubsub_msg_serializer_t* msgSer = NULL;
 	if (bound->map != NULL) {
 		msgSer = hashMap_get(bound->map->serializers, (void*)(uintptr_t)msgTypeId);
 	}
-	int major=0, minor=0;
 
-	if (msgSer != NULL) {
+    if (bound->map == NULL) {
+        printf("TP: Serializer is not set!\n");
+        status = 1;
+    } else if (msgSer == NULL ){
+        printf("TP: No msg serializer available for msg type id %d\n", msgTypeId);
+        hash_map_iterator_t iter = hashMapIterator_construct(bound->map->serializers);
+        printf("Note supported messages:\n");
+        while (hashMapIterator_hasNext(&iter)) {
+            pubsub_msg_serializer_t *msgSer = hashMapIterator_nextValue(&iter);
+            printf("\tmsg %s with id %d\n", msgSer->msgName, msgSer->msgId);
+        }
+        status = 1;
+    }
+
+	int major=0, minor=0;
+	if (status == 0 && msgSer != NULL) {
 		pubsub_msg_header_pt msg_hdr = calloc(1,sizeof(struct pubsub_msg_header));
 		strncpy(msg_hdr->topic,bound->topic,MAX_TOPIC_LEN-1);
 		msg_hdr->type = msgTypeId;
@@ -579,19 +597,38 @@ static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTy
 		}
 
 	} else {
-		printf("TP: Message %u not supported.",msgTypeId);
+		printf("TP: Message %u not supported.\n",msgTypeId);
 		status=-1;
 	}
 
 	celixThreadMutex_unlock(&(bound->mp_lock));
-
 	return status;
-
 }
 
-static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* msgTypeId){
-	*msgTypeId = utils_stringHash(msgType);
-	return 0;
+static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* out){
+	publish_bundle_bound_service_t* bound = handle;
+	unsigned int msgTypeId = 0;
+
+    celixThreadMutex_lock(&bound->mp_lock);
+	if (bound->map != NULL) {
+		hash_map_iterator_t iter = hashMapIterator_construct(bound->map->serializers);
+		while (hashMapIterator_hasNext(&iter)) {
+			pubsub_msg_serializer_t* msgSer = hashMapIterator_nextValue(&iter);
+			if (strncmp(msgType, msgSer->msgName, 1024*1024) == 0) {
+				msgTypeId = msgSer->msgId;
+				break;
+			}
+		}
+	}
+    celixThreadMutex_unlock(&bound->mp_lock);
+
+	if (msgTypeId != 0) {
+		*out = msgTypeId;
+		return 0;
+	} else {
+		printf("TP: Cannot find msg type id for msg type %s\n", msgType);
+		return 1;
+	}
 }
 
 static unsigned int rand_range(unsigned int min, unsigned int max){
@@ -602,14 +639,11 @@ static unsigned int rand_range(unsigned int min, unsigned int max){
 }
 
 static publish_bundle_bound_service_t* pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle){
+	//PRECOND lock on tp->lock
 
 	publish_bundle_bound_service_t* bound = calloc(1, sizeof(*bound));
 
 	if (bound != NULL) {
-		bound->service = calloc(1, sizeof(*bound->service));
-	}
-
-	if (bound != NULL && bound->service != NULL) {
 
 		bound->parent = tp;
 		bound->bundle = bundle;
@@ -617,7 +651,6 @@ static publish_bundle_bound_service_t* pubsub_createPublishBundleBoundService(to
 		bound->mp_send_in_progress = false;
 		celixThreadMutex_create(&bound->mp_lock,NULL);
 
-		//TODO check if lock is needed. e.g. was the caller already locked?
 		if (tp->serializerSvc != NULL) {
 			tp->serializerSvc->createSerializerMap(tp->serializerSvc->handle, bundle, &bound->map);
 		}
@@ -626,17 +659,13 @@ static publish_bundle_bound_service_t* pubsub_createPublishBundleBoundService(to
 		pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(bound->parent->pub_ep_list,0);
 		bound->topic=strdup(pubEP->topic);
 
-		bound->service->handle = bound;
-		bound->service->localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForUUID;
-		bound->service->send = pubsub_topicPublicationSend;
-		bound->service->sendMultipart = pubsub_topicPublicationSendMultipart;
-
+		bound->pubSvc.handle = bound;
+		bound->pubSvc.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForUUID;
+		bound->pubSvc.send = pubsub_topicPublicationSend;
+		bound->pubSvc.sendMultipart = pubsub_topicPublicationSendMultipart;
 	}
 	else
 	{
-		if (bound != NULL) {
-			free(bound->service);
-		}
 		free(bound);
 		return NULL;
 	}
@@ -645,13 +674,9 @@ static publish_bundle_bound_service_t* pubsub_createPublishBundleBoundService(to
 }
 
 static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_t* boundSvc){
-
+	//PRECOND lock on publish->tp_lock
 	celixThreadMutex_lock(&boundSvc->mp_lock);
 
-	if (boundSvc->service != NULL) {
-		free(boundSvc->service);
-	}
-
 	if (boundSvc->map != NULL && boundSvc->parent->serializerSvc != NULL) {
 		boundSvc->parent->serializerSvc->destroySerializerMap(boundSvc->parent->serializerSvc->handle, boundSvc->map);
 		boundSvc->map = NULL;

http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c b/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
index 7ef2c5d..537fbe5 100644
--- a/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
+++ b/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
@@ -69,7 +69,8 @@ struct topic_subscription {
 	celix_thread_mutex_t ts_lock;
 	bundle_context_pt context;
 
-	hash_map_pt msgSerializers; // key = service ptr, value = pubsub_msg_serializer_map_t*
+	hash_map_pt msgSerializerMapMap; // key = service ptr, value = pubsub_msg_serializer_map_t*
+    hash_map_pt bundleMap; //key = service ptr, value = bundle_pt
 	array_list_pt pendingConnections;
 	array_list_pt pendingDisconnections;
 
@@ -220,7 +221,8 @@ celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context,
 	celixThreadMutex_create(&ts->socket_lock, NULL);
 	celixThreadMutex_create(&ts->ts_lock,NULL);
 	arrayList_create(&ts->sub_ep_list);
-	ts->msgSerializers = hashMap_create(NULL, NULL, NULL, NULL);
+	ts->msgSerializerMapMap = hashMap_create(NULL, NULL, NULL, NULL);
+    ts->bundleMap = hashMap_create(NULL, NULL, NULL, NULL);
 	arrayList_create(&ts->pendingConnections);
 	arrayList_create(&ts->pendingDisconnections);
 	celixThreadMutex_create(&ts->pendingConnections_lock, NULL);
@@ -266,7 +268,8 @@ celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){
 	serviceTracker_destroy(ts->tracker);
 	arrayList_clear(ts->sub_ep_list);
 	arrayList_destroy(ts->sub_ep_list);
-	hashMap_destroy(ts->msgSerializers,false,false);
+	hashMap_destroy(ts->msgSerializerMapMap,false,false);
+    hashMap_destroy(ts->bundleMap,false,false);
 
 	celixThreadMutex_lock(&ts->pendingConnections_lock);
 	arrayList_destroy(ts->pendingConnections);
@@ -426,13 +429,34 @@ unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt ts) {
 	return ts->nrSubscribers;
 }
 
-celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc){
+celix_status_t pubsub_topicSubscriptionSetserializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc){
 	celix_status_t status = CELIX_SUCCESS;
 
 	celixThreadMutex_lock(&ts->ts_lock);
-
-	ts->serializerSvc = serializerSvc;
-
+    //clear old
+    if (ts->serializerSvc != NULL) {
+        hash_map_iterator_t iter = hashMapIterator_construct(ts->msgSerializerMapMap);
+        while (hashMapIterator_hasNext(&iter)) {
+            hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
+            pubsub_subscriber_t* subsvc = hashMapEntry_getKey(entry);
+            pubsub_msg_serializer_map_t* map = hashMapEntry_getValue(entry);
+            ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map);
+            hashMap_put(ts->msgSerializerMapMap, subsvc, NULL);
+
+        }
+    }
+    ts->serializerSvc = serializerSvc;
+    //init new
+    if (ts->serializerSvc != NULL) {
+        hash_map_iterator_t iter = hashMapIterator_construct(ts->msgSerializerMapMap);
+        while (hashMapIterator_hasNext(&iter)) {
+            pubsub_subscriber_t* subsvc = hashMapIterator_nextKey(&iter);
+            bundle_pt bundle = hashMap_get(ts->bundleMap, subsvc);
+            pubsub_msg_serializer_map_t* map = NULL;
+            ts->serializerSvc->createSerializerMap(ts->serializerSvc->handle, bundle, &map);
+            hashMap_put(ts->msgSerializerMapMap, subsvc, map);
+        }
+    }
 	celixThreadMutex_unlock(&ts->ts_lock);
 
 	return status;
@@ -442,53 +466,59 @@ celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts
 	celix_status_t status = CELIX_SUCCESS;
 
 	celixThreadMutex_lock(&ts->ts_lock);
-	if (ts->serializerSvc == svc) {
-		hash_map_iterator_t iter = hashMapIterator_construct(ts->msgSerializers);
-		while(hashMapIterator_hasNext(&iter)){
-			pubsub_msg_serializer_map_t* map = hashMapIterator_nextValue(&iter);
-			ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map);
-		}
-	}
-	ts->serializerSvc = NULL;
+    if (ts->serializerSvc == svc) { //only act if svc removed is services used
+        hash_map_iterator_t iter = hashMapIterator_construct(ts->msgSerializerMapMap);
+        while (hashMapIterator_hasNext(&iter)) {
+            hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
+            pubsub_subscriber_t* subsvc = hashMapEntry_getKey(entry);
+            pubsub_msg_serializer_map_t* map = hashMapEntry_getValue(entry);
+            ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map);
+            hashMap_put(ts->msgSerializerMapMap, subsvc, NULL);
+        }
+        ts->serializerSvc = NULL;
+    }
 	celixThreadMutex_unlock(&ts->ts_lock);
 
 	return status;
 }
 
-static celix_status_t topicsub_subscriberTracked(void * handle, service_reference_pt reference, void * service){
+static celix_status_t topicsub_subscriberTracked(void * handle, service_reference_pt reference, void* svc) {
 	celix_status_t status = CELIX_SUCCESS;
 	topic_subscription_pt ts = handle;
 
 	celixThreadMutex_lock(&ts->ts_lock);
-	if (!hashMap_containsKey(ts->msgSerializers, service)) {
-		bundle_pt bundle = NULL;
-		serviceReference_getBundle(reference, &bundle);
-
-		if (ts->serializerSvc != NULL) {
-			pubsub_msg_serializer_map_t* map = NULL;
-			ts->serializerSvc->createSerializerMap(ts->serializerSvc->handle, bundle, &map);
-			if (map != NULL) {
-				hashMap_put(ts->msgSerializers, service, map);
-			} else {
-				printf("TS: Cannot create msg serializer map\n");
-			}
-		}
-	}
+    if (!hashMap_containsKey(ts->msgSerializerMapMap, svc)) {
+        bundle_pt bundle = NULL;
+        serviceReference_getBundle(reference, &bundle);
+
+        if (ts->serializerSvc != NULL) {
+            pubsub_msg_serializer_map_t* map = NULL;
+            ts->serializerSvc->createSerializerMap(ts->serializerSvc->handle, bundle, &map);
+            if (map != NULL) {
+                hashMap_put(ts->msgSerializerMapMap, svc, map);
+                hashMap_put(ts->bundleMap, svc, bundle);
+            }
+        }
+    }
 	celixThreadMutex_unlock(&ts->ts_lock);
 	printf("TS: New subscriber registered.\n");
 	return status;
 
 }
 
-static celix_status_t topicsub_subscriberUntracked(void * handle, service_reference_pt reference, void * service){
+static celix_status_t topicsub_subscriberUntracked(void * handle, service_reference_pt reference, void* svc) {
 	celix_status_t status = CELIX_SUCCESS;
 	topic_subscription_pt ts = handle;
 
 	celixThreadMutex_lock(&ts->ts_lock);
-	if (hashMap_containsKey(ts->msgSerializers, service)) {
-		pubsub_msg_serializer_map_t* map = hashMap_remove(ts->msgSerializers, service);
-        ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map);
-	}
+    if (hashMap_containsKey(ts->msgSerializerMapMap, svc)) {
+        pubsub_msg_serializer_map_t* map = hashMap_remove(ts->msgSerializerMapMap, svc);
+        if (ts->serializerSvc != NULL){
+            ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map);
+            hashMap_remove(ts->bundleMap, svc);
+            hashMap_remove(ts->msgSerializerMapMap, svc);
+        }
+    }
 	celixThreadMutex_unlock(&ts->ts_lock);
 
 	printf("TS: Subscriber unregistered.\n");
@@ -500,7 +530,7 @@ static void process_msg(topic_subscription_pt sub, array_list_pt msg_list) {
 
 	pubsub_msg_header_pt first_msg_hdr = (pubsub_msg_header_pt)zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->header);
 
-	hash_map_iterator_t iter = hashMapIterator_construct(sub->msgSerializers);
+	hash_map_iterator_t iter = hashMapIterator_construct(sub->msgSerializerMapMap);
 	while (hashMapIterator_hasNext(&iter)) {
 		hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
 		pubsub_subscriber_pt subsvc = hashMapEntry_getKey(entry);

http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c b/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c
index 2dd8258..6145a38 100644
--- a/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c
+++ b/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c
@@ -250,14 +250,17 @@ static void pubsubSerializer_addMsgSerializerFromBundle(const char *root, bundle
                     bool clash = hashMap_containsKey(msgSerializers, (void*)(uintptr_t)msgId);
                     if (clash) {
                         printf("Cannot add msg %s. clash in msg id %d!!\n", msgName, msgId);
+                        free(impl);
+                        dynMessage_destroy(msgType);
                     } else if ( msgName != NULL && msgVersion != NULL && msgId != 0) {
                         hashMap_put(msgSerializers, (void*)(uintptr_t)msgId, &impl->msgSerializer);
                     } else {
-                        printf("Error adding creating msg serializer\n");
+                        printf("Error creating msg serializer\n");
+                        free(impl);
+                        dynMessage_destroy(msgType);
                     }
 
-                }
-                else{
+                } else{
                     printf("DMU: cannot parse message from descriptor %s\n.",path);
                 }
                 fclose(stream);

http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c b/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
index 36ea422..6047cf8 100644
--- a/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
+++ b/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
@@ -226,10 +226,20 @@ celix_status_t pubsub_topologyManager_psaAdded(void * handle, service_reference_
 
 	celixThreadMutex_unlock(&manager->publicationsLock);
 
+
+	celixThreadMutex_lock(&manager->serializerListLock);
+	unsigned int size = arrayList_size(manager->serializerList);
+	if (size > 0) {
+		pubsub_serializer_service_t* ser = arrayList_get(manager->serializerList, (size-1)); //last, same as result of add/remove serializer
+		new_psa->setSerializer(new_psa->admin, ser);
+	}
+	celixThreadMutex_unlock(&manager->serializerListLock);
+
 	celixThreadMutex_lock(&manager->psaListLock);
 	arrayList_add(manager->psaList, new_psa);
 	celixThreadMutex_unlock(&manager->psaListLock);
 
+
 	return status;
 }
 
@@ -335,7 +345,6 @@ celix_status_t pubsub_topologyManager_pubsubSerializerAdded(void* handle, servic
 	logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "PSTM: Added pubsub serializer");
 
 	int i;
-
 	for(i=0; i<arrayList_size(manager->psaList); i++){
 		pubsub_admin_service_pt psa = (pubsub_admin_service_pt) arrayList_get(manager->psaList,i);
 		psa->setSerializer(psa->admin, new_serializer);

http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/test/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/test/CMakeLists.txt b/pubsub/test/CMakeLists.txt
index 7cd0003..3b1655b 100644
--- a/pubsub/test/CMakeLists.txt
+++ b/pubsub/test/CMakeLists.txt
@@ -38,11 +38,11 @@ bundle_files(pubsub_sut
 add_deploy(pubsub_udpmc_sut
     NAME deploy_sut
     BUNDLES
+        org.apache.celix.pubsub_serializer.PubSubSerializerJson
         org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
+        #org.apache.celix.pubsub_admin.PubSubAdminUdpMc
+        org.apache.celix.pubsub_admin.PubSubAdminZmq
         org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
-        org.apache.celix.pubsub_admin.PubSubAdminUdpMc
-        #org.apache.celix.pubsub_admin.PubSubAdminZmq
-        org.apache.celix.pubsub_serializer.PubSubSerializerJson
         pubsub_sut
     DIR ${PROJECT_BINARY_DIR}/runtimes/test/pubsub/udpmc
 )
@@ -60,11 +60,11 @@ bundle_files(pubsub_tst
 add_deploy(pubsub_udpmc_tst
     NAME deploy_tst
     BUNDLES
-        org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
-        org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
-        org.apache.celix.pubsub_admin.PubSubAdminUdpMc
-        #org.apache.celix.pubsub_admin.PubSubAdminZmq
         org.apache.celix.pubsub_serializer.PubSubSerializerJson
+        org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
+        org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
+        #org.apache.celix.pubsub_admin.PubSubAdminUdpMc
+        org.apache.celix.pubsub_admin.PubSubAdminZmq
         pubsub_tst
     DIR ${PROJECT_BINARY_DIR}/runtimes/test/pubsub/udpmc
     LAUNCHER celix_test_runner

http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/test/msg_descriptors/msg.descriptor
----------------------------------------------------------------------
diff --git a/pubsub/test/msg_descriptors/msg.descriptor b/pubsub/test/msg_descriptors/msg.descriptor
index 03b15ba..0eb28cb 100644
--- a/pubsub/test/msg_descriptors/msg.descriptor
+++ b/pubsub/test/msg_descriptors/msg.descriptor
@@ -3,6 +3,7 @@ type=message
 name=msg
 version=1.0.0
 :annotations
+classname=org.example.Msg
 :types
 :message
-{n seqnR}
+{i seqnR}

http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/test/test/msg.h
----------------------------------------------------------------------
diff --git a/pubsub/test/test/msg.h b/pubsub/test/test/msg.h
index babfd1f..49169c5 100644
--- a/pubsub/test/test/msg.h
+++ b/pubsub/test/test/msg.h
@@ -20,8 +20,10 @@
 #ifndef MSG_H
 #define MSG_H
 
+#include <stdint.h>
+
 typedef struct msg {
-    int seqNr;
+    uint32_t seqNr;
 } msg_t;
 
 #endif //MSG_H

http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/test/test/sut_activator.c
----------------------------------------------------------------------
diff --git a/pubsub/test/test/sut_activator.c b/pubsub/test/test/sut_activator.c
index 3e3b33b..6f02e79 100644
--- a/pubsub/test/test/sut_activator.c
+++ b/pubsub/test/test/sut_activator.c
@@ -19,6 +19,7 @@
 
 #include <stdio.h>
 #include <stdlib.h>
+#include <constants.h>
 
 #include "bundle_activator.h"
 #include "service_tracker.h"
@@ -57,7 +58,9 @@ celix_status_t bundleActivator_start(void * userData, bundle_context_pt context)
 	act->reg = NULL;
 	bundleContext_registerService(context, PUBSUB_SUBSCRIBER_SERVICE_NAME, &act->subSvc, props, &act->reg);
 
-	const char* filter = "(&(objectClass=pubsub.publisher)(pubsub.topic=pong))";
+	char filter[512];
+	snprintf(filter, 512, "(&(%s=%s)(%s=%s))", OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_PUBLISHER_SERVICE_NAME, PUBSUB_PUBLISHER_TOPIC, "pong");
+
 	service_tracker_customizer_pt customizer = NULL;
 	serviceTrackerCustomizer_create(act, NULL, sut_pubAdded, NULL, sut_pubRemoved, &customizer);
 	serviceTracker_createWithFilter(context, filter, customizer, &act->tracker);

http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/test/test/tst_activator.cpp
----------------------------------------------------------------------
diff --git a/pubsub/test/test/tst_activator.cpp b/pubsub/test/test/tst_activator.cpp
index 266f73e..2cf5309 100644
--- a/pubsub/test/test/tst_activator.cpp
+++ b/pubsub/test/test/tst_activator.cpp
@@ -32,7 +32,7 @@
 
 #include <CppUTest/TestHarness.h>
 #include <CppUTestExt/MockSupport.h>
-
+#include <constants.h>
 
 
 static int tst_receive(void *handle, const char *msgType, unsigned int msgTypeId, void *msg, pubsub_multipart_callbacks_t *callbacks, bool *release);
@@ -70,8 +70,10 @@ celix_status_t bundleActivator_start(__attribute__((unused)) void * userData, bu
 	g_act.subSvc.receive = tst_receive;
 	bundleContext_registerService(context, PUBSUB_SUBSCRIBER_SERVICE_NAME, &g_act.subSvc, props, &g_act.reg);
 
-    const char* filter = "(&(objectClass=pubsub.publisher)(pubsub.topic=ping))";
-	service_tracker_customizer_pt customizer = NULL;
+    char filter[512];
+    snprintf(filter, 512, "(&(%s=%s)(%s=%s))", OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_PUBLISHER_SERVICE_NAME, PUBSUB_PUBLISHER_TOPIC, "ping");
+
+    service_tracker_customizer_pt customizer = NULL;
 	serviceTrackerCustomizer_create(&g_act, NULL, tst_pubAdded, NULL, tst_pubRemoved, &customizer);
 	serviceTracker_createWithFilter(context, filter, customizer, &g_act.tracker);
 	serviceTracker_open(g_act.tracker);
@@ -157,7 +159,7 @@ TEST_GROUP(PUBSUB_INT_GROUP)
             if (count > 0) {
                 break;
             } else {
-                printf("No return message received, waiting for a while\n");
+                printf("No return message received, waiting for a while. %d/%d\n", i+1, TRIES);
             }
         }
         CHECK(count > 0);
@@ -176,7 +178,8 @@ TEST(PUBSUB_INT_GROUP, sendRecvTest) {
     constexpr int COUNT = 50;
     msg_t msg;
     for (int i = 0; i < COUNT; ++i) {
-        msg.seqNr = i;
+        msg.seqNr = i+1;
+        printf("Sending test msg %d of %d\n", i+1, COUNT);
         pthread_mutex_lock(&g_act.mutex);
         g_act.pubSvc->send(g_act.pubSvc->handle, g_act.msgId, &msg);
         pthread_mutex_unlock(&g_act.mutex);


Mime
View raw message