celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pnol...@apache.org
Subject [3/8] celix git commit: CELIX-408: Major refactoring of the pubsub serializer design and usage. The udpmc and zmq examples seem to be working; combi and zmq multicast are not stable yet.
Date Fri, 14 Apr 2017 08:14:04 GMT
CELIX-408: Major refactoring of the pubsub serializer design and usage. The udpmc and zmq examples seem to be working; combi and zmq multicast are not stable yet.

- The serializer is refactored to minimize the callbacks needed directly to the services. The serializer is now a two-step approach:
  the serializer service can be used to create a map of msg serializers,
  and the msg serializers are structs (with function ptrs) that serialize a specific msg.
- Where feasible, replace _pt types with _t types. Removing the pointer from the typedef makes it possible to add const qualifiers and to use sizeof on the typedef.


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/7efe4331
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/7efe4331
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/7efe4331

Branch: refs/heads/develop
Commit: 7efe4331aafdfea1df8bd181c136f20acaf52b49
Parents: 97df926
Author: Pepijn Noltes <pepijnnoltes@gmail.com>
Authored: Tue Apr 11 21:19:10 2017 +0200
Committer: Pepijn Noltes <pepijnnoltes@gmail.com>
Committed: Tue Apr 11 21:19:10 2017 +0200

----------------------------------------------------------------------
 cmake/cmake_celix/BundlePackaging.cmake         |   2 +-
 dfi/private/src/json_serializer.c               |   8 +-
 dfi/public/include/json_serializer.h            |   4 +-
 pubsub/pubsub_admin_udp_mc/CMakeLists.txt       |   1 -
 .../private/include/pubsub_admin_impl.h         |   6 +-
 .../include/pubsub_publish_service_private.h    |   8 +-
 .../private/include/topic_subscription.h        |   6 +-
 .../private/src/pubsub_admin_impl.c             |   4 +-
 .../private/src/topic_publication.c             | 173 +++++++-------
 .../private/src/topic_subscription.c            | 158 +++++++------
 pubsub/pubsub_admin_zmq/CMakeLists.txt          |   1 -
 .../private/include/pubsub_admin_impl.h         |   6 +-
 .../include/pubsub_publish_service_private.h    |   6 +-
 .../private/include/topic_subscription.h        |   6 +-
 .../private/src/pubsub_admin_impl.c             |   4 +-
 .../private/src/topic_publication.c             | 137 ++++++-----
 .../private/src/topic_subscription.c            | 184 ++++++---------
 .../public/include/dyn_msg_utils.h              |  39 ----
 .../pubsub_common/public/include/pubsub_admin.h |   4 +-
 .../public/include/pubsub_serializer.h          |  47 ++--
 pubsub/pubsub_common/public/src/dyn_msg_utils.c | 160 -------------
 pubsub/pubsub_serializer_json/CMakeLists.txt    |   1 -
 .../private/include/pubsub_serializer_impl.h    |  35 ++-
 .../private/src/ps_activator.c                  |  20 +-
 .../private/src/pubsub_serializer_impl.c        | 234 +++++++++++++++----
 .../private/src/pubsub_topology_manager.c       |   6 +-
 pubsub/test/msg_descriptors/msg.descriptor      |   2 +-
 pubsub/test/test/tst_activator.cpp              |   9 +-
 28 files changed, 582 insertions(+), 689 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/cmake/cmake_celix/BundlePackaging.cmake
----------------------------------------------------------------------
diff --git a/cmake/cmake_celix/BundlePackaging.cmake b/cmake/cmake_celix/BundlePackaging.cmake
index ded1ca5..7eb42fa 100644
--- a/cmake/cmake_celix/BundlePackaging.cmake
+++ b/cmake/cmake_celix/BundlePackaging.cmake
@@ -319,7 +319,7 @@ function(bundle_libs)
             if (ADD_TO_MANIFEST)
                 list(APPEND LIBS "$<TARGET_SONAME_FILE_NAME:${LIB}>")
             endif()
-                list(APPEND DEPS "${OUT}") #NOTE depending on ${OUT} not on $<TARGET_FILE:${LIB}>.
+            list(APPEND DEPS "${OUT}") #NOTE depending on ${OUT} not on $<TARGET_FILE:${LIB}>.
         endif()
 
         get_target_property(IS_LIB ${BUNDLE} "BUNDLE_TARGET_IS_LIB")

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/dfi/private/src/json_serializer.c
----------------------------------------------------------------------
diff --git a/dfi/private/src/json_serializer.c b/dfi/private/src/json_serializer.c
index 0c06998..c1cd339 100644
--- a/dfi/private/src/json_serializer.c
+++ b/dfi/private/src/json_serializer.c
@@ -280,7 +280,7 @@ static int jsonSerializer_parseSequence(dyn_type *seq, json_t *array, void *seqL
     return status;
 }
 
-int jsonSerializer_serialize(dyn_type *type, void *input, char **output) {
+int jsonSerializer_serialize(dyn_type *type, const void* input, char **output) {
     int status = OK;
 
     json_t *root = NULL;
@@ -294,11 +294,11 @@ int jsonSerializer_serialize(dyn_type *type, void *input, char **output) {
     return status;
 }
 
-int jsonSerializer_serializeJson(dyn_type *type, void *input, json_t **out) {
-    return jsonSerializer_writeAny(type, input, out);
+int jsonSerializer_serializeJson(dyn_type *type, const void* input, json_t **out) {
+    return jsonSerializer_writeAny(type, (void*)input /*TODO update static function to take const void**/, out);
 }
 
-static int jsonSerializer_writeAny(dyn_type *type, void *input, json_t **out) {
+static int jsonSerializer_writeAny(dyn_type *type, void* input, json_t **out) {
     int status = OK;
 
     int descriptor = dynType_descriptorType(type);

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/dfi/public/include/json_serializer.h
----------------------------------------------------------------------
diff --git a/dfi/public/include/json_serializer.h b/dfi/public/include/json_serializer.h
index c785b01..2f91f2b 100644
--- a/dfi/public/include/json_serializer.h
+++ b/dfi/public/include/json_serializer.h
@@ -31,7 +31,7 @@ DFI_SETUP_LOG_HEADER(jsonSerializer);
 int jsonSerializer_deserialize(dyn_type *type, const char *input, void **result);
 int jsonSerializer_deserializeJson(dyn_type *type, json_t *input, void **result);
 
-int jsonSerializer_serialize(dyn_type *type, void *input, char **output);
-int jsonSerializer_serializeJson(dyn_type *type, void *input, json_t **out);
+int jsonSerializer_serialize(dyn_type *type, const void* input, char **output);
+int jsonSerializer_serializeJson(dyn_type *type, const void* input, json_t **out);
 
 #endif

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/CMakeLists.txt b/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
index ce32db0..1ac0c2d 100644
--- a/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
+++ b/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
@@ -35,7 +35,6 @@ add_bundle(org.apache.celix.pubsub_admin.PubSubAdminUdpMc
     	private/src/topic_subscription.c
     	private/src/topic_publication.c
     	private/src/large_udp.c
-    	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/dyn_msg_utils.c
     	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_endpoint.c
     	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/log_helper.c
 )

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h b/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h
index 9eddf15..89e6547 100644
--- a/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h
+++ b/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h
@@ -33,7 +33,7 @@
 
 struct pubsub_admin {
 
-	pubsub_serializer_service_pt serializerSvc;
+	pubsub_serializer_service_t* serializerSvc;
 
 	bundle_context_pt bundle_context;
 	log_helper_pt loghelper;
@@ -73,7 +73,7 @@ celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* sco
 celix_status_t pubsubAdmin_matchPublisher(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP, double* score);
 celix_status_t pubsubAdmin_matchSubscriber(pubsub_admin_pt admin, pubsub_endpoint_pt subEP, double* score);
 
-celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc);
-celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc);
+celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc);
+celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc);
 
 #endif /* PUBSUB_ADMIN_IMPL_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h b/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h
index 81690b8..57d942a 100644
--- a/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h
+++ b/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h
@@ -39,17 +39,17 @@ typedef struct pubsub_udp_msg {
     struct pubsub_msg_header header;
     unsigned int payloadSize;
     char payload[];
-} *pubsub_udp_msg_pt;
+} pubsub_udp_msg_t;
 
 typedef struct topic_publication *topic_publication_pt;
-celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, pubsub_serializer_service_pt serializer, char* bindIP, topic_publication_pt *out);
+celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t* serializer, char* bindIP, topic_publication_pt *out);
 celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub);
 
 celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);
 celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);
 
-celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_pt serializerSvc);
-celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_pt serializerSvc);
+celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc);
+celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc);
 
 celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory);
 celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub);

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h b/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h
index 36c902e..1535ae5 100644
--- a/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h
+++ b/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h
@@ -38,7 +38,7 @@
 
 typedef struct topic_subscription* topic_subscription_pt;
 
-celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt bundle_context, pubsub_serializer_service_pt serializer, char* scope, char* topic,topic_subscription_pt* out);
+celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt bundle_context, pubsub_serializer_service_t* serializer, char* scope, char* topic,topic_subscription_pt* out);
 celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts);
 celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts);
 celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts);
@@ -49,8 +49,8 @@ celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt
 celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP);
 celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP);
 
-celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, pubsub_serializer_service_pt serializerSvc);
-celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts, pubsub_serializer_service_pt serializerSvc);
+celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc);
+celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc);
 
 celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt subscription);
 celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt subscription);

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c b/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
index 6e9f4e8..ebfe3e6 100644
--- a/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
+++ b/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
@@ -645,7 +645,7 @@ celix_status_t pubsubAdmin_matchSubscriber(pubsub_admin_pt admin, pubsub_endpoin
 	return status;
 }
 
-celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc){
+celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc){
 	celix_status_t status = CELIX_SUCCESS;
 	admin->serializerSvc = serializerSvc;
 
@@ -673,7 +673,7 @@ celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serialize
 	return status;
 }
 
-celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc){
+celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc){
 	celix_status_t status = CELIX_SUCCESS;
 	admin->serializerSvc = NULL;
 

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c b/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
index aa3faf0..be0a433 100644
--- a/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
+++ b/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
@@ -37,7 +37,6 @@
 #include "array_list.h"
 #include "celixbool.h"
 #include "service_registration.h"
-#include "dyn_msg_utils.h"
 #include "utils.h"
 #include "service_factory.h"
 #include "version.h"
@@ -59,7 +58,7 @@ struct topic_publication {
 	hash_map_pt boundServices; //<bundle_pt,bound_service>
 	celix_thread_mutex_t tp_lock;
 	struct sockaddr_in destAddr;
-	pubsub_serializer_service_pt serializerSvc;
+	pubsub_serializer_service_t* serializerSvc;
 };
 
 typedef struct publish_bundle_bound_service {
@@ -68,27 +67,27 @@ typedef struct publish_bundle_bound_service {
 	bundle_pt bundle;
     char *scope;
 	char *topic;
-	hash_map_pt msgTypes;
 	unsigned short getCount;
 	celix_thread_mutex_t mp_lock;
 	bool mp_send_in_progress;
 	array_list_pt mp_parts;
 	largeUdp_pt largeUdpHandle;
-}* publish_bundle_bound_service_pt;
+	pubsub_msg_serializer_map_t* map;
+} publish_bundle_bound_service_t;
 
 typedef struct pubsub_msg{
 	pubsub_msg_header_pt header;
 	char* payload;
 	int payloadSize;
-}* pubsub_msg_pt;
+} pubsub_msg_t;
 
 static unsigned int rand_range(unsigned int min, unsigned int max);
 
 static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service);
 static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service);
 
-static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle);
-static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc);
+static publish_bundle_bound_service_t* pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle);
+static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_t* boundSvc);
 
 static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *msg);
 
@@ -98,7 +97,7 @@ static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsig
 static void delay_first_send_for_late_joiners(void);
 
 
-celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, pubsub_serializer_service_pt serializer, char* bindIP, topic_publication_pt *out){
+celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t* serializer, char* bindIP, topic_publication_pt *out){
 
     char* ep = malloc(EP_ADDRESS_LEN);
     memset(ep,0,EP_ADDRESS_LEN);
@@ -136,7 +135,7 @@ celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub){
 
 	hash_map_iterator_pt iter = hashMapIterator_create(pub->boundServices);
 	while(hashMapIterator_hasNext(iter)){
-		publish_bundle_bound_service_pt bound = hashMapIterator_nextValue(iter);
+		publish_bundle_bound_service_t* bound = hashMapIterator_nextValue(iter);
 		pubsub_destroyPublishBundleBoundService(bound);
 	}
 	hashMapIterator_destroy(iter);
@@ -223,41 +222,49 @@ celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub
 	return CELIX_SUCCESS;
 }
 
-celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_pt serializerSvc){
+celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc){
 	celix_status_t status = CELIX_SUCCESS;
 
 	celixThreadMutex_lock(&(pub->tp_lock));
 
-	pub->serializerSvc = serializerSvc;
+    //clear old serializer
+    if (pub->serializerSvc != NULL) {
+        hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices); //key = bundle , value = svc
+        while (hashMapIterator_hasNext(&iter)) {
+            publish_bundle_bound_service_t* bound = hashMapIterator_nextValue(&iter);
+			pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, bound->map);
+			bound->map = NULL;
+        }
+    }
 
-	hash_map_iterator_pt bs_iter = hashMapIterator_create(pub->boundServices);
-	while(hashMapIterator_hasNext(bs_iter)){
-		publish_bundle_bound_service_pt boundSvc = (publish_bundle_bound_service_pt) hashMapIterator_nextValue(bs_iter);
-		if (hashMap_size(boundSvc->msgTypes) == 0){
-			pub->serializerSvc->fillMsgTypesMap(pub->serializerSvc->serializer, boundSvc->msgTypes, boundSvc->bundle);
-		}
+    //setup new serializer
+	pub->serializerSvc = serializerSvc;
+	hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices);
+	while (hashMapIterator_hasNext(&iter)) {
+		hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
+		bundle_pt bundle = hashMapEntry_getKey(entry);
+		publish_bundle_bound_service_t* bound = hashMapEntry_getValue(entry);
+		pub->serializerSvc->createSerializerMap(pub->serializerSvc->handle, bundle, &bound->map);
 	}
-	hashMapIterator_destroy(bs_iter);
 
 	celixThreadMutex_unlock(&(pub->tp_lock));
 
 	return status;
 }
 
-celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_pt serializerSvc){
+celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_t* svc){
 	celix_status_t status = CELIX_SUCCESS;
 
 	celixThreadMutex_lock(&(pub->tp_lock));
-
-	hash_map_iterator_pt bs_iter = hashMapIterator_create(pub->boundServices);
-	while(hashMapIterator_hasNext(bs_iter)){
-		publish_bundle_bound_service_pt boundSvc = (publish_bundle_bound_service_pt) hashMapIterator_nextValue(bs_iter);
-		pub->serializerSvc->emptyMsgTypesMap(pub->serializerSvc->serializer, boundSvc->msgTypes);
-	}
-	hashMapIterator_destroy(bs_iter);
-
+    if (pub->serializerSvc == svc) {
+        hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices);
+        while (hashMapIterator_hasNext(&iter)) {
+            publish_bundle_bound_service_t* bound = hashMapIterator_nextValue(&iter);
+            pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, bound->map);
+			bound->map = NULL;
+        }
+    }
 	pub->serializerSvc = NULL;
-
 	celixThreadMutex_unlock(&(pub->tp_lock));
 
 	return status;
@@ -275,18 +282,18 @@ static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt
 
 	celixThreadMutex_lock(&(publish->tp_lock));
 
-	publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle);
-	if(bound==NULL){
-		bound = pubsub_createPublishBundleBoundService(publish,bundle);
-		if(bound!=NULL){
-			hashMap_put(publish->boundServices,bundle,bound);
+	publish_bundle_bound_service_t* bound = hashMap_get(publish->boundServices, bundle);
+	if (bound == NULL) {
+		bound = pubsub_createPublishBundleBoundService(publish, bundle);
+		if (bound != NULL) {
+			hashMap_put(publish->boundServices, bundle, bound);
 		}
 	}
-	else{
+	else {
 		bound->getCount++;
 	}
 
-	if(bound!=NULL){
+	if (bound != NULL) {
 		*service = bound->service;
 	}
 
@@ -301,17 +308,16 @@ static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_p
 
 	celixThreadMutex_lock(&(publish->tp_lock));
 
-	publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle);
-	if(bound!=NULL){
+	publish_bundle_bound_service_t* bound = hashMap_get(publish->boundServices, bundle);
+	if (bound != NULL) {
 
 		bound->getCount--;
-		if(bound->getCount==0){
+		if (bound->getCount == 0) {
 			pubsub_destroyPublishBundleBoundService(bound);
 			hashMap_remove(publish->boundServices,bundle);
 		}
-
 	}
-	else{
+	else {
 		long bundleId = -1;
 		bundle_getBundleId(bundle,&bundleId);
 		printf("TP: Unexpected ungetService call for bundle %ld.\n", bundleId);
@@ -325,12 +331,11 @@ static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_p
 	return CELIX_SUCCESS;
 }
 
-static bool send_pubsub_msg(publish_bundle_bound_service_pt bound, pubsub_msg_pt msg, bool last, pubsub_release_callback_t *releaseCallback){
+static bool send_pubsub_msg(publish_bundle_bound_service_t* bound, pubsub_msg_t* msg, bool last, pubsub_release_callback_t *releaseCallback){
 	const int iovec_len = 3; // header + size + payload
 	bool ret = true;
-	pubsub_udp_msg_pt udpMsg;
 
-	int compiledMsgSize = sizeof(*udpMsg) + msg->payloadSize;
+	int compiledMsgSize = sizeof(pubsub_udp_msg_t) + msg->payloadSize;
 
 	struct iovec msg_iovec[iovec_len];
 	msg_iovec[0].iov_base = msg->header;
@@ -348,51 +353,51 @@ static bool send_pubsub_msg(publish_bundle_bound_service_pt bound, pubsub_msg_pt
 	    ret = false;
 	}
 
-	//free(udpMsg);
 	if(releaseCallback) {
 	    releaseCallback->release(msg->payload, bound);
 	}
 	return ret;
-
 }
 
 
 static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *msg) {
     int status = 0;
-    publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt) handle;
+    publish_bundle_bound_service_t* bound = handle;
 
     celixThreadMutex_lock(&(bound->parent->tp_lock));
     celixThreadMutex_lock(&(bound->mp_lock));
 
-    //TODO //FIXME -> should use pointer to int as identifier, can be many pointers to int ....
-    printf("TODO FIX usage of msg id's in the serializer hashmap. This seems wrongly based on pointers to uints!!!!\n");
-    pubsub_message_type *msgType = hashMap_get(bound->msgTypes, &msgTypeId);
-
-    int major=0, minor=0;
+    pubsub_msg_serializer_t *msgSer = NULL;
+    if (bound->map != NULL) {
+        msgSer = hashMap_get(bound->map->serializers, (void *)(uintptr_t)msgTypeId);
+    }
 
-    if (msgType != NULL && bound->parent->serializerSvc != NULL) {
+    if (bound->map == NULL) {
+        printf("TP: Serializer is not set!\n");
+    } else if (msgSer == NULL ){
+        printf("TP: No msg serializer available for msg type id %d\n", msgTypeId);
+    }
 
-    	version_pt msgVersion = bound->parent->serializerSvc->getVersion(bound->parent->serializerSvc->serializer, msgType);
+    int major=0, minor=0;
 
+    if (msgSer != NULL) {
 		pubsub_msg_header_pt msg_hdr = calloc(1,sizeof(struct pubsub_msg_header));
-
 		strncpy(msg_hdr->topic,bound->topic,MAX_TOPIC_LEN-1);
-
 		msg_hdr->type = msgTypeId;
-		if (msgVersion != NULL){
-			version_getMajor(msgVersion, &major);
-			version_getMinor(msgVersion, &minor);
+		if (msgSer->msgVersion != NULL){
+			version_getMajor(msgSer->msgVersion, &major);
+			version_getMinor(msgSer->msgVersion, &minor);
 			msg_hdr->major = major;
 			msg_hdr->minor = minor;
 		}
 
-		void* serializedOutput = NULL;
-		int serializedOutputLen = 0;
-		bound->parent->serializerSvc->serialize(bound->parent->serializerSvc->serializer, msgType, msg, &serializedOutput, &serializedOutputLen);
+		char* serializedOutput = NULL;
+		size_t serializedOutputLen = 0;
+        msgSer->serialize(msgSer->handle, msg, &serializedOutput, &serializedOutputLen);
 
-		pubsub_msg_pt msg = calloc(1,sizeof(struct pubsub_msg));
+		pubsub_msg_t* msg = calloc(1,sizeof(struct pubsub_msg));
 		msg->header = msg_hdr;
-		msg->payload = (char *) serializedOutput;
+		msg->payload = serializedOutput;
 		msg->payloadSize = serializedOutputLen;
 
 		if(send_pubsub_msg(bound, msg,true, NULL) == false) {
@@ -403,11 +408,7 @@ static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, con
 		free(serializedOutput);
 
     } else {
-		if (bound->parent->serializerSvc == NULL) {
-			printf("TP: Serializer is not set!\n");
-		} else {
-			printf("TP: Message %u not supported.\n",msgTypeId);
-		}
+        printf("TP: Message %u not supported.\n",msgTypeId);
         status=-1;
     }
 
@@ -430,9 +431,9 @@ static unsigned int rand_range(unsigned int min, unsigned int max){
 
 }
 
-static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle){
+static publish_bundle_bound_service_t* pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle) {
 
-	publish_bundle_bound_service_pt bound = calloc(1, sizeof(*bound));
+	publish_bundle_bound_service_t* bound = calloc(1, sizeof(*bound));
 
 	if (bound != NULL) {
 		bound->service = calloc(1, sizeof(*bound->service));
@@ -445,7 +446,6 @@ static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(to
 		bound->getCount = 1;
 		bound->mp_send_in_progress = false;
 		celixThreadMutex_create(&bound->mp_lock,NULL);
-		bound->msgTypes = hashMap_create(uintHash, NULL, uintEquals, NULL); //<int* (msgId),pubsub_message_type>
 		arrayList_create(&bound->mp_parts);
 
 		pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(bound->parent->pub_ep_list,0);
@@ -457,10 +457,10 @@ static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(to
 		bound->service->send = pubsub_topicPublicationSend;
 		bound->service->sendMultipart = NULL;  //Multipart not supported (jet) for UDP
 
-		if (tp->serializerSvc != NULL){
-			tp->serializerSvc->fillMsgTypesMap(tp->serializerSvc->serializer, bound->msgTypes,bound->bundle);
+        //TODO check if lock on tp is needed? (e.g. is lock already done by caller?)
+		if (tp->serializerSvc != NULL) {
+            tp->serializerSvc->createSerializerMap(tp->serializerSvc->handle, bundle, &bound->map);
 		}
-
 	}
 	else
 	{
@@ -474,30 +474,33 @@ static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(to
 	return bound;
 }
 
-static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc){
+static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_t* boundSvc) {
 
 	celixThreadMutex_lock(&boundSvc->mp_lock);
 
-	if(boundSvc->service != NULL){
+	if (boundSvc->service != NULL) {
 		free(boundSvc->service);
 	}
 
-	if(boundSvc->msgTypes != NULL){
-		if (boundSvc->parent->serializerSvc != NULL){
-			boundSvc->parent->serializerSvc->emptyMsgTypesMap(boundSvc->parent->serializerSvc->serializer, boundSvc->msgTypes);
-		}
-		hashMap_destroy(boundSvc->msgTypes,false,false);
-	}
+    //TODO check if lock on parent is needed, e.g. does the caller already lock?
+    if (boundSvc->map != NULL) {
+        if (boundSvc->parent->serializerSvc == NULL) {
+            printf("TP: Cannot destroy pubsub msg serializer map. No serliazer service\n");
+        } else {
+            boundSvc->parent->serializerSvc->destroySerializerMap(boundSvc->parent->serializerSvc->handle, boundSvc->map);
+            boundSvc->map = NULL;
+        }
+    }
 
-	if(boundSvc->mp_parts!=NULL){
+	if (boundSvc->mp_parts!=NULL) {
 		arrayList_destroy(boundSvc->mp_parts);
 	}
 
-    if(boundSvc->scope!=NULL){
+    if (boundSvc->scope!=NULL) {
         free(boundSvc->scope);
     }
 
-    if(boundSvc->topic!=NULL){
+    if (boundSvc->topic!=NULL) {
 		free(boundSvc->topic);
 	}
 

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c b/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
index 91bce9f..da23b21 100644
--- a/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
+++ b/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
@@ -49,7 +49,6 @@
 #include "topic_subscription.h"
 #include "subscriber.h"
 #include "publisher.h"
-#include "dyn_msg_utils.h"
 #include "pubsub_publish_service_private.h"
 #include "large_udp.h"
 
@@ -68,11 +67,15 @@ struct topic_subscription{
 	celix_thread_mutex_t ts_lock;
 	bundle_context_pt context;
 	int topicEpollFd; // EPOLL filedescriptor where the sockets are registered.
-	hash_map_pt servicesMap; // key = service, value = msg types map
+
+    //NOTE. using a service ptr can be dangerous, because pointer can be reused.
+    //ensuring that pointer are removed before new (refurbish) pionter comes along is crucial!
+	hash_map_pt msgSerializersMap; // key = service ptr, value = pubsub_msg_serializer_map_t*
+
 	hash_map_pt socketMap; // key = URL, value = listen-socket
 	unsigned int nrSubscribers;
 	largeUdp_pt largeUdpHandle;
-	pubsub_serializer_service_pt serializerSvc;
+	pubsub_serializer_service_t* serializerSvc;
 
 };
 
@@ -94,7 +97,7 @@ static void sigusr1_sighandler(int signo);
 static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId);
 
 
-celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt bundle_context, pubsub_serializer_service_pt serializer, char* scope, char* topic,topic_subscription_pt* out){
+celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt bundle_context, pubsub_serializer_service_t* serializer, char* scope, char* topic,topic_subscription_pt* out){
 	celix_status_t status = CELIX_SUCCESS;
 
 	topic_subscription_pt ts = (topic_subscription_pt) calloc(1,sizeof(*ts));
@@ -115,7 +118,7 @@ celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt bundl
 
 	celixThreadMutex_create(&ts->ts_lock,NULL);
 	arrayList_create(&ts->sub_ep_list);
-	ts->servicesMap = hashMap_create(NULL, NULL, NULL, NULL);
+	ts->msgSerializersMap = hashMap_create(NULL, NULL, NULL, NULL);
 	ts->socketMap =  hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 
 	ts->largeUdpHandle = largeUdp_create(MAX_UDP_SESSIONS);
@@ -161,7 +164,7 @@ celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){
 	serviceTracker_destroy(ts->tracker);
 	arrayList_clear(ts->sub_ep_list);
 	arrayList_destroy(ts->sub_ep_list);
-	hashMap_destroy(ts->servicesMap,false,false);
+	hashMap_destroy(ts->msgSerializersMap,false,false);
 
 	hashMap_destroy(ts->socketMap,false,false);
 	largeUdp_destroy(ts->largeUdpHandle);
@@ -391,7 +394,7 @@ unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt ts) {
 	return ts->nrSubscribers;
 }
 
-celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, pubsub_serializer_service_pt serializerSvc){
+celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc){
 	celix_status_t status = CELIX_SUCCESS;
 
 	celixThreadMutex_lock(&ts->ts_lock);
@@ -403,50 +406,39 @@ celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, p
 	return status;
 }
 
-celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts, pubsub_serializer_service_pt serializerSvc){
+celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc){
 	celix_status_t status = CELIX_SUCCESS;
 
 	celixThreadMutex_lock(&ts->ts_lock);
-
-	hash_map_iterator_pt svc_iter = hashMapIterator_create(ts->servicesMap);
-	while(hashMapIterator_hasNext(svc_iter)){
-		hash_map_pt msgTypes = (hash_map_pt) hashMapIterator_nextValue(svc_iter);
-		if (hashMap_size(msgTypes) > 0){
-			ts->serializerSvc->emptyMsgTypesMap(ts->serializerSvc->serializer, msgTypes);
+	if (ts->serializerSvc == serializerSvc) { //only act if the removed svc is the service in use
+		hash_map_iterator_t iter = hashMapIterator_construct(ts->msgSerializersMap);
+		while (hashMapIterator_hasNext(&iter)) {
+            pubsub_msg_serializer_map_t* map = hashMapIterator_nextValue(&iter);
+            ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map);
 		}
+		ts->serializerSvc = NULL;
 	}
-	hashMapIterator_destroy(svc_iter);
-
-	ts->serializerSvc = NULL;
-
 	celixThreadMutex_unlock(&ts->ts_lock);
 
 	return status;
 }
 
-static celix_status_t topicsub_subscriberTracked(void * handle, service_reference_pt reference, void * service){
+static celix_status_t topicsub_subscriberTracked(void * handle, service_reference_pt reference, void* svc){
 	celix_status_t status = CELIX_SUCCESS;
 	topic_subscription_pt ts = handle;
 
 	celixThreadMutex_lock(&ts->ts_lock);
-	if (!hashMap_containsKey(ts->servicesMap, service)) {
-		hash_map_pt msgTypes = hashMap_create(uintHash, NULL, uintEquals, NULL); //key = msgId, value = pubsub_message_type
-
+	if (!hashMap_containsKey(ts->msgSerializersMap, svc)) {
 		bundle_pt bundle = NULL;
 		serviceReference_getBundle(reference, &bundle);
 
-		if (ts->serializerSvc != NULL){
-			ts->serializerSvc->fillMsgTypesMap(ts->serializerSvc->serializer, msgTypes,bundle);
-		}
-
-		if(hashMap_size(msgTypes)==0){ //If the msgTypes hashMap is not filled, the service is an unsupported subscriber
-			hashMap_destroy(msgTypes,false,false);
-			printf("TS: Unsupported subscriber!\n");
-		}
-		else{
-			hashMap_put(ts->servicesMap, service, msgTypes);
+		if (ts->serializerSvc != NULL) {
+            pubsub_msg_serializer_map_t* map = NULL;
+            ts->serializerSvc->createSerializerMap(ts->serializerSvc->handle, bundle, &map);
+            if (map != NULL) {
+                hashMap_put(ts->msgSerializersMap, svc, map);
+            }
 		}
-
 	}
 	celixThreadMutex_unlock(&ts->ts_lock);
 	printf("TS: New subscriber registered.\n");
@@ -454,18 +446,16 @@ static celix_status_t topicsub_subscriberTracked(void * handle, service_referenc
 
 }
 
-static celix_status_t topicsub_subscriberUntracked(void * handle, service_reference_pt reference, void * service){
+static celix_status_t topicsub_subscriberUntracked(void * handle, service_reference_pt reference, void* svc){
 	celix_status_t status = CELIX_SUCCESS;
 	topic_subscription_pt ts = handle;
 
-	celixThreadMutex_lock(&ts->ts_lock);
-	if (hashMap_containsKey(ts->servicesMap, service)) {
-		hash_map_pt msgTypes = hashMap_remove(ts->servicesMap, service);
-		if(msgTypes!=NULL){
-			if (ts->serializerSvc != NULL){
-				ts->serializerSvc->emptyMsgTypesMap(ts->serializerSvc->serializer, msgTypes);
-			}
-			hashMap_destroy(msgTypes,false,false);
+
+    celixThreadMutex_lock(&ts->ts_lock);
+	if (hashMap_containsKey(ts->msgSerializersMap, svc)) {
+		pubsub_msg_serializer_map_t* map = hashMap_remove(ts->msgSerializersMap, svc);
+		if (ts->serializerSvc != NULL){
+			ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map);
 		}
 	}
 	celixThreadMutex_unlock(&ts->ts_lock);
@@ -475,59 +465,51 @@ static celix_status_t topicsub_subscriberUntracked(void * handle, service_refere
 }
 
 
-static void process_msg(topic_subscription_pt sub,pubsub_udp_msg_pt msg){
+static void process_msg(topic_subscription_pt sub, pubsub_udp_msg_t* msg){
 
-	hash_map_iterator_pt iter = hashMapIterator_create(sub->servicesMap);
-	while (hashMapIterator_hasNext(iter)) {
-		hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
+	hash_map_iterator_t iter = hashMapIterator_construct(sub->msgSerializersMap);
+	celixThreadMutex_lock(&sub->ts_lock);
+	while (hashMapIterator_hasNext(&iter)) {
+		hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
 		pubsub_subscriber_pt subsvc = hashMapEntry_getKey(entry);
-		hash_map_pt msgTypes = hashMapEntry_getValue(entry);
+		pubsub_msg_serializer_map_t* map = hashMapEntry_getValue(entry);
 
-		pubsub_message_type *msgType = hashMap_get(msgTypes,&(msg->header.type));
+		pubsub_msg_serializer_t* msgSer = hashMap_get(map->serializers, (void *)(uintptr_t )msg->header.type);
 
-		if (msgType == NULL) {
+		if (msgSer == NULL) {
 			printf("TS: Primary message %d not supported. NOT receiving any part of the whole message.\n",msg->header.type);
-		}
-		else if (sub->serializerSvc == NULL){
-			printf("TS: No active serializer service found!\n");
-		}
-		else{
+		} else {
 			void *msgInst = NULL;
-			char *name = sub->serializerSvc->getName(sub->serializerSvc->serializer, msgType);
-			version_pt msgVersion = sub->serializerSvc->getVersion(sub->serializerSvc->serializer, msgType);
-
-			bool validVersion = checkVersion(msgVersion,&msg->header);
-
+			bool validVersion = checkVersion(msgSer->msgVersion, &msg->header);
 			if(validVersion){
-				celix_status_t status = sub->serializerSvc->deserialize(sub->serializerSvc->serializer, msgType, (const void *) msg->payload, &msgInst);
-
+                celix_status_t status = msgSer->deserialize(msgSer->handle, msg->payload, 0, &msgInst);
 				if (status == CELIX_SUCCESS) {
 					bool release = true;
 					pubsub_multipart_callbacks_t mp_callbacks;
-					mp_callbacks.handle = sub;
+					mp_callbacks.handle = map;
 					mp_callbacks.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForMsgType;
 					mp_callbacks.getMultipart = NULL;
 
-					subsvc->receive(subsvc->handle, name, msg->header.type, msgInst, &mp_callbacks, &release);
-					if(release){
-						sub->serializerSvc->freeMsg(sub->serializerSvc->serializer, msgType, msgInst);
+					subsvc->receive(subsvc->handle, msgSer->msgName, msg->header.type, msgInst, &mp_callbacks, &release);
+					if (release) {
+                        msgSer->freeMsg(msgSer->handle, msgInst);
 					}
 				}
 				else{
-					printf("TS: Cannot deserialize msgType %s.\n",name);
+					printf("TS: Cannot deserialize msgType %s.\n", msgSer->msgName);
 				}
 
 			}
-			else{
+			else {
 				int major=0,minor=0;
-				version_getMajor(msgVersion,&major);
-				version_getMinor(msgVersion,&minor);
-				printf("TS: Version mismatch for primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole message.\n",name,major,minor,msg->header.major,msg->header.minor);
+				version_getMajor(msgSer->msgVersion, &major);
+				version_getMinor(msgSer->msgVersion, &minor);
+				printf("TS: Version mismatch for primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole message.\n",
+                       msgSer->msgName, major, minor, msg->header.major, msg->header.minor);
 			}
-
 		}
 	}
-	hashMapIterator_destroy(iter);
+	celixThreadMutex_unlock(&sub->ts_lock);
 }
 
 static void* udp_recv_thread_func(void * arg) {
@@ -555,7 +537,7 @@ static void* udp_recv_thread_func(void * arg) {
             unsigned int size;
             if(largeUdp_dataAvailable(sub->largeUdpHandle, events[i].data.fd, &index, &size) == true) {
                 // Handle data
-                pubsub_udp_msg_pt udpMsg = NULL;
+                pubsub_udp_msg_t* udpMsg = NULL;
                 if(largeUdp_read(sub->largeUdpHandle, index, (void**)&udpMsg, size) != 0) {
                 	printf("TS: ERROR largeUdp_read with index %d\n", index);
                 	continue;
@@ -577,19 +559,19 @@ static void* udp_recv_thread_func(void * arg) {
 }
 
 
-static void sigusr1_sighandler(int signo){
+static void sigusr1_sighandler(int signo) {
 	printf("TS: Topic subscription being shut down...\n");
 	return;
 }
 
-static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr){
+static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr) {
 	bool check=false;
 	int major=0,minor=0;
 
-	if(msgVersion!=NULL){
+	if (msgVersion!=NULL) {
 		version_getMajor(msgVersion,&major);
 		version_getMinor(msgVersion,&minor);
-		if(hdr->major==((unsigned char)major)){ /* Different major means incompatible */
+		if (hdr->major==((unsigned char)major)) { /* Different major means incompatible */
 			check = (hdr->minor>=((unsigned char)minor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */
 		}
 	}
@@ -597,8 +579,24 @@ static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr){
 	return check;
 }
 
-static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId){
-	*msgTypeId = utils_stringHash(msgType);
-	return 0;
+static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* out) {
+    pubsub_msg_serializer_map_t* map = handle;
+    hash_map_iterator_t iter = hashMapIterator_construct(map->serializers);
+    unsigned int msgTypeId = 0;
+    while (hashMapIterator_hasNext(&iter)) {
+        pubsub_msg_serializer_t* msgSer = hashMapIterator_nextValue(&iter);
+        if (strncmp(msgSer->msgName, msgType, 1024 * 1024) == 0) {
+            msgTypeId = msgSer->msgId;
+            break;
+        }
+    }
+
+    if (msgTypeId == 0) {
+        printf("Cannot find msg type id for msgType %s\n", msgType);
+        return -1;
+    } else {
+        *out = msgTypeId;
+        return 0;
+    }
 }
 

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_zmq/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/CMakeLists.txt b/pubsub/pubsub_admin_zmq/CMakeLists.txt
index 956830d..49eba87 100644
--- a/pubsub/pubsub_admin_zmq/CMakeLists.txt
+++ b/pubsub/pubsub_admin_zmq/CMakeLists.txt
@@ -52,7 +52,6 @@ if (BUILD_PUBSUB_PSA_ZMQ)
 	    	${ZMQ_CRYPTO_C}
 	    	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_endpoint.c
 	    	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/log_helper.c
-	    	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/dyn_msg_utils.c
 	    	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_utils.c
 	)
 

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h b/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h
index 2f81bff..3c36986 100644
--- a/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h
+++ b/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h
@@ -49,7 +49,7 @@
 
 struct pubsub_admin {
 
-	pubsub_serializer_service_pt serializerSvc;
+	pubsub_serializer_service_t* serializerSvc;
 
 	bundle_context_pt bundle_context;
 	log_helper_pt loghelper;
@@ -89,7 +89,7 @@ celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* sco
 celix_status_t pubsubAdmin_matchPublisher(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP, double* score);
 celix_status_t pubsubAdmin_matchSubscriber(pubsub_admin_pt admin, pubsub_endpoint_pt subEP, double* score);
 
-celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc);
-celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc);
+celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc);
+celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc);
 
 #endif /* PUBSUB_ADMIN_IMPL_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h b/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h
index b6b76c6..158dfe7 100644
--- a/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h
+++ b/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h
@@ -34,14 +34,14 @@
 
 typedef struct topic_publication *topic_publication_pt;
 
-celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context,pubsub_endpoint_pt pubEP, pubsub_serializer_service_pt serializer, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out);
+celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context,pubsub_endpoint_pt pubEP, pubsub_serializer_service_t* serializer, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out);
 celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub);
 
 celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);
 celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);
 
-celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_pt serializerSvc);
-celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_pt serializerSvc);
+celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc);
+celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc);
 
 celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory);
 celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub);

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h b/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h
index c6fe93a..1fbbaaf 100644
--- a/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h
+++ b/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h
@@ -38,7 +38,7 @@
 
 typedef struct topic_subscription* topic_subscription_pt;
 
-celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt subEP, pubsub_serializer_service_pt serializer, char* scope, char* topic,topic_subscription_pt* out);
+celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt subEP, pubsub_serializer_service_t* serializer, char* scope, char* topic,topic_subscription_pt* out);
 celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts);
 celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts);
 celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts);
@@ -51,8 +51,8 @@ celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt
 celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP);
 celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP);
 
-celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, pubsub_serializer_service_pt serializerSvc);
-celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts, pubsub_serializer_service_pt serializerSvc);
+celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc);
+celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc);
 
 celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt subscription);
 celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt subscription);

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c b/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
index 09fcd8c..5c9a5d5 100644
--- a/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
+++ b/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
@@ -666,7 +666,7 @@ celix_status_t pubsubAdmin_matchSubscriber(pubsub_admin_pt admin, pubsub_endpoin
 	return status;
 }
 
-celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc){
+celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc){
 	celix_status_t status = CELIX_SUCCESS;
 	admin->serializerSvc = serializerSvc;
 
@@ -694,7 +694,7 @@ celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serialize
 	return status;
 }
 
-celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc){
+celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc){
 	celix_status_t status = CELIX_SUCCESS;
 	admin->serializerSvc = NULL;
 

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/src/topic_publication.c b/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
index bb8ff56..2e95874 100644
--- a/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
+++ b/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
@@ -47,7 +47,6 @@
 #include "version.h"
 
 #include "pubsub_common.h"
-#include "dyn_msg_utils.h"
 #include "pubsub_utils.h"
 #include "publisher.h"
 
@@ -70,7 +69,7 @@ struct topic_publication {
 	array_list_pt pub_ep_list; //List<pubsub_endpoint>
 	hash_map_pt boundServices; //<bundle_pt,bound_service>
 	celix_thread_mutex_t tp_lock;
-	pubsub_serializer_service_pt serializerSvc;
+	pubsub_serializer_service_t* serializerSvc;
 };
 
 typedef struct publish_bundle_bound_service {
@@ -78,26 +77,26 @@ typedef struct publish_bundle_bound_service {
 	pubsub_publisher_pt service;
 	bundle_pt bundle;
 	char *topic;
-	hash_map_pt msgTypes;
+	pubsub_msg_serializer_map_t* map;
 	unsigned short getCount;
 	celix_thread_mutex_t mp_lock;
 	bool mp_send_in_progress;
 	array_list_pt mp_parts;
-}* publish_bundle_bound_service_pt;
+} publish_bundle_bound_service_t;
 
-typedef struct pubsub_msg{
+typedef struct pubsub_msg {
 	pubsub_msg_header_pt header;
 	char* payload;
 	int payloadSize;
-}* pubsub_msg_pt;
+} pubsub_msg_t;
 
 static unsigned int rand_range(unsigned int min, unsigned int max);
 
 static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service);
 static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service);
 
-static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle);
-static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc);
+static publish_bundle_bound_service_t* pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle);
+static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_t* boundSvc);
 
 static int pubsub_topicPublicationSend(void* handle,unsigned int msgTypeId, const void *msg);
 static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTypeId, const void *msg, int flags);
@@ -105,7 +104,7 @@ static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsig
 
 static void delay_first_send_for_late_joiners(void);
 
-celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt pubEP, pubsub_serializer_service_pt serializer, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out){
+celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t* serializer, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out){
 	celix_status_t status = CELIX_SUCCESS;
 
 #ifdef BUILD_WITH_ZMQ_SECURITY
@@ -235,7 +234,7 @@ celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub){
 
 	hash_map_iterator_pt iter = hashMapIterator_create(pub->boundServices);
 	while(hashMapIterator_hasNext(iter)){
-		publish_bundle_bound_service_pt bound = hashMapIterator_nextValue(iter);
+		publish_bundle_bound_service_t* bound = hashMapIterator_nextValue(iter);
 		pubsub_destroyPublishBundleBoundService(bound);
 	}
 	hashMapIterator_destroy(iter);
@@ -332,43 +331,50 @@ celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub
 	return CELIX_SUCCESS;
 }
 
-celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_pt serializerSvc){
+celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc){
 	celix_status_t status = CELIX_SUCCESS;
 
 	celixThreadMutex_lock(&(pub->tp_lock));
 
-	pub->serializerSvc = serializerSvc;
-
-	hash_map_iterator_pt bs_iter = hashMapIterator_create(pub->boundServices);
-	while(hashMapIterator_hasNext(bs_iter)){
-		publish_bundle_bound_service_pt boundSvc = (publish_bundle_bound_service_pt) hashMapIterator_nextValue(bs_iter);
-		if (hashMap_size(boundSvc->msgTypes) == 0){
-			pub->serializerSvc->fillMsgTypesMap(pub->serializerSvc->serializer, boundSvc->msgTypes, boundSvc->bundle);
+	//clearing previous serializer
+	if (pub->serializerSvc != NULL) {
+		hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices);
+		while (hashMapIterator_hasNext(&iter)) {
+			publish_bundle_bound_service_t* bound = hashMapIterator_nextValue(&iter);
+			pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, bound->map);
+			bound->map = NULL;
 		}
 	}
-	hashMapIterator_destroy(bs_iter);
+
+	pub->serializerSvc = serializerSvc;
+	hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices);
+	while (hashMapIterator_hasNext(&iter)) {
+		hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
+		bundle_pt bundle = hashMapEntry_getKey(entry);
+		publish_bundle_bound_service_t* boundSvc = hashMapEntry_getValue(entry);
+		pub->serializerSvc->createSerializerMap(pub->serializerSvc->handle, bundle, &boundSvc->map);
+	}
 
 	celixThreadMutex_unlock(&(pub->tp_lock));
 
 	return status;
 }
 
-celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_pt serializerSvc){
+celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_t* svc){
 	celix_status_t status = CELIX_SUCCESS;
-
 	celixThreadMutex_lock(&(pub->tp_lock));
 
-	hash_map_iterator_pt bs_iter = hashMapIterator_create(pub->boundServices);
-	while(hashMapIterator_hasNext(bs_iter)){
-		publish_bundle_bound_service_pt boundSvc = (publish_bundle_bound_service_pt) hashMapIterator_nextValue(bs_iter);
-		pub->serializerSvc->emptyMsgTypesMap(pub->serializerSvc->serializer, boundSvc->msgTypes);
+	if (pub->serializerSvc == svc) {
+		hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices);
+		while (hashMapIterator_hasNext(&iter)) {
+			publish_bundle_bound_service_t *boundSvc = hashMapIterator_nextValue(&iter);
+			pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, boundSvc->map);
+			boundSvc->map = NULL;
+		}
+		pub->serializerSvc = NULL;
 	}
-	hashMapIterator_destroy(bs_iter);
-
-	pub->serializerSvc = NULL;
 
 	celixThreadMutex_unlock(&(pub->tp_lock));
-
 	return status;
 }
 
@@ -384,7 +390,7 @@ static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt
 
 	celixThreadMutex_lock(&(publish->tp_lock));
 
-	publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle);
+	publish_bundle_bound_service_t* bound = hashMap_get(publish->boundServices,bundle);
 	if(bound==NULL){
 		bound = pubsub_createPublishBundleBoundService(publish,bundle);
 		if(bound!=NULL){
@@ -410,7 +416,7 @@ static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_p
 
 	celixThreadMutex_lock(&(publish->tp_lock));
 
-	publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle);
+	publish_bundle_bound_service_t* bound = hashMap_get(publish->boundServices,bundle);
 	if(bound!=NULL){
 
 		bound->getCount--;
@@ -434,7 +440,7 @@ static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_p
 	return CELIX_SUCCESS;
 }
 
-static bool send_pubsub_msg(zsock_t* zmq_socket, pubsub_msg_pt msg, bool last){
+static bool send_pubsub_msg(zsock_t* zmq_socket, pubsub_msg_t* msg, bool last){
 
 	bool ret = true;
 
@@ -474,7 +480,7 @@ static bool send_pubsub_mp_msg(zsock_t* zmq_socket, array_list_pt mp_msg_parts){
 	unsigned int i = 0;
 	unsigned int mp_num = arrayList_size(mp_msg_parts);
 	for(;i<mp_num;i++){
-		ret = ret && send_pubsub_msg(zmq_socket, (pubsub_msg_pt)arrayList_get(mp_msg_parts,i), (i==mp_num-1));
+		ret = ret && send_pubsub_msg(zmq_socket, (pubsub_msg_t*)arrayList_get(mp_msg_parts,i), (i==mp_num-1));
 	}
 	arrayList_clear(mp_msg_parts);
 
@@ -489,10 +495,8 @@ static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, con
 }
 
 static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTypeId, const void *msg, int flags){
-
 	int status = 0;
-
-	publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt) handle;
+	publish_bundle_bound_service_t* bound = handle;
 
 	celixThreadMutex_lock(&(bound->mp_lock));
 	if( (flags & PUBSUB_PUBLISHER_FIRST_MSG) && !(flags & PUBSUB_PUBLISHER_LAST_MSG) && bound->mp_send_in_progress){ //means a real mp_msg
@@ -501,38 +505,33 @@ static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTy
 		return -3;
 	}
 
-	pubsub_message_type *msgType = hashMap_get(bound->msgTypes, &msgTypeId);
-
+	pubsub_msg_serializer_t* msgSer = NULL;
+	if (bound->map != NULL) {
+		msgSer = hashMap_get(bound->map->serializers, (void*)(uintptr_t)msgTypeId);
+	}
 	int major=0, minor=0;
 
-	if (msgType != NULL && bound->parent->serializerSvc != NULL) {
-
-		version_pt msgVersion = bound->parent->serializerSvc->getVersion(bound->parent->serializerSvc->serializer, msgType);
-
+	if (msgSer != NULL) {
 		pubsub_msg_header_pt msg_hdr = calloc(1,sizeof(struct pubsub_msg_header));
-
 		strncpy(msg_hdr->topic,bound->topic,MAX_TOPIC_LEN-1);
-
 		msg_hdr->type = msgTypeId;
-		if (msgVersion != NULL){
-			version_getMajor(msgVersion, &major);
-			version_getMinor(msgVersion, &minor);
+		if (msgSer->msgVersion != NULL){
+			version_getMajor(msgSer->msgVersion, &major);
+			version_getMinor(msgSer->msgVersion, &minor);
 			msg_hdr->major = major;
 			msg_hdr->minor = minor;
 		}
 
-		void* serializedOutput = NULL;
-		int serializedOutputLen = 0;
-		bound->parent->serializerSvc->serialize(bound->parent->serializerSvc->serializer, msgType, msg, &serializedOutput, &serializedOutputLen);
-
-		pubsub_msg_pt msg = calloc(1,sizeof(struct pubsub_msg));
+		char* serializedOutput = NULL;
+		size_t serializedOutputLen = 0;
+		msgSer->serialize(msgSer->handle, msg, &serializedOutput, &serializedOutputLen);
+		pubsub_msg_t* msg = calloc(1,sizeof(struct pubsub_msg));
 		msg->header = msg_hdr;
 		msg->payload = (char *) serializedOutput;
 		msg->payloadSize = serializedOutputLen;
-
 		bool snd = true;
 
-		switch(flags){
+		switch (flags) {
 		case PUBSUB_PUBLISHER_FIRST_MSG:
 			bound->mp_send_in_progress = true;
 			arrayList_add(bound->mp_parts,msg);
@@ -602,9 +601,9 @@ static unsigned int rand_range(unsigned int min, unsigned int max){
 
 }
 
-static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle){
+static publish_bundle_bound_service_t* pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle){
 
-	publish_bundle_bound_service_pt bound = calloc(1, sizeof(*bound));
+	publish_bundle_bound_service_t* bound = calloc(1, sizeof(*bound));
 
 	if (bound != NULL) {
 		bound->service = calloc(1, sizeof(*bound->service));
@@ -617,7 +616,11 @@ static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(to
 		bound->getCount = 1;
 		bound->mp_send_in_progress = false;
 		celixThreadMutex_create(&bound->mp_lock,NULL);
-		bound->msgTypes = hashMap_create(uintHash, NULL, uintEquals, NULL); //<int* (msgId),pubsub_message_type>
+
+		//TODO check if lock is needed. e.g. was the caller already locked?
+		if (tp->serializerSvc != NULL) {
+			tp->serializerSvc->createSerializerMap(tp->serializerSvc->handle, bundle, &bound->map);
+		}
 		arrayList_create(&bound->mp_parts);
 
 		pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(bound->parent->pub_ep_list,0);
@@ -628,10 +631,6 @@ static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(to
 		bound->service->send = pubsub_topicPublicationSend;
 		bound->service->sendMultipart = pubsub_topicPublicationSendMultipart;
 
-		if (tp->serializerSvc != NULL){
-			tp->serializerSvc->fillMsgTypesMap(tp->serializerSvc->serializer, bound->msgTypes,bound->bundle);
-		}
-
 	}
 	else
 	{
@@ -645,26 +644,24 @@ static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(to
 	return bound;
 }
 
-static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc){
+static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_t* boundSvc){
 
 	celixThreadMutex_lock(&boundSvc->mp_lock);
 
-	if(boundSvc->service != NULL){
+	if (boundSvc->service != NULL) {
 		free(boundSvc->service);
 	}
 
-	if(boundSvc->msgTypes != NULL){
-		if (boundSvc->parent->serializerSvc != NULL){
-			boundSvc->parent->serializerSvc->emptyMsgTypesMap(boundSvc->parent->serializerSvc->serializer, boundSvc->msgTypes);
-		}
-		hashMap_destroy(boundSvc->msgTypes,false,false);
+	if (boundSvc->map != NULL && boundSvc->parent->serializerSvc != NULL) {
+		boundSvc->parent->serializerSvc->destroySerializerMap(boundSvc->parent->serializerSvc->handle, boundSvc->map);
+		boundSvc->map = NULL;
 	}
 
-	if(boundSvc->mp_parts!=NULL){
+	if (boundSvc->mp_parts!=NULL) {
 		arrayList_destroy(boundSvc->mp_parts);
 	}
 
-	if(boundSvc->topic!=NULL){
+	if (boundSvc->topic!=NULL) {
 		free(boundSvc->topic);
 	}
 

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c b/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
index 3de56af..7ef2c5d 100644
--- a/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
+++ b/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
@@ -46,7 +46,6 @@
 
 #include "subscriber.h"
 #include "publisher.h"
-#include "dyn_msg_utils.h"
 #include "pubsub_utils.h"
 
 #ifdef BUILD_WITH_ZMQ_SECURITY
@@ -58,8 +57,7 @@
 #define POLL_TIMEOUT  	250
 #define ZMQ_POLL_TIMEOUT_MS_ENV 	"ZMQ_POLL_TIMEOUT_MS"
 
-struct topic_subscription{
-
+struct topic_subscription {
 	zsock_t* zmq_socket;
 	zcert_t * zmq_cert;
 	zcert_t * zmq_pub_cert;
@@ -71,26 +69,25 @@ struct topic_subscription{
 	celix_thread_mutex_t ts_lock;
 	bundle_context_pt context;
 
-	hash_map_pt servicesMap; // key = service, value = msg types map
+	hash_map_pt msgSerializers; // key = service ptr, value = pubsub_msg_serializer_map_t*
 	array_list_pt pendingConnections;
 	array_list_pt pendingDisconnections;
 
 	celix_thread_mutex_t pendingConnections_lock;
 	celix_thread_mutex_t pendingDisconnections_lock;
 	unsigned int nrSubscribers;
-	pubsub_serializer_service_pt serializerSvc;
-
+	pubsub_serializer_service_t* serializerSvc;
 };
 
-typedef struct complete_zmq_msg{
+typedef struct complete_zmq_msg {
 	zframe_t* header;
 	zframe_t* payload;
 }* complete_zmq_msg_pt;
 
-typedef struct mp_handle{
-	hash_map_pt svc_msg_db;
+typedef struct mp_handle {
+	pubsub_msg_serializer_map_t* map;
 	hash_map_pt rcv_msg_map;
-}* mp_handle_pt;
+} mp_handle_t;
 
 typedef struct msg_map_entry{
 	bool retain;
@@ -104,12 +101,12 @@ static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr);
 static void sigusr1_sighandler(int signo);
 static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId);
 static int pubsub_getMultipart(void *handle, unsigned int msgTypeId, bool retain, void **part);
-static mp_handle_pt create_mp_handle(topic_subscription_pt sub, hash_map_pt svc_msg_db,array_list_pt rcv_msg_list);
-static void destroy_mp_handle(mp_handle_pt mp_handle);
+static mp_handle_t* create_mp_handle(topic_subscription_pt sub, pubsub_msg_serializer_map_t* map, array_list_pt rcv_msg_list);
+static void destroy_mp_handle(mp_handle_t* mp_handle);
 static void connectPendingPublishers(topic_subscription_pt sub);
 static void disconnectPendingPublishers(topic_subscription_pt sub);
 
-celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt subEP, pubsub_serializer_service_pt serializer, char* scope, char* topic,topic_subscription_pt* out){
+celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt subEP, pubsub_serializer_service_t* serializer, char* scope, char* topic,topic_subscription_pt* out){
 	celix_status_t status = CELIX_SUCCESS;
 
 #ifdef BUILD_WITH_ZMQ_SECURITY
@@ -223,7 +220,7 @@ celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context,
 	celixThreadMutex_create(&ts->socket_lock, NULL);
 	celixThreadMutex_create(&ts->ts_lock,NULL);
 	arrayList_create(&ts->sub_ep_list);
-	ts->servicesMap = hashMap_create(NULL, NULL, NULL, NULL);
+	ts->msgSerializers = hashMap_create(NULL, NULL, NULL, NULL);
 	arrayList_create(&ts->pendingConnections);
 	arrayList_create(&ts->pendingDisconnections);
 	celixThreadMutex_create(&ts->pendingConnections_lock, NULL);
@@ -269,7 +266,7 @@ celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){
 	serviceTracker_destroy(ts->tracker);
 	arrayList_clear(ts->sub_ep_list);
 	arrayList_destroy(ts->sub_ep_list);
-	hashMap_destroy(ts->servicesMap,false,false);
+	hashMap_destroy(ts->msgSerializers,false,false);
 
 	celixThreadMutex_lock(&ts->pendingConnections_lock);
 	arrayList_destroy(ts->pendingConnections);
@@ -429,7 +426,7 @@ unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt ts) {
 	return ts->nrSubscribers;
 }
 
-celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, pubsub_serializer_service_pt serializerSvc){
+celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc){
 	celix_status_t status = CELIX_SUCCESS;
 
 	celixThreadMutex_lock(&ts->ts_lock);
@@ -441,22 +438,18 @@ celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, p
 	return status;
 }
 
-celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts, pubsub_serializer_service_pt serializerSvc){
+celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* svc){
 	celix_status_t status = CELIX_SUCCESS;
 
 	celixThreadMutex_lock(&ts->ts_lock);
-
-	hash_map_iterator_pt svc_iter = hashMapIterator_create(ts->servicesMap);
-	while(hashMapIterator_hasNext(svc_iter)){
-		hash_map_pt msgTypes = (hash_map_pt) hashMapIterator_nextValue(svc_iter);
-		if (hashMap_size(msgTypes) > 0){
-			ts->serializerSvc->emptyMsgTypesMap(ts->serializerSvc->serializer, msgTypes);
+	if (ts->serializerSvc == svc) {
+		hash_map_iterator_t iter = hashMapIterator_construct(ts->msgSerializers);
+		while(hashMapIterator_hasNext(&iter)){
+			pubsub_msg_serializer_map_t* map = hashMapIterator_nextValue(&iter);
+			ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map);
 		}
 	}
-	hashMapIterator_destroy(svc_iter);
-
 	ts->serializerSvc = NULL;
-
 	celixThreadMutex_unlock(&ts->ts_lock);
 
 	return status;
@@ -467,24 +460,19 @@ static celix_status_t topicsub_subscriberTracked(void * handle, service_referenc
 	topic_subscription_pt ts = handle;
 
 	celixThreadMutex_lock(&ts->ts_lock);
-	if (!hashMap_containsKey(ts->servicesMap, service)) {
-		hash_map_pt msgTypes = hashMap_create(uintHash, NULL, uintEquals, NULL); //key = msgId, value = pubsub_message_type
-
+	if (!hashMap_containsKey(ts->msgSerializers, service)) {
 		bundle_pt bundle = NULL;
 		serviceReference_getBundle(reference, &bundle);
 
-		if (ts->serializerSvc != NULL){
-			ts->serializerSvc->fillMsgTypesMap(ts->serializerSvc->serializer, msgTypes,bundle);
-		}
-
-		if(hashMap_size(msgTypes)==0){ //If the msgTypes hashMap is not filled, the service is an unsupported subscriber
-			hashMap_destroy(msgTypes,false,false);
-			printf("TS: Unsupported subscriber!\n");
-		}
-		else{
-			hashMap_put(ts->servicesMap, service, msgTypes);
+		if (ts->serializerSvc != NULL) {
+			pubsub_msg_serializer_map_t* map = NULL;
+			ts->serializerSvc->createSerializerMap(ts->serializerSvc->handle, bundle, &map);
+			if (map != NULL) {
+				hashMap_put(ts->msgSerializers, service, map);
+			} else {
+				printf("TS: Cannot create msg serializer map\n");
+			}
 		}
-
 	}
 	celixThreadMutex_unlock(&ts->ts_lock);
 	printf("TS: New subscriber registered.\n");
@@ -497,14 +485,9 @@ static celix_status_t topicsub_subscriberUntracked(void * handle, service_refere
 	topic_subscription_pt ts = handle;
 
 	celixThreadMutex_lock(&ts->ts_lock);
-	if (hashMap_containsKey(ts->servicesMap, service)) {
-		hash_map_pt msgTypes = hashMap_remove(ts->servicesMap, service);
-		if(msgTypes!=NULL){
-			if (ts->serializerSvc != NULL){
-				ts->serializerSvc->emptyMsgTypesMap(ts->serializerSvc->serializer, msgTypes);
-			}
-			hashMap_destroy(msgTypes,false,false);
-		}
+	if (hashMap_containsKey(ts->msgSerializers, service)) {
+		pubsub_msg_serializer_map_t* map = hashMap_remove(ts->msgSerializers, service);
+        ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map);
 	}
 	celixThreadMutex_unlock(&ts->ts_lock);
 
@@ -513,66 +496,55 @@ static celix_status_t topicsub_subscriberUntracked(void * handle, service_refere
 }
 
 
-static void process_msg(topic_subscription_pt sub,array_list_pt msg_list){
+static void process_msg(topic_subscription_pt sub, array_list_pt msg_list) {
 
 	pubsub_msg_header_pt first_msg_hdr = (pubsub_msg_header_pt)zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->header);
 
-	hash_map_iterator_pt iter = hashMapIterator_create(sub->servicesMap);
-	while (hashMapIterator_hasNext(iter)) {
-		hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
+	hash_map_iterator_t iter = hashMapIterator_construct(sub->msgSerializers);
+	while (hashMapIterator_hasNext(&iter)) {
+		hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
 		pubsub_subscriber_pt subsvc = hashMapEntry_getKey(entry);
-		hash_map_pt msgTypes = hashMapEntry_getValue(entry);
+		pubsub_msg_serializer_map_t* map = hashMapEntry_getValue(entry);
 
-		pubsub_message_type *msgType = hashMap_get(msgTypes,&(first_msg_hdr->type));
-		if (msgType == NULL) {
-			printf("TS: Primary message %d not supported. NOT sending any part of the whole message.\n",first_msg_hdr->type);
-		}
-		else if (sub->serializerSvc == NULL){
-			printf("TS: No active serializer found!\n");
-		}
-		else{
+		pubsub_msg_serializer_t* msgSer = hashMap_get(map->serializers, (void*)(uintptr_t )first_msg_hdr->type);
+		if (msgSer == NULL) {
+			printf("TS: Primary message %d not supported. NOT sending any part of the whole message.\n", first_msg_hdr->type);
+		} else {
 			void *msgInst = NULL;
-			char *name = sub->serializerSvc->getName(sub->serializerSvc->serializer, msgType);
-			version_pt msgVersion = sub->serializerSvc->getVersion(sub->serializerSvc->serializer, msgType);
-
-			bool validVersion = checkVersion(msgVersion,first_msg_hdr);
-
+			bool validVersion = checkVersion(msgSer->msgVersion, first_msg_hdr);
 			if(validVersion){
-
-				celix_status_t status = sub->serializerSvc->deserialize(sub->serializerSvc->serializer, msgType, (const void *) zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->payload), &msgInst);
+				celix_status_t status = msgSer->deserialize(msgSer->handle, (const char*)zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->payload), 0, &msgInst);
 
 				if (status == CELIX_SUCCESS) {
 					bool release = true;
 
-					mp_handle_pt mp_handle = create_mp_handle(sub, msgTypes,msg_list);
+					mp_handle_t* mp_handle = create_mp_handle(sub, map, msg_list);
 					pubsub_multipart_callbacks_t mp_callbacks;
 					mp_callbacks.handle = mp_handle;
 					mp_callbacks.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForMsgType;
 					mp_callbacks.getMultipart = pubsub_getMultipart;
-					subsvc->receive(subsvc->handle, name, first_msg_hdr->type, msgInst, &mp_callbacks, &release);
+					subsvc->receive(subsvc->handle, msgSer->msgName, first_msg_hdr->type, msgInst, &mp_callbacks, &release);
 
-					if(release){
-						sub->serializerSvc->freeMsg(sub->serializerSvc->serializer, msgType, msgInst);
+					if (release) {
+						msgSer->freeMsg(msgSer->handle, msgInst);
 					}
-					if(mp_handle!=NULL){
+					if (mp_handle!=NULL) {
 						destroy_mp_handle(mp_handle);
 					}
 				}
 				else{
-					printf("TS: Cannot deserialize msgType %s.\n",name);
+					printf("TS: Cannot deserialize msgType %s.\n", msgSer->msgName);
 				}
 
-			}
-			else{
+			} else {
 				int major=0,minor=0;
-				version_getMajor(msgVersion,&major);
-				version_getMinor(msgVersion,&minor);
-				printf("TS: Version mismatch for primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole message.\n",name,major,minor,first_msg_hdr->major,first_msg_hdr->minor);
+				version_getMajor(msgSer->msgVersion, &major);
+				version_getMinor(msgSer->msgVersion, &minor);
+				printf("TS: Version mismatch for primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole message.\n",
+					   msgSer->msgName, major, minor, first_msg_hdr->major, first_msg_hdr->minor);
 			}
-
 		}
 	}
-	hashMapIterator_destroy(iter);
 
 	int i = 0;
 	for(;i<arrayList_size(msg_list);i++){
@@ -737,7 +709,7 @@ static int pubsub_getMultipart(void *handle, unsigned int msgTypeId, bool retain
 		return -1;
 	}
 
-	mp_handle_pt mp_handle = (mp_handle_pt)handle;
+	mp_handle_t* mp_handle = handle;
 	msg_map_entry_pt entry = hashMap_get(mp_handle->rcv_msg_map,&msgTypeId);
 	if(entry!=NULL){
 		entry->retain = retain;
@@ -753,63 +725,55 @@ static int pubsub_getMultipart(void *handle, unsigned int msgTypeId, bool retain
 
 }
 
-static mp_handle_pt create_mp_handle(topic_subscription_pt sub, hash_map_pt svc_msg_db,array_list_pt rcv_msg_list){
+static mp_handle_t* create_mp_handle(topic_subscription_pt sub, pubsub_msg_serializer_map_t* map, array_list_pt rcv_msg_list) {
 
 	if(arrayList_size(rcv_msg_list)==1){ //Means it's not a multipart message
 		return NULL;
 	}
 
-	mp_handle_pt mp_handle = calloc(1,sizeof(struct mp_handle));
-	mp_handle->svc_msg_db = svc_msg_db;
-	mp_handle->rcv_msg_map = hashMap_create(uintHash, NULL, uintEquals, NULL);
-
-	int i=1; //We skip the first message, it will be handle differently
-	for(;i<arrayList_size(rcv_msg_list);i++){
-		complete_zmq_msg_pt c_msg = (complete_zmq_msg_pt)arrayList_get(rcv_msg_list,i);
+	mp_handle_t* mp_handle = calloc(1,sizeof(struct mp_handle));
+	mp_handle->map = map;
+	mp_handle->rcv_msg_map = hashMap_create(NULL, NULL, NULL, NULL);
 
+	int i; //We skip the first message, it will be handle differently
+	for (i=1 ; i<arrayList_size(rcv_msg_list) ; i++) {
+		complete_zmq_msg_pt c_msg = arrayList_get(rcv_msg_list,i);
 		pubsub_msg_header_pt header = (pubsub_msg_header_pt)zframe_data(c_msg->header);
 
-		pubsub_message_type *msgType = hashMap_get(svc_msg_db,&(header->type));
-		if (msgType != NULL && sub->serializerSvc != NULL) {
+		pubsub_msg_serializer_t* msgSer = hashMap_get(map->serializers, (void*)(uintptr_t)(header->type));
+		if (msgSer != NULL) {
 			void *msgInst = NULL;
-			version_pt msgVersion = sub->serializerSvc->getVersion(sub->serializerSvc->serializer, msgType);
-
-			bool validVersion = checkVersion(msgVersion,header);
-
+			bool validVersion = checkVersion(msgSer->msgVersion, header);
 			if(validVersion){
-				celix_status_t status = sub->serializerSvc->deserialize(sub->serializerSvc->serializer, msgType, (const void *) zframe_data(c_msg->payload), &msgInst);
+				//TODO make the getMultipart lazy?
+				celix_status_t status = msgSer->deserialize(msgSer->handle, (const char*)zframe_data(c_msg->payload), 0, &msgInst);
 
 				if(status == CELIX_SUCCESS){
-					unsigned int* msgId = calloc(1,sizeof(unsigned int));
-					*msgId = header->type;
 					msg_map_entry_pt entry = calloc(1,sizeof(struct msg_map_entry));
 					entry->msgInst = msgInst;
-					hashMap_put(mp_handle->rcv_msg_map,msgId,entry);
+					hashMap_put(mp_handle->rcv_msg_map, (void*)(uintptr_t)(header->type), entry);
 				}
 			}
 		}
-
 	}
-
 	return mp_handle;
-
 }
 
-static void destroy_mp_handle(mp_handle_pt mp_handle){
+static void destroy_mp_handle(mp_handle_t* mp_handle){
 
 	hash_map_iterator_pt iter = hashMapIterator_create(mp_handle->rcv_msg_map);
 	while(hashMapIterator_hasNext(iter)){
 		hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
-		unsigned int* msgId = (unsigned int*)hashMapEntry_getKey(entry);
+		unsigned int msgId = (unsigned int)(uintptr_t)hashMapEntry_getKey(entry);
 		msg_map_entry_pt msgEntry = hashMapEntry_getValue(entry);
-		pubsub_message_type* msgType = hashMap_get(mp_handle->svc_msg_db,msgId);
-		if(msgType!=NULL){
-			if(!msgEntry->retain){
-				free(msgEntry->msgInst);
+		pubsub_msg_serializer_t* msgSer = hashMap_get(mp_handle->map->serializers, (void*)(uintptr_t)msgId);
+		if (msgSer != NULL) {
+			if (!msgEntry->retain) {
+				msgSer->freeMsg(msgSer->handle, msgEntry->msgInst);
 			}
 		}
 		else{
-			printf("TS: ERROR: Cannot find pubsub_message_type for msg %u, so cannot destroy it!\n",*msgId);
+			printf("TS: ERROR: Cannot find pubsub_message_type for msg %u, so cannot destroy it!\n", msgId);
 		}
 	}
 	hashMapIterator_destroy(iter);

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_common/public/include/dyn_msg_utils.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/include/dyn_msg_utils.h b/pubsub/pubsub_common/public/include/dyn_msg_utils.h
deleted file mode 100644
index 71085ab..0000000
--- a/pubsub/pubsub_common/public/include/dyn_msg_utils.h
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * dyn_msg_utils.h
- *
- *  \date       Nov 11, 2015
- *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright	Apache License, Version 2.0
- */
-
-#ifndef DYN_MSG_UTILS_H_
-#define DYN_MSG_UTILS_H_
-
-#include "bundle.h"
-#include "hash_map.h"
-
-unsigned int uintHash(const void * uintNum);
-int uintEquals(const void * uintNum, const void * toCompare);
-
-void fillMsgTypesMap(hash_map_pt msgTypesMap,bundle_pt bundle);
-void emptyMsgTypesMap(hash_map_pt msgTypesMap);
-
-#endif /* DYN_MSG_UTILS_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_common/public/include/pubsub_admin.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/include/pubsub_admin.h b/pubsub/pubsub_common/public/include/pubsub_admin.h
index 52cb75c..f7ab7e0 100644
--- a/pubsub/pubsub_common/public/include/pubsub_admin.h
+++ b/pubsub/pubsub_common/public/include/pubsub_admin.h
@@ -56,8 +56,8 @@ struct pubsub_admin_service {
 	celix_status_t (*matchPublisher)(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP, double* score);
 	celix_status_t (*matchSubscriber)(pubsub_admin_pt admin, pubsub_endpoint_pt subEP, double* score);
 
-	celix_status_t (*setSerializer)(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc);
-	celix_status_t (*removeSerializer)(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc);
+	celix_status_t (*setSerializer)(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc);
+	celix_status_t (*removeSerializer)(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc);
 
 };
 

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_common/public/include/pubsub_serializer.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/include/pubsub_serializer.h b/pubsub/pubsub_common/public/include/pubsub_serializer.h
index f2df075..e9f9f6c 100644
--- a/pubsub/pubsub_common/public/include/pubsub_serializer.h
+++ b/pubsub/pubsub_common/public/include/pubsub_serializer.h
@@ -24,33 +24,44 @@
  *  \copyright	Apache License, Version 2.0
  */
 
-#ifndef PUBSUB_SERIALIZER_H_
-#define PUBSUB_SERIALIZER_H_
+#ifndef PUBSUB_SERIALIZER_SERVICE_H_
+#define PUBSUB_SERIALIZER_SERVICE_H_
 
 #include "service_reference.h"
 
 #include "pubsub_common.h"
 
-typedef struct _pubsub_message_type pubsub_message_type;
-
-typedef struct pubsub_serializer *pubsub_serializer_pt;
-
-struct pubsub_serializer_service {
+/**
+ * There should be a pubsub_serializer_t
+ * per msg type (msg id) per bundle
+ *
+ * The pubsub_serializer_service can create
+ * a serializer_map per bundle. Potentially using
+ * the extender pattern.
+ */
+typedef struct pubsub_msg_serializer {
+    void* handle;
+    unsigned int msgId;
+    const char* msgName;
+    version_pt msgVersion;
 
-	pubsub_serializer_pt serializer;
+    celix_status_t (*serialize)(void* handle, const void* input, char** out, size_t* outLen);
+    celix_status_t (*deserialize)(void* handle, const char* input, size_t inputLen, void** out); //note inputLen can be 0 if predefined size is not needed
 
-	celix_status_t (*serialize)(pubsub_serializer_pt serializer, pubsub_message_type *msgType, const void *input, void **output, int *outputLen);
-	celix_status_t (*deserialize)(pubsub_serializer_pt serializer, pubsub_message_type *msgType, const void *input, void **output);
+    void (*freeMsg)(void* handle, void* msg);
+} pubsub_msg_serializer_t;
 
-	void (*fillMsgTypesMap)(pubsub_serializer_pt serializer, hash_map_pt msgTypesMap,bundle_pt bundle);
-	void (*emptyMsgTypesMap)(pubsub_serializer_pt serializer, hash_map_pt msgTypesMap);
+typedef struct pubsub_msg_serializer_map {
+    bundle_pt bundle;
+    hash_map_pt serializers; //key = msg id (unsigned int), value = pubsub_serializer_t*
+} pubsub_msg_serializer_map_t;
 
-	version_pt (*getVersion)(pubsub_serializer_pt serializer, pubsub_message_type *msgType);
-	char* (*getName)(pubsub_serializer_pt serializer, pubsub_message_type *msgType);
-	void (*freeMsg)(pubsub_serializer_pt serializer, pubsub_message_type *msgType, void *msg);
+typedef struct pubsub_serializer_service {
+	void* handle;
 
-};
+	celix_status_t (*createSerializerMap)(void* handle, bundle_pt bundle, pubsub_msg_serializer_map_t** out);
+    celix_status_t (*destroySerializerMap)(void* handle, pubsub_msg_serializer_map_t* map);
 
-typedef struct pubsub_serializer_service *pubsub_serializer_service_pt;
+} pubsub_serializer_service_t;
 
-#endif /* PUBSUB_SERIALIZER_H_ */
+#endif /* PUBSUB_SERIALIZER_SERVICE_H_ */


Mime
View raw message