celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rlenfer...@apache.org
Subject [2/2] celix git commit: CELIX-407: Extracted pubsub_serializer to a stand-alone service
Date Fri, 31 Mar 2017 09:48:08 GMT
CELIX-407: Extracted pubsub_serializer to a stand-alone service


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/a030567d
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/a030567d
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/a030567d

Branch: refs/heads/develop
Commit: a030567d06e2c17c0dd0980f4e80ddd88cc5e21d
Parents: 6072ca6
Author: Roy Lenferink <lenferinkroy@gmail.com>
Authored: Fri Mar 31 11:20:48 2017 +0200
Committer: Roy Lenferink <lenferinkroy@gmail.com>
Committed: Fri Mar 31 11:41:25 2017 +0200

----------------------------------------------------------------------
 pubsub/CMakeLists.txt                           |  15 +-
 pubsub/deploy/CMakeLists.txt                    |  11 ++
 pubsub/pubsub_admin_udp_mc/CMakeLists.txt       |  12 +-
 .../private/include/pubsub_admin_impl.h         |   6 +
 .../include/pubsub_publish_service_private.h    |   6 +-
 .../private/include/topic_subscription.h        |   6 +-
 .../private/src/psa_activator.c                 |   3 +
 .../private/src/pubsub_admin_impl.c             |  62 +++++++-
 .../private/src/topic_publication.c             |  62 ++++++--
 .../private/src/topic_subscription.c            |  61 ++++++--
 pubsub/pubsub_admin_zmq/CMakeLists.txt          |  14 +-
 .../private/include/pubsub_admin_impl.h         |   6 +
 .../include/pubsub_publish_service_private.h    |   6 +-
 .../private/include/topic_subscription.h        |   6 +-
 .../private/src/psa_activator.c                 |   3 +
 .../private/src/pubsub_admin_impl.c             |  62 +++++++-
 .../private/src/topic_publication.c             |  62 ++++++--
 .../private/src/topic_subscription.c            |  74 +++++++---
 .../pubsub_common/public/include/pubsub_admin.h |   5 +
 .../public/include/pubsub_common.h              |   1 +
 .../public/include/pubsub_serializer.h          |  38 +++--
 .../public/src/pubsub_serializer.c              | 109 --------------
 pubsub/pubsub_serializer_json/CMakeLists.txt    |  44 ++++++
 .../private/include/pubsub_serializer_impl.h    |  65 +++++++++
 .../private/src/ps_activator.c                  | 111 ++++++++++++++
 .../private/src/pubsub_serializer_impl.c        | 143 +++++++++++++++++++
 .../private/include/pubsub_topology_manager.h   |   8 ++
 .../private/src/pstm_activator.c                |  45 ++++--
 .../private/src/pubsub_topology_manager.c       |  82 +++++++++++
 29 files changed, 904 insertions(+), 224 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/a030567d/pubsub/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/CMakeLists.txt b/pubsub/CMakeLists.txt
index 2b87af0..44f3bd1 100644
--- a/pubsub/CMakeLists.txt
+++ b/pubsub/CMakeLists.txt
@@ -26,23 +26,10 @@ if (PUBSUB)
 
 	include_directories("${PROJECT_SOURCE_DIR}/utils/public/include")
 	include_directories("${PROJECT_SOURCE_DIR}/framework/public/include")
-
-	set (PUBSUB_SERIALIZER_SRC "${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_serializer.c")
-	set (SERIALIZER_PATH "" CACHE FILEPATH "Path to the directory which will contain the serializer (include / src).")
-	set (SERIALIZER_LIB_INCLUDE_DIR "" CACHE FILEPATH "Path to the include dir of the addiotional libs.")
-	set (SERIALIZER_LIB_PATH "" CACHE FILEPATH "Path to the additional library.")
-	if (EXISTS ${SERIALIZER_PATH})
-		file (GLOB PUBSUB_SERIALIZER_SRC ${SERIALIZER_PATH}/src/*.c)
-		
-		if (SERIALIZER_LIB_PATH)
-			get_filename_component(SERIALIZER_LIB_DIR ${SERIALIZER_LIB_PATH} DIRECTORY)
-			get_filename_component(SERIALIZER_LIB_FULLNAME ${SERIALIZER_LIB_PATH} NAME_WE)
-			string (REPLACE "lib" "" SERIALIZER_LIBRARY ${SERIALIZER_LIB_FULLNAME})
-		endif()
-	endif()
 	
 	add_subdirectory(pubsub_topology_manager)
 	add_subdirectory(pubsub_discovery)
+	add_subdirectory(pubsub_serializer_json)
 	add_subdirectory(pubsub_admin_zmq)
 	add_subdirectory(pubsub_admin_udp_mc)
 	add_subdirectory(examples)

http://git-wip-us.apache.org/repos/asf/celix/blob/a030567d/pubsub/deploy/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/deploy/CMakeLists.txt b/pubsub/deploy/CMakeLists.txt
index 3365166..b35ae14 100644
--- a/pubsub/deploy/CMakeLists.txt
+++ b/pubsub/deploy/CMakeLists.txt
@@ -27,6 +27,7 @@ add_deploy("pubsub_publisher_udp_mc"
        org.apache.celix.pubsub_admin.PubSubAdminUdpMc
        org.apache.celix.pubsub_publisher.PoiPublisher
        org.apache.celix.pubsub_publisher.PoiPublisher2
+       org.apache.celix.pubsub_serializer.PubSubSerializerJson
 )
 
 add_deploy("pubsub_subscriber_udp_mc" 
@@ -38,6 +39,7 @@ add_deploy("pubsub_subscriber_udp_mc"
        org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
        org.apache.celix.pubsub_admin.PubSubAdminUdpMc
        org.apache.celix.pubsub_subscriber.PoiSubscriber
+       org.apache.celix.pubsub_serializer.PubSubSerializerJson
 )
 
 add_deploy("pubsub_subscriber2_udp_mc" 
@@ -49,6 +51,7 @@ add_deploy("pubsub_subscriber2_udp_mc"
        org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
        org.apache.celix.pubsub_admin.PubSubAdminUdpMc
        org.apache.celix.pubsub_subscriber.PoiSubscriber
+       org.apache.celix.pubsub_serializer.PubSubSerializerJson
 )
 
 if (BUILD_PUBSUB_PSA_ZMQ)
@@ -65,6 +68,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
 	       org.apache.celix.pubsub_admin.PubSubAdminUdpMc
 	       org.apache.celix.pubsub_publisher.PoiPublisher
 	       org.apache.celix.pubsub_publisher.PoiPublisher2
+	       org.apache.celix.pubsub_serializer.PubSubSerializerJson
 	    PROPERTIES
 	       poi1.psa=zmq
 	       poi2.psa=udp
@@ -80,6 +84,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
 	       org.apache.celix.pubsub_admin.PubSubAdminZmq
 	       org.apache.celix.pubsub_admin.PubSubAdminUdpMc
 	       org.apache.celix.pubsub_subscriber.PoiSubscriber
+	       org.apache.celix.pubsub_serializer.PubSubSerializerJson
 	    PROPERTIES
 	       poi1.psa=zmq
 	       poi2.psa=udp
@@ -96,6 +101,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
 	       org.apache.celix.pubsub_admin.PubSubAdminZmq
 	       org.apache.celix.pubsub_publisher.PoiPublisher
 	       org.apache.celix.pubsub_subscriber.PoiSubscriber
+	       org.apache.celix.pubsub_serializer.PubSubSerializerJson
 	)
 
 	add_deploy("pubsub_publisher_zmq"
@@ -108,6 +114,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
 	       org.apache.celix.pubsub_admin.PubSubAdminZmq
 	       org.apache.celix.pubsub_publisher.PoiPublisher
 	       org.apache.celix.pubsub_publisher.PoiPublisher2
+	       org.apache.celix.pubsub_serializer.PubSubSerializerJson
 	   	PROPERTIES
 		    pubsub.scope=my_small_scope
 	)
@@ -121,6 +128,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
 	       org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
 	       org.apache.celix.pubsub_admin.PubSubAdminZmq
 	       org.apache.celix.pubsub_subscriber.PoiSubscriber
+	       org.apache.celix.pubsub_serializer.PubSubSerializerJson
 	)
 
 	add_deploy("pubsub_subscriber2_zmq"
@@ -132,6 +140,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
 	       org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
 	       org.apache.celix.pubsub_admin.PubSubAdminZmq
 	       org.apache.celix.pubsub_subscriber.PoiSubscriber
+	       org.apache.celix.pubsub_serializer.PubSubSerializerJson
 	)
 
 	# ZMQ Multipart
@@ -144,6 +153,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
 	       org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
 	       org.apache.celix.pubsub_admin.PubSubAdminZmq
 	       org.apache.celix.pubsub_subscriber.MpSubscriber
+	       org.apache.celix.pubsub_serializer.PubSubSerializerJson
 	)
 
 	add_deploy("pubsub_mp_publisher_zmq"
@@ -155,6 +165,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
 	       org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
 	       org.apache.celix.pubsub_admin.PubSubAdminZmq
 	       org.apache.celix.pubsub_publisher.MpPublisher
+	       org.apache.celix.pubsub_serializer.PubSubSerializerJson
 	)
 
 endif()

http://git-wip-us.apache.org/repos/asf/celix/blob/a030567d/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/CMakeLists.txt b/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
index dd25b19..ce32db0 100644
--- a/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
+++ b/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
@@ -25,15 +25,6 @@ include_directories("${PROJECT_SOURCE_DIR}/pubsub/api/pubsub")
 include_directories("private/include")
 include_directories("public/include")
 include_directories("${JANSSON_INCLUDE_DIR}")
-if (SERIALIZER_PATH)
-	include_directories("${SERIALIZER_PATH}/include")
-endif()
-if (SERIALIZER_LIB_INCLUDE_DIR)
-	include_directories("${SERIALIZER_LIB_INCLUDE_DIR}")
-endif()
-if (SERIALIZER_LIB_DIR)
-	link_directories("${SERIALIZER_LIB_DIR}")
-endif()
 
 add_bundle(org.apache.celix.pubsub_admin.PubSubAdminUdpMc
     BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_udp_multicast"
@@ -47,11 +38,10 @@ add_bundle(org.apache.celix.pubsub_admin.PubSubAdminUdpMc
     	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/dyn_msg_utils.c
     	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_endpoint.c
     	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/log_helper.c
-    	${PUBSUB_SERIALIZER_SRC}
 )
 
 set_target_properties(org.apache.celix.pubsub_admin.PubSubAdminUdpMc PROPERTIES INSTALL_RPATH "$ORIGIN")
-target_link_libraries(org.apache.celix.pubsub_admin.PubSubAdminUdpMc celix_framework celix_utils celix_dfi ${JANSSON_LIBRARIES} ${SERIALIZER_LIBRARY})
+target_link_libraries(org.apache.celix.pubsub_admin.PubSubAdminUdpMc celix_framework celix_utils celix_dfi ${JANSSON_LIBRARIES})
 
 install_bundle(org.apache.celix.pubsub_admin.PubSubAdminUdpMc)
 

http://git-wip-us.apache.org/repos/asf/celix/blob/a030567d/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h b/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h
index 35fc164..9eddf15 100644
--- a/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h
+++ b/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h
@@ -28,10 +28,13 @@
 #define PUBSUB_ADMIN_IMPL_H_
 
 #include "pubsub_admin.h"
+#include "pubsub_serializer.h"
 #include "log_helper.h"
 
 struct pubsub_admin {
 
+	pubsub_serializer_service_pt serializerSvc;
+
 	bundle_context_pt bundle_context;
 	log_helper_pt loghelper;
 
@@ -70,4 +73,7 @@ celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* sco
 celix_status_t pubsubAdmin_matchPublisher(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP, double* score);
 celix_status_t pubsubAdmin_matchSubscriber(pubsub_admin_pt admin, pubsub_endpoint_pt subEP, double* score);
 
+celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc);
+celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc);
+
 #endif /* PUBSUB_ADMIN_IMPL_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/a030567d/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h b/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h
index 57c7963..81690b8 100644
--- a/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h
+++ b/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h
@@ -30,6 +30,7 @@
 #include "publisher.h"
 #include "pubsub_endpoint.h"
 #include "pubsub_common.h"
+#include "pubsub_serializer.h"
 
 #define UDP_BASE_PORT	49152
 #define UDP_MAX_PORT	65000
@@ -41,12 +42,15 @@ typedef struct pubsub_udp_msg {
 } *pubsub_udp_msg_pt;
 
 typedef struct topic_publication *topic_publication_pt;
-celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP,char* bindIP, topic_publication_pt *out);
+celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, pubsub_serializer_service_pt serializer, char* bindIP, topic_publication_pt *out);
 celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub);
 
 celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);
 celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);
 
+celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_pt serializerSvc);
+celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_pt serializerSvc);
+
 celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory);
 celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub);
 

http://git-wip-us.apache.org/repos/asf/celix/blob/a030567d/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h b/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h
index 4ec705b..36c902e 100644
--- a/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h
+++ b/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h
@@ -34,10 +34,11 @@
 
 #include "pubsub_endpoint.h"
 #include "pubsub_common.h"
+#include "pubsub_serializer.h"
 
 typedef struct topic_subscription* topic_subscription_pt;
 
-celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt bundle_context, char* scope, char* topic,topic_subscription_pt* out);
+celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt bundle_context, pubsub_serializer_service_pt serializer, char* scope, char* topic,topic_subscription_pt* out);
 celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts);
 celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts);
 celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts);
@@ -48,6 +49,9 @@ celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt
 celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP);
 celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP);
 
+celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, pubsub_serializer_service_pt serializerSvc);
+celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts, pubsub_serializer_service_pt serializerSvc);
+
 celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt subscription);
 celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt subscription);
 unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt subscription);

http://git-wip-us.apache.org/repos/asf/celix/blob/a030567d/pubsub/pubsub_admin_udp_mc/private/src/psa_activator.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/src/psa_activator.c b/pubsub/pubsub_admin_udp_mc/private/src/psa_activator.c
index 24202dd..cb298fe 100644
--- a/pubsub/pubsub_admin_udp_mc/private/src/psa_activator.c
+++ b/pubsub/pubsub_admin_udp_mc/private/src/psa_activator.c
@@ -76,6 +76,9 @@ celix_status_t bundleActivator_start(void * userData, bundle_context_pt context)
 		pubsubAdminSvc->matchPublisher = pubsubAdmin_matchPublisher;
 		pubsubAdminSvc->matchSubscriber = pubsubAdmin_matchSubscriber;
 
+		pubsubAdminSvc->setSerializer = pubsubAdmin_setSerializer;
+		pubsubAdminSvc->removeSerializer = pubsubAdmin_removeSerializer;
+
 		activator->adminService = pubsubAdminSvc;
 
 		status = bundleContext_registerService(context, PUBSUB_ADMIN_SERVICE, pubsubAdminSvc, NULL, &activator->registration);

http://git-wip-us.apache.org/repos/asf/celix/blob/a030567d/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c b/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
index cd0658b..6e9f4e8 100644
--- a/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
+++ b/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
@@ -252,7 +252,7 @@ static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu
 
 		int i;
 
-		status += pubsub_topicSubscriptionCreate(admin->ifIpAddress, admin->bundle_context, PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, PUBSUB_ANY_SUB_TOPIC,&any_sub);
+		status += pubsub_topicSubscriptionCreate(admin->ifIpAddress, admin->bundle_context, admin->serializerSvc, PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, PUBSUB_ANY_SUB_TOPIC,&any_sub);
 
 		/* Connect all internal publishers */
 		celixThreadMutex_lock(&admin->localPublicationsLock);
@@ -334,7 +334,7 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
 
 		if(subscription == NULL) {
 
-			status += pubsub_topicSubscriptionCreate(admin->ifIpAddress, admin->bundle_context, subEP->scope, subEP->topic,&subscription);
+			status += pubsub_topicSubscriptionCreate(admin->ifIpAddress, admin->bundle_context, admin->serializerSvc, subEP->scope, subEP->topic,&subscription);
 
 			/* Try to connect internal publishers */
 			if(factory!=NULL){
@@ -431,7 +431,7 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_
 
 		if (factory == NULL) {
 			topic_publication_pt pub = NULL;
-			status = pubsub_topicPublicationCreate(admin->sendSocket, pubEP,admin->mcIpAddress,&pub);
+			status = pubsub_topicPublicationCreate(admin->sendSocket, pubEP, admin->serializerSvc, admin->mcIpAddress,&pub);
 			if(status == CELIX_SUCCESS){
 				status = pubsub_topicPublicationStart(admin->bundle_context,pub,&factory);
 				if(status==CELIX_SUCCESS && factory !=NULL){
@@ -645,6 +645,62 @@ celix_status_t pubsubAdmin_matchSubscriber(pubsub_admin_pt admin, pubsub_endpoin
 	return status;
 }
 
+celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc){
+	celix_status_t status = CELIX_SUCCESS;
+	admin->serializerSvc = serializerSvc;
+
+	/* Add serializer to all topic_publication_pt */
+	celixThreadMutex_lock(&admin->localPublicationsLock);
+	hash_map_iterator_pt lp_iter = hashMapIterator_create(admin->localPublications);
+	while(hashMapIterator_hasNext(lp_iter)){
+		service_factory_pt factory = (service_factory_pt) hashMapIterator_nextValue(lp_iter);
+		topic_publication_pt topic_pub = (topic_publication_pt) factory->handle;
+		pubsub_topicPublicationAddSerializer(topic_pub, admin->serializerSvc);
+	}
+	hashMapIterator_destroy(lp_iter);
+	celixThreadMutex_unlock(&admin->localPublicationsLock);
+
+	/* Add serializer to all topic_subscription_pt */
+	celixThreadMutex_lock(&admin->subscriptionsLock);
+	hash_map_iterator_pt subs_iter = hashMapIterator_create(admin->subscriptions);
+	while(hashMapIterator_hasNext(subs_iter)){
+		topic_subscription_pt topic_sub = (topic_subscription_pt) hashMapIterator_nextValue(subs_iter);
+		pubsub_topicSubscriptionAddSerializer(topic_sub, admin->serializerSvc);
+	}
+	hashMapIterator_destroy(subs_iter);
+	celixThreadMutex_unlock(&admin->subscriptionsLock);
+
+	return status;
+}
+
+celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc){
+	celix_status_t status = CELIX_SUCCESS;
+	admin->serializerSvc = NULL;
+
+	/* Remove serializer from all topic_publication_pt */
+	celixThreadMutex_lock(&admin->localPublicationsLock);
+	hash_map_iterator_pt lp_iter = hashMapIterator_create(admin->localPublications);
+	while(hashMapIterator_hasNext(lp_iter)){
+		service_factory_pt factory = (service_factory_pt) hashMapIterator_nextValue(lp_iter);
+		topic_publication_pt topic_pub = (topic_publication_pt) factory->handle;
+		pubsub_topicPublicationRemoveSerializer(topic_pub, admin->serializerSvc);
+	}
+	hashMapIterator_destroy(lp_iter);
+	celixThreadMutex_unlock(&admin->localPublicationsLock);
+
+	/* Remove serializer from all topic_subscription_pt */
+	celixThreadMutex_lock(&admin->subscriptionsLock);
+	hash_map_iterator_pt subs_iter = hashMapIterator_create(admin->subscriptions);
+	while(hashMapIterator_hasNext(subs_iter)){
+		topic_subscription_pt topic_sub = (topic_subscription_pt) hashMapIterator_nextValue(subs_iter);
+		pubsub_topicSubscriptionRemoveSerializer(topic_sub, admin->serializerSvc);
+	}
+	hashMapIterator_destroy(subs_iter);
+	celixThreadMutex_unlock(&admin->subscriptionsLock);
+
+	return status;
+}
+
 static celix_status_t pubsubAdmin_match(pubsub_admin_pt admin, pubsub_endpoint_pt psEP, double* score){
 	celix_status_t status = CELIX_SUCCESS;
 

http://git-wip-us.apache.org/repos/asf/celix/blob/a030567d/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c b/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
index 0c79479..cff5005 100644
--- a/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
+++ b/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
@@ -47,8 +47,6 @@
 #include "publisher.h"
 #include "large_udp.h"
 
-#include "pubsub_serializer.h"
-
 #define EP_ADDRESS_LEN		32
 
 #define FIRST_SEND_DELAY	2
@@ -61,6 +59,7 @@ struct topic_publication {
 	hash_map_pt boundServices; //<bundle_pt,bound_service>
 	celix_thread_mutex_t tp_lock;
 	struct sockaddr_in destAddr;
+	pubsub_serializer_service_pt serializerSvc;
 };
 
 typedef struct publish_bundle_bound_service {
@@ -99,7 +98,7 @@ static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsig
 static void delay_first_send_for_late_joiners(void);
 
 
-celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP,char* bindIP, topic_publication_pt *out){
+celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, pubsub_serializer_service_pt serializer, char* bindIP, topic_publication_pt *out){
 
     char* ep = malloc(EP_ADDRESS_LEN);
     memset(ep,0,EP_ADDRESS_LEN);
@@ -118,6 +117,7 @@ celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt
 	pub->destAddr.sin_family = AF_INET;
 	pub->destAddr.sin_addr.s_addr = inet_addr(bindIP);
 	pub->destAddr.sin_port = htons(port);
+	pub->serializerSvc = serializer;
 
 	pubsub_topicPublicationAddPublisherEP(pub,pubEP);
 
@@ -223,6 +223,46 @@ celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub
 	return CELIX_SUCCESS;
 }
 
+celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_pt serializerSvc){
+	celix_status_t status = CELIX_SUCCESS;
+
+	celixThreadMutex_lock(&(pub->tp_lock));
+
+	pub->serializerSvc = serializerSvc;
+
+	hash_map_iterator_pt bs_iter = hashMapIterator_create(pub->boundServices);
+	while(hashMapIterator_hasNext(bs_iter)){
+		publish_bundle_bound_service_pt boundSvc = (publish_bundle_bound_service_pt) hashMapIterator_nextValue(bs_iter);
+		if (hashMap_size(boundSvc->msgTypes) == 0){
+			pub->serializerSvc->fillMsgTypesMap(pub->serializerSvc->serializer, boundSvc->msgTypes, boundSvc->bundle);
+		}
+	}
+	hashMapIterator_destroy(bs_iter);
+
+	celixThreadMutex_unlock(&(pub->tp_lock));
+
+	return status;
+}
+
+celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_pt serializerSvc){
+	celix_status_t status = CELIX_SUCCESS;
+
+	celixThreadMutex_lock(&(pub->tp_lock));
+
+	hash_map_iterator_pt bs_iter = hashMapIterator_create(pub->boundServices);
+	while(hashMapIterator_hasNext(bs_iter)){
+		publish_bundle_bound_service_pt boundSvc = (publish_bundle_bound_service_pt) hashMapIterator_nextValue(bs_iter);
+		pub->serializerSvc->emptyMsgTypesMap(pub->serializerSvc->serializer, boundSvc->msgTypes);
+	}
+	hashMapIterator_destroy(bs_iter);
+
+	pub->serializerSvc = NULL;
+
+	celixThreadMutex_unlock(&(pub->tp_lock));
+
+	return status;
+}
+
 array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt pub){
 	return pub->pub_ep_list;
 }
@@ -328,9 +368,9 @@ static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, con
 
     int major=0, minor=0;
 
-    if (msgType != NULL) {
+    if (msgType != NULL && bound->parent->serializerSvc != NULL) {
 
-    	version_pt msgVersion = pubsubSerializer_getVersion(msgType);
+    	version_pt msgVersion = bound->parent->serializerSvc->getVersion(bound->parent->serializerSvc->serializer, msgType);
 
 		pubsub_msg_header_pt msg_hdr = calloc(1,sizeof(struct pubsub_msg_header));
 
@@ -346,7 +386,7 @@ static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, con
 
 		void* serializedOutput = NULL;
 		int serializedOutputLen = 0;
-		pubsubSerializer_serialize(msgType, msg, &serializedOutput, &serializedOutputLen);
+		bound->parent->serializerSvc->serialize(bound->parent->serializerSvc->serializer, msgType, msg, &serializedOutput, &serializedOutputLen);
 
 		pubsub_msg_pt msg = calloc(1,sizeof(struct pubsub_msg));
 		msg->header = msg_hdr;
@@ -372,7 +412,7 @@ static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, con
 }
 
 static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* msgTypeId){
-	*msgTypeId = pubsubSerializer_hashCode(msgType);
+	*msgTypeId = utils_stringHash(msgType);
 	return 0;
 }
 
@@ -411,7 +451,9 @@ static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(to
 		bound->service->send = pubsub_topicPublicationSend;
 		bound->service->sendMultipart = NULL;  //Multipart not supported (jet) for UDP
 
-		pubsubSerializer_fillMsgTypesMap(bound->msgTypes,bound->bundle);
+		if (tp->serializerSvc != NULL){
+			tp->serializerSvc->fillMsgTypesMap(tp->serializerSvc->serializer, bound->msgTypes,bound->bundle);
+		}
 
 	}
 	else
@@ -435,7 +477,9 @@ static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service
 	}
 
 	if(boundSvc->msgTypes != NULL){
-		pubsubSerializer_emptyMsgTypesMap(boundSvc->msgTypes);
+		if (boundSvc->parent->serializerSvc != NULL){
+			boundSvc->parent->serializerSvc->emptyMsgTypesMap(boundSvc->parent->serializerSvc->serializer, boundSvc->msgTypes);
+		}
 		hashMap_destroy(boundSvc->msgTypes,false,false);
 	}
 

http://git-wip-us.apache.org/repos/asf/celix/blob/a030567d/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c b/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
index 0caf084..91bce9f 100644
--- a/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
+++ b/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
@@ -53,8 +53,6 @@
 #include "pubsub_publish_service_private.h"
 #include "large_udp.h"
 
-#include "pubsub_serializer.h"
-
 #define MAX_EPOLL_EVENTS        10
 #define RECV_THREAD_TIMEOUT     5
 #define UDP_BUFFER_SIZE         65535
@@ -74,6 +72,7 @@ struct topic_subscription{
 	hash_map_pt socketMap; // key = URL, value = listen-socket
 	unsigned int nrSubscribers;
 	largeUdp_pt largeUdpHandle;
+	pubsub_serializer_service_pt serializerSvc;
 
 };
 
@@ -95,7 +94,7 @@ static void sigusr1_sighandler(int signo);
 static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId);
 
 
-celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt bundle_context, char* scope, char* topic,topic_subscription_pt* out){
+celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt bundle_context, pubsub_serializer_service_pt serializer, char* scope, char* topic,topic_subscription_pt* out){
 	celix_status_t status = CELIX_SUCCESS;
 
 	topic_subscription_pt ts = (topic_subscription_pt) calloc(1,sizeof(*ts));
@@ -112,6 +111,7 @@ celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt bundl
 
 	ts->running = false;
 	ts->nrSubscribers = 0;
+	ts->serializerSvc = serializer;
 
 	celixThreadMutex_create(&ts->ts_lock,NULL);
 	arrayList_create(&ts->sub_ep_list);
@@ -391,6 +391,39 @@ unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt ts) {
 	return ts->nrSubscribers;
 }
 
+celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, pubsub_serializer_service_pt serializerSvc){
+	celix_status_t status = CELIX_SUCCESS;
+
+	celixThreadMutex_lock(&ts->ts_lock);
+
+	ts->serializerSvc = serializerSvc;
+
+	celixThreadMutex_unlock(&ts->ts_lock);
+
+	return status;
+}
+
+celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts, pubsub_serializer_service_pt serializerSvc){
+	celix_status_t status = CELIX_SUCCESS;
+
+	celixThreadMutex_lock(&ts->ts_lock);
+
+	hash_map_iterator_pt svc_iter = hashMapIterator_create(ts->servicesMap);
+	while(hashMapIterator_hasNext(svc_iter)){
+		hash_map_pt msgTypes = (hash_map_pt) hashMapIterator_nextValue(svc_iter);
+		if (hashMap_size(msgTypes) > 0){
+			ts->serializerSvc->emptyMsgTypesMap(ts->serializerSvc->serializer, msgTypes);
+		}
+	}
+	hashMapIterator_destroy(svc_iter);
+
+	ts->serializerSvc = NULL;
+
+	celixThreadMutex_unlock(&ts->ts_lock);
+
+	return status;
+}
+
 static celix_status_t topicsub_subscriberTracked(void * handle, service_reference_pt reference, void * service){
 	celix_status_t status = CELIX_SUCCESS;
 	topic_subscription_pt ts = handle;
@@ -401,7 +434,10 @@ static celix_status_t topicsub_subscriberTracked(void * handle, service_referenc
 
 		bundle_pt bundle = NULL;
 		serviceReference_getBundle(reference, &bundle);
-		pubsubSerializer_fillMsgTypesMap(msgTypes,bundle);
+
+		if (ts->serializerSvc != NULL){
+			ts->serializerSvc->fillMsgTypesMap(ts->serializerSvc->serializer, msgTypes,bundle);
+		}
 
 		if(hashMap_size(msgTypes)==0){ //If the msgTypes hashMap is not filled, the service is an unsupported subscriber
 			hashMap_destroy(msgTypes,false,false);
@@ -426,7 +462,9 @@ static celix_status_t topicsub_subscriberUntracked(void * handle, service_refere
 	if (hashMap_containsKey(ts->servicesMap, service)) {
 		hash_map_pt msgTypes = hashMap_remove(ts->servicesMap, service);
 		if(msgTypes!=NULL){
-			pubsubSerializer_emptyMsgTypesMap(msgTypes);
+			if (ts->serializerSvc != NULL){
+				ts->serializerSvc->emptyMsgTypesMap(ts->serializerSvc->serializer, msgTypes);
+			}
 			hashMap_destroy(msgTypes,false,false);
 		}
 	}
@@ -450,15 +488,18 @@ static void process_msg(topic_subscription_pt sub,pubsub_udp_msg_pt msg){
 		if (msgType == NULL) {
 			printf("TS: Primary message %d not supported. NOT receiving any part of the whole message.\n",msg->header.type);
 		}
+		else if (sub->serializerSvc == NULL){
+			printf("TS: No active serializer service found!\n");
+		}
 		else{
 			void *msgInst = NULL;
-			char *name = pubsubSerializer_getName(msgType);
-			version_pt msgVersion = pubsubSerializer_getVersion(msgType);
+			char *name = sub->serializerSvc->getName(sub->serializerSvc->serializer, msgType);
+			version_pt msgVersion = sub->serializerSvc->getVersion(sub->serializerSvc->serializer, msgType);
 
 			bool validVersion = checkVersion(msgVersion,&msg->header);
 
 			if(validVersion){
-				celix_status_t status = pubsubSerializer_deserialize(msgType, (const void *) msg->payload, &msgInst);
+				celix_status_t status = sub->serializerSvc->deserialize(sub->serializerSvc->serializer, msgType, (const void *) msg->payload, &msgInst);
 
 				if (status == CELIX_SUCCESS) {
 					bool release = true;
@@ -469,7 +510,7 @@ static void process_msg(topic_subscription_pt sub,pubsub_udp_msg_pt msg){
 
 					subsvc->receive(subsvc->handle, name, msg->header.type, msgInst, &mp_callbacks, &release);
 					if(release){
-						pubsubSerializer_freeMsg(msgType, msgInst);
+						sub->serializerSvc->freeMsg(sub->serializerSvc->serializer, msgType, msgInst);
 					}
 				}
 				else{
@@ -557,7 +598,7 @@ static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr){
 }
 
 static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId){
-	*msgTypeId = pubsubSerializer_hashCode(msgType);
+	*msgTypeId = utils_stringHash(msgType);
 	return 0;
 }
 

http://git-wip-us.apache.org/repos/asf/celix/blob/a030567d/pubsub/pubsub_admin_zmq/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/CMakeLists.txt b/pubsub/pubsub_admin_zmq/CMakeLists.txt
index 75e6632..956830d 100644
--- a/pubsub/pubsub_admin_zmq/CMakeLists.txt
+++ b/pubsub/pubsub_admin_zmq/CMakeLists.txt
@@ -31,15 +31,6 @@ if (BUILD_PUBSUB_PSA_ZMQ)
 	include_directories("${PROJECT_SOURCE_DIR}/pubsub/api/pubsub")
 	include_directories("private/include")
 	include_directories("public/include")
-	if (SERIALIZER_PATH)
-		include_directories("${SERIALIZER_PATH}/include")
-	endif()
-	if (SERIALIZER_LIB_INCLUDE_DIR)
-		include_directories("${SERIALIZER_LIB_INCLUDE_DIR}")
-	endif()
-	if (SERIALIZER_LIB_DIR)
-		link_directories("${SERIALIZER_LIB_DIR}")
-	endif()
 
 	if (BUILD_ZMQ_SECURITY)
 		add_definitions(-DBUILD_WITH_ZMQ_SECURITY=1)
@@ -51,7 +42,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
 	endif()
 
 	add_bundle(org.apache.celix.pubsub_admin.PubSubAdminZmq
-	    BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin"
+	    BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_zmq"
 	    VERSION "1.0.0"
 	    SOURCES
 	    	private/src/psa_activator.c
@@ -63,11 +54,10 @@ if (BUILD_PUBSUB_PSA_ZMQ)
 	    	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/log_helper.c
 	    	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/dyn_msg_utils.c
 	    	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_utils.c
-	    	${PUBSUB_SERIALIZER_SRC}
 	)
 
 	set_target_properties(org.apache.celix.pubsub_admin.PubSubAdminZmq PROPERTIES INSTALL_RPATH "$ORIGIN")
-	target_link_libraries(org.apache.celix.pubsub_admin.PubSubAdminZmq celix_framework celix_utils celix_dfi ${ZMQ_LIBRARIES} ${CZMQ_LIBRARIES} ${JANSSON_LIBRARIES} ${OPENSSL_CRYPTO_LIBRARY} ${SERIALIZER_LIBRARY})
+	target_link_libraries(org.apache.celix.pubsub_admin.PubSubAdminZmq celix_framework celix_utils celix_dfi ${ZMQ_LIBRARIES} ${CZMQ_LIBRARIES} ${JANSSON_LIBRARIES} ${OPENSSL_CRYPTO_LIBRARY})
 	install_bundle(org.apache.celix.pubsub_admin.PubSubAdminZmq)
 
 endif()

http://git-wip-us.apache.org/repos/asf/celix/blob/a030567d/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h b/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h
index efe76c3..2f81bff 100644
--- a/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h
+++ b/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h
@@ -38,6 +38,7 @@
 #undef LOG_WARNING
 
 #include "pubsub_admin.h"
+#include "pubsub_serializer.h"
 #include "log_helper.h"
 
 #define PSA_ZMQ_BASE_PORT "PSA_ZMQ_BASE_PORT"
@@ -48,6 +49,8 @@
 
 struct pubsub_admin {
 
+	pubsub_serializer_service_pt serializerSvc;
+
 	bundle_context_pt bundle_context;
 	log_helper_pt loghelper;
 
@@ -86,4 +89,7 @@ celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* sco
 celix_status_t pubsubAdmin_matchPublisher(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP, double* score);
 celix_status_t pubsubAdmin_matchSubscriber(pubsub_admin_pt admin, pubsub_endpoint_pt subEP, double* score);
 
+celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc);
+celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc);
+
 #endif /* PUBSUB_ADMIN_IMPL_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/a030567d/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h b/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h
index 1c12eb8..b6b76c6 100644
--- a/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h
+++ b/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h
@@ -30,15 +30,19 @@
 #include "publisher.h"
 #include "pubsub_endpoint.h"
 #include "pubsub_common.h"
+#include "pubsub_serializer.h"
 
 typedef struct topic_publication *topic_publication_pt;
 
-celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context,pubsub_endpoint_pt pubEP,char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out);
+celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context,pubsub_endpoint_pt pubEP, pubsub_serializer_service_pt serializer, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out);
 celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub);
 
 celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);
 celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);
 
+celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_pt serializerSvc);
+celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_pt serializerSvc);
+
 celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory);
 celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub);
 

http://git-wip-us.apache.org/repos/asf/celix/blob/a030567d/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h b/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h
index d6bf8fb..c6fe93a 100644
--- a/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h
+++ b/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h
@@ -34,10 +34,11 @@
 
 #include "pubsub_endpoint.h"
 #include "pubsub_common.h"
+#include "pubsub_serializer.h"
 
 typedef struct topic_subscription* topic_subscription_pt;
 
-celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt subEP, char* scope, char* topic,topic_subscription_pt* out);
+celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt subEP, pubsub_serializer_service_pt serializer, char* scope, char* topic,topic_subscription_pt* out);
 celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts);
 celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts);
 celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts);
@@ -50,6 +51,9 @@ celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt
 celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP);
 celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP);
 
+celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, pubsub_serializer_service_pt serializerSvc);
+celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts, pubsub_serializer_service_pt serializerSvc);
+
 celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt subscription);
 celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt subscription);
 unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt subscription);

http://git-wip-us.apache.org/repos/asf/celix/blob/a030567d/pubsub/pubsub_admin_zmq/private/src/psa_activator.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/src/psa_activator.c b/pubsub/pubsub_admin_zmq/private/src/psa_activator.c
index 20f6070..cfe2c2e 100644
--- a/pubsub/pubsub_admin_zmq/private/src/psa_activator.c
+++ b/pubsub/pubsub_admin_zmq/private/src/psa_activator.c
@@ -77,6 +77,9 @@ celix_status_t bundleActivator_start(void * userData, bundle_context_pt context)
 		pubsubAdminSvc->matchPublisher = pubsubAdmin_matchPublisher;
 		pubsubAdminSvc->matchSubscriber = pubsubAdmin_matchSubscriber;
 
+		pubsubAdminSvc->setSerializer = pubsubAdmin_setSerializer;
+		pubsubAdminSvc->removeSerializer = pubsubAdmin_removeSerializer;
+
 		activator->adminService = pubsubAdminSvc;
 
 		status = bundleContext_registerService(context, PUBSUB_ADMIN_SERVICE, pubsubAdminSvc, NULL, &activator->registration);

http://git-wip-us.apache.org/repos/asf/celix/blob/a030567d/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c b/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
index e670899..09fcd8c 100644
--- a/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
+++ b/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
@@ -255,7 +255,7 @@ static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu
 
 		int i;
 
-		status += pubsub_topicSubscriptionCreate(admin->bundle_context, subEP, PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, PUBSUB_ANY_SUB_TOPIC, &any_sub);
+		status += pubsub_topicSubscriptionCreate(admin->bundle_context, subEP, admin->serializerSvc, PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, PUBSUB_ANY_SUB_TOPIC, &any_sub);
 
 		if (status == CELIX_SUCCESS){
 
@@ -341,7 +341,7 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
 		topic_subscription_pt subscription = hashMap_get(admin->subscriptions, scope_topic);
 
 		if(subscription == NULL) {
-			status += pubsub_topicSubscriptionCreate(admin->bundle_context,subEP,subEP->scope, subEP->topic,&subscription);
+			status += pubsub_topicSubscriptionCreate(admin->bundle_context, subEP, admin->serializerSvc, subEP->scope, subEP->topic, &subscription);
 
 			if (status==CELIX_SUCCESS){
 
@@ -444,7 +444,7 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin, pubsub_endpoint
 
         if (factory == NULL) {
             topic_publication_pt pub = NULL;
-            status = pubsub_topicPublicationCreate(admin->bundle_context, pubEP, admin->ipAddress, admin->basePort, admin->maxPort, &pub);
+            status = pubsub_topicPublicationCreate(admin->bundle_context, pubEP, admin->serializerSvc, admin->ipAddress, admin->basePort, admin->maxPort, &pub);
             if (status == CELIX_SUCCESS) {
                 status = pubsub_topicPublicationStart(admin->bundle_context, pub, &factory);
                 if (status == CELIX_SUCCESS && factory != NULL) {
@@ -666,6 +666,62 @@ celix_status_t pubsubAdmin_matchSubscriber(pubsub_admin_pt admin, pubsub_endpoin
 	return status;
 }
 
+celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc){
+	celix_status_t status = CELIX_SUCCESS;
+	admin->serializerSvc = serializerSvc;
+
+	/* Add serializer to all topic_publication_pt */
+	celixThreadMutex_lock(&admin->localPublicationsLock);
+	hash_map_iterator_pt lp_iter = hashMapIterator_create(admin->localPublications);
+	while(hashMapIterator_hasNext(lp_iter)){
+		service_factory_pt factory = (service_factory_pt) hashMapIterator_nextValue(lp_iter);
+		topic_publication_pt topic_pub = (topic_publication_pt) factory->handle;
+		pubsub_topicPublicationAddSerializer(topic_pub, admin->serializerSvc);
+	}
+	hashMapIterator_destroy(lp_iter);
+	celixThreadMutex_unlock(&admin->localPublicationsLock);
+
+	/* Add serializer to all topic_subscription_pt */
+	celixThreadMutex_lock(&admin->subscriptionsLock);
+	hash_map_iterator_pt subs_iter = hashMapIterator_create(admin->subscriptions);
+	while(hashMapIterator_hasNext(subs_iter)){
+		topic_subscription_pt topic_sub = (topic_subscription_pt) hashMapIterator_nextValue(subs_iter);
+		pubsub_topicSubscriptionAddSerializer(topic_sub, admin->serializerSvc);
+	}
+	hashMapIterator_destroy(subs_iter);
+	celixThreadMutex_unlock(&admin->subscriptionsLock);
+
+	return status;
+}
+
+celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc){
+	celix_status_t status = CELIX_SUCCESS;
+	admin->serializerSvc = NULL;
+
+	/* Remove serializer from all topic_publication_pt */
+	celixThreadMutex_lock(&admin->localPublicationsLock);
+	hash_map_iterator_pt lp_iter = hashMapIterator_create(admin->localPublications);
+	while(hashMapIterator_hasNext(lp_iter)){
+		service_factory_pt factory = (service_factory_pt) hashMapIterator_nextValue(lp_iter);
+		topic_publication_pt topic_pub = (topic_publication_pt) factory->handle;
+		pubsub_topicPublicationRemoveSerializer(topic_pub, admin->serializerSvc);
+	}
+	hashMapIterator_destroy(lp_iter);
+	celixThreadMutex_unlock(&admin->localPublicationsLock);
+
+	/* Remove serializer from all topic_subscription_pt */
+	celixThreadMutex_lock(&admin->subscriptionsLock);
+	hash_map_iterator_pt subs_iter = hashMapIterator_create(admin->subscriptions);
+	while(hashMapIterator_hasNext(subs_iter)){
+		topic_subscription_pt topic_sub = (topic_subscription_pt) hashMapIterator_nextValue(subs_iter);
+		pubsub_topicSubscriptionRemoveSerializer(topic_sub, admin->serializerSvc);
+	}
+	hashMapIterator_destroy(subs_iter);
+	celixThreadMutex_unlock(&admin->subscriptionsLock);
+
+	return status;
+}
+
 static celix_status_t pubsubAdmin_match(pubsub_admin_pt admin, pubsub_endpoint_pt psEP, double* score){
 	celix_status_t status = CELIX_SUCCESS;
 

http://git-wip-us.apache.org/repos/asf/celix/blob/a030567d/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/src/topic_publication.c b/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
index e445753..bb8ff56 100644
--- a/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
+++ b/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
@@ -51,8 +51,6 @@
 #include "pubsub_utils.h"
 #include "publisher.h"
 
-#include "pubsub_serializer.h"
-
 #ifdef BUILD_WITH_ZMQ_SECURITY
 	#include "zmq_crypto.h"
 
@@ -72,6 +70,7 @@ struct topic_publication {
 	array_list_pt pub_ep_list; //List<pubsub_endpoint>
 	hash_map_pt boundServices; //<bundle_pt,bound_service>
 	celix_thread_mutex_t tp_lock;
+	pubsub_serializer_service_pt serializerSvc;
 };
 
 typedef struct publish_bundle_bound_service {
@@ -106,7 +105,7 @@ static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsig
 
 static void delay_first_send_for_late_joiners(void);
 
-celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt pubEP,char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out){
+celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt pubEP, pubsub_serializer_service_pt serializer, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out){
 	celix_status_t status = CELIX_SUCCESS;
 
 #ifdef BUILD_WITH_ZMQ_SECURITY
@@ -211,6 +210,7 @@ celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, p
 
 	pub->endpoint = ep;
 	pub->zmq_socket = socket;
+	pub->serializerSvc = serializer;
 
 #ifdef BUILD_WITH_ZMQ_SECURITY
 	if (pubEP->is_secure){
@@ -332,6 +332,46 @@ celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub
 	return CELIX_SUCCESS;
 }
 
+celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pubsub_serializer_service_pt serializerSvc){
+	celix_status_t status = CELIX_SUCCESS;
+
+	celixThreadMutex_lock(&(pub->tp_lock));
+
+	pub->serializerSvc = serializerSvc;
+
+	hash_map_iterator_pt bs_iter = hashMapIterator_create(pub->boundServices);
+	while(hashMapIterator_hasNext(bs_iter)){
+		publish_bundle_bound_service_pt boundSvc = (publish_bundle_bound_service_pt) hashMapIterator_nextValue(bs_iter);
+		if (hashMap_size(boundSvc->msgTypes) == 0){
+			pub->serializerSvc->fillMsgTypesMap(pub->serializerSvc->serializer, boundSvc->msgTypes, boundSvc->bundle);
+		}
+	}
+	hashMapIterator_destroy(bs_iter);
+
+	celixThreadMutex_unlock(&(pub->tp_lock));
+
+	return status;
+}
+
+celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_pt serializerSvc){
+	celix_status_t status = CELIX_SUCCESS;
+
+	celixThreadMutex_lock(&(pub->tp_lock));
+
+	hash_map_iterator_pt bs_iter = hashMapIterator_create(pub->boundServices);
+	while(hashMapIterator_hasNext(bs_iter)){
+		publish_bundle_bound_service_pt boundSvc = (publish_bundle_bound_service_pt) hashMapIterator_nextValue(bs_iter);
+		pub->serializerSvc->emptyMsgTypesMap(pub->serializerSvc->serializer, boundSvc->msgTypes);
+	}
+	hashMapIterator_destroy(bs_iter);
+
+	pub->serializerSvc = NULL;
+
+	celixThreadMutex_unlock(&(pub->tp_lock));
+
+	return status;
+}
+
 array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt pub){
 	return pub->pub_ep_list;
 }
@@ -465,9 +505,9 @@ static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTy
 
 	int major=0, minor=0;
 
-	if (msgType != NULL) {
+	if (msgType != NULL && bound->parent->serializerSvc != NULL) {
 
-		version_pt msgVersion = pubsubSerializer_getVersion(msgType);
+		version_pt msgVersion = bound->parent->serializerSvc->getVersion(bound->parent->serializerSvc->serializer, msgType);
 
 		pubsub_msg_header_pt msg_hdr = calloc(1,sizeof(struct pubsub_msg_header));
 
@@ -483,7 +523,7 @@ static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTy
 
 		void* serializedOutput = NULL;
 		int serializedOutputLen = 0;
-		pubsubSerializer_serialize(msgType, msg, &serializedOutput, &serializedOutputLen);
+		bound->parent->serializerSvc->serialize(bound->parent->serializerSvc->serializer, msgType, msg, &serializedOutput, &serializedOutputLen);
 
 		pubsub_msg_pt msg = calloc(1,sizeof(struct pubsub_msg));
 		msg->header = msg_hdr;
@@ -551,7 +591,7 @@ static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTy
 }
 
 static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* msgTypeId){
-	*msgTypeId = pubsubSerializer_hashCode(msgType);
+	*msgTypeId = utils_stringHash(msgType);
 	return 0;
 }
 
@@ -588,7 +628,9 @@ static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(to
 		bound->service->send = pubsub_topicPublicationSend;
 		bound->service->sendMultipart = pubsub_topicPublicationSendMultipart;
 
-		pubsubSerializer_fillMsgTypesMap(bound->msgTypes,bound->bundle);
+		if (tp->serializerSvc != NULL){
+			tp->serializerSvc->fillMsgTypesMap(tp->serializerSvc->serializer, bound->msgTypes,bound->bundle);
+		}
 
 	}
 	else
@@ -612,7 +654,9 @@ static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service
 	}
 
 	if(boundSvc->msgTypes != NULL){
-		pubsubSerializer_emptyMsgTypesMap(boundSvc->msgTypes);
+		if (boundSvc->parent->serializerSvc != NULL){
+			boundSvc->parent->serializerSvc->emptyMsgTypesMap(boundSvc->parent->serializerSvc->serializer, boundSvc->msgTypes);
+		}
 		hashMap_destroy(boundSvc->msgTypes,false,false);
 	}
 

http://git-wip-us.apache.org/repos/asf/celix/blob/a030567d/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c b/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
index cb9aff5..3de56af 100644
--- a/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
+++ b/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
@@ -49,8 +49,6 @@
 #include "dyn_msg_utils.h"
 #include "pubsub_utils.h"
 
-#include "pubsub_serializer.h"
-
 #ifdef BUILD_WITH_ZMQ_SECURITY
 	#include "zmq_crypto.h"
 
@@ -80,6 +78,8 @@ struct topic_subscription{
 	celix_thread_mutex_t pendingConnections_lock;
 	celix_thread_mutex_t pendingDisconnections_lock;
 	unsigned int nrSubscribers;
+	pubsub_serializer_service_pt serializerSvc;
+
 };
 
 typedef struct complete_zmq_msg{
@@ -104,12 +104,12 @@ static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr);
 static void sigusr1_sighandler(int signo);
 static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId);
 static int pubsub_getMultipart(void *handle, unsigned int msgTypeId, bool retain, void **part);
-static mp_handle_pt create_mp_handle(hash_map_pt svc_msg_db,array_list_pt rcv_msg_list);
+static mp_handle_pt create_mp_handle(topic_subscription_pt sub, hash_map_pt svc_msg_db,array_list_pt rcv_msg_list);
 static void destroy_mp_handle(mp_handle_pt mp_handle);
 static void connectPendingPublishers(topic_subscription_pt sub);
 static void disconnectPendingPublishers(topic_subscription_pt sub);
 
-celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt subEP, char* scope, char* topic,topic_subscription_pt* out){
+celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt subEP, pubsub_serializer_service_pt serializer, char* scope, char* topic,topic_subscription_pt* out){
 	celix_status_t status = CELIX_SUCCESS;
 
 #ifdef BUILD_WITH_ZMQ_SECURITY
@@ -211,6 +211,7 @@ celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context,
 	ts->zmq_socket = zmq_s;
 	ts->running = false;
 	ts->nrSubscribers = 0;
+	ts->serializerSvc = serializer;
 
 	#ifdef BUILD_WITH_ZMQ_SECURITY
 	if (subEP->is_secure){
@@ -428,6 +429,39 @@ unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt ts) {
 	return ts->nrSubscribers;
 }
 
+celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, pubsub_serializer_service_pt serializerSvc){
+	celix_status_t status = CELIX_SUCCESS;
+
+	celixThreadMutex_lock(&ts->ts_lock);
+
+	ts->serializerSvc = serializerSvc;
+
+	celixThreadMutex_unlock(&ts->ts_lock);
+
+	return status;
+}
+
+celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts, pubsub_serializer_service_pt serializerSvc){
+	celix_status_t status = CELIX_SUCCESS;
+
+	celixThreadMutex_lock(&ts->ts_lock);
+
+	hash_map_iterator_pt svc_iter = hashMapIterator_create(ts->servicesMap);
+	while(hashMapIterator_hasNext(svc_iter)){
+		hash_map_pt msgTypes = (hash_map_pt) hashMapIterator_nextValue(svc_iter);
+		if (hashMap_size(msgTypes) > 0){
+			ts->serializerSvc->emptyMsgTypesMap(ts->serializerSvc->serializer, msgTypes);
+		}
+	}
+	hashMapIterator_destroy(svc_iter);
+
+	ts->serializerSvc = NULL;
+
+	celixThreadMutex_unlock(&ts->ts_lock);
+
+	return status;
+}
+
 static celix_status_t topicsub_subscriberTracked(void * handle, service_reference_pt reference, void * service){
 	celix_status_t status = CELIX_SUCCESS;
 	topic_subscription_pt ts = handle;
@@ -438,7 +472,10 @@ static celix_status_t topicsub_subscriberTracked(void * handle, service_referenc
 
 		bundle_pt bundle = NULL;
 		serviceReference_getBundle(reference, &bundle);
-		pubsubSerializer_fillMsgTypesMap(msgTypes,bundle);
+
+		if (ts->serializerSvc != NULL){
+			ts->serializerSvc->fillMsgTypesMap(ts->serializerSvc->serializer, msgTypes,bundle);
+		}
 
 		if(hashMap_size(msgTypes)==0){ //If the msgTypes hashMap is not filled, the service is an unsupported subscriber
 			hashMap_destroy(msgTypes,false,false);
@@ -463,7 +500,9 @@ static celix_status_t topicsub_subscriberUntracked(void * handle, service_refere
 	if (hashMap_containsKey(ts->servicesMap, service)) {
 		hash_map_pt msgTypes = hashMap_remove(ts->servicesMap, service);
 		if(msgTypes!=NULL){
-			pubsubSerializer_emptyMsgTypesMap(msgTypes);
+			if (ts->serializerSvc != NULL){
+				ts->serializerSvc->emptyMsgTypesMap(ts->serializerSvc->serializer, msgTypes);
+			}
 			hashMap_destroy(msgTypes,false,false);
 		}
 	}
@@ -488,21 +527,24 @@ static void process_msg(topic_subscription_pt sub,array_list_pt msg_list){
 		if (msgType == NULL) {
 			printf("TS: Primary message %d not supported. NOT sending any part of the whole message.\n",first_msg_hdr->type);
 		}
+		else if (sub->serializerSvc == NULL){
+			printf("TS: No active serializer found!\n");
+		}
 		else{
 			void *msgInst = NULL;
-			char *name = pubsubSerializer_getName(msgType);
-			version_pt msgVersion = pubsubSerializer_getVersion(msgType);
+			char *name = sub->serializerSvc->getName(sub->serializerSvc->serializer, msgType);
+			version_pt msgVersion = sub->serializerSvc->getVersion(sub->serializerSvc->serializer, msgType);
 
 			bool validVersion = checkVersion(msgVersion,first_msg_hdr);
 
 			if(validVersion){
 
-				celix_status_t status = pubsubSerializer_deserialize(msgType, (const void *) zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->payload), &msgInst);
+				celix_status_t status = sub->serializerSvc->deserialize(sub->serializerSvc->serializer, msgType, (const void *) zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->payload), &msgInst);
 
 				if (status == CELIX_SUCCESS) {
 					bool release = true;
 
-					mp_handle_pt mp_handle = create_mp_handle(msgTypes,msg_list);
+					mp_handle_pt mp_handle = create_mp_handle(sub, msgTypes,msg_list);
 					pubsub_multipart_callbacks_t mp_callbacks;
 					mp_callbacks.handle = mp_handle;
 					mp_callbacks.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForMsgType;
@@ -510,7 +552,7 @@ static void process_msg(topic_subscription_pt sub,array_list_pt msg_list){
 					subsvc->receive(subsvc->handle, name, first_msg_hdr->type, msgInst, &mp_callbacks, &release);
 
 					if(release){
-						pubsubSerializer_freeMsg(msgType, msgInst);
+						sub->serializerSvc->freeMsg(sub->serializerSvc->serializer, msgType, msgInst);
 					}
 					if(mp_handle!=NULL){
 						destroy_mp_handle(mp_handle);
@@ -684,7 +726,7 @@ static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr){
 }
 
 static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId){
-	*msgTypeId = pubsubSerializer_hashCode(msgType);
+	*msgTypeId = utils_stringHash(msgType);
 	return 0;
 }
 
@@ -711,7 +753,7 @@ static int pubsub_getMultipart(void *handle, unsigned int msgTypeId, bool retain
 
 }
 
-static mp_handle_pt create_mp_handle(hash_map_pt svc_msg_db,array_list_pt rcv_msg_list){
+static mp_handle_pt create_mp_handle(topic_subscription_pt sub, hash_map_pt svc_msg_db,array_list_pt rcv_msg_list){
 
 	if(arrayList_size(rcv_msg_list)==1){ //Means it's not a multipart message
 		return NULL;
@@ -728,14 +770,14 @@ static mp_handle_pt create_mp_handle(hash_map_pt svc_msg_db,array_list_pt rcv_ms
 		pubsub_msg_header_pt header = (pubsub_msg_header_pt)zframe_data(c_msg->header);
 
 		pubsub_message_type *msgType = hashMap_get(svc_msg_db,&(header->type));
-		if (msgType != NULL) {
+		if (msgType != NULL && sub->serializerSvc != NULL) {
 			void *msgInst = NULL;
-			version_pt msgVersion = pubsubSerializer_getVersion(msgType);
+			version_pt msgVersion = sub->serializerSvc->getVersion(sub->serializerSvc->serializer, msgType);
 
 			bool validVersion = checkVersion(msgVersion,header);
 
 			if(validVersion){
-				celix_status_t status = pubsubSerializer_deserialize(msgType, (const void *) zframe_data(c_msg->payload), &msgInst);
+				celix_status_t status = sub->serializerSvc->deserialize(sub->serializerSvc->serializer, msgType, (const void *) zframe_data(c_msg->payload), &msgInst);
 
 				if(status == CELIX_SUCCESS){
 					unsigned int* msgId = calloc(1,sizeof(unsigned int));

http://git-wip-us.apache.org/repos/asf/celix/blob/a030567d/pubsub/pubsub_common/public/include/pubsub_admin.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/include/pubsub_admin.h b/pubsub/pubsub_common/public/include/pubsub_admin.h
index fc1cfbb..52cb75c 100644
--- a/pubsub/pubsub_common/public/include/pubsub_admin.h
+++ b/pubsub/pubsub_common/public/include/pubsub_admin.h
@@ -31,6 +31,7 @@
 
 #include "pubsub_common.h"
 #include "pubsub_endpoint.h"
+#include "pubsub_serializer.h"
 
 #define PSA_IP 	"PSA_IP"
 #define PSA_ITF	"PSA_INTERFACE"
@@ -54,6 +55,10 @@ struct pubsub_admin_service {
 
 	celix_status_t (*matchPublisher)(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP, double* score);
 	celix_status_t (*matchSubscriber)(pubsub_admin_pt admin, pubsub_endpoint_pt subEP, double* score);
+
+	celix_status_t (*setSerializer)(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc);
+	celix_status_t (*removeSerializer)(pubsub_admin_pt admin, pubsub_serializer_service_pt serializerSvc);
+
 };
 
 typedef struct pubsub_admin_service *pubsub_admin_service_pt;

http://git-wip-us.apache.org/repos/asf/celix/blob/a030567d/pubsub/pubsub_common/public/include/pubsub_common.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/include/pubsub_common.h b/pubsub/pubsub_common/public/include/pubsub_common.h
index d9c6f1d..46abd72 100644
--- a/pubsub/pubsub_common/public/include/pubsub_common.h
+++ b/pubsub/pubsub_common/public/include/pubsub_common.h
@@ -27,6 +27,7 @@
 #ifndef PUBSUB_COMMON_H_
 #define PUBSUB_COMMON_H_
 
+#define PUBSUB_SERIALIZER_SERVICE 		"pubsub_serializer"
 #define PUBSUB_ADMIN_SERVICE 			"pubsub_admin"
 #define PUBSUB_DISCOVERY_SERVICE		"pubsub_discovery"
 #define PUBSUB_TM_ANNOUNCE_PUBLISHER_SERVICE    "pubsub_tm_announce_publisher"

http://git-wip-us.apache.org/repos/asf/celix/blob/a030567d/pubsub/pubsub_common/public/include/pubsub_serializer.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/include/pubsub_serializer.h b/pubsub/pubsub_common/public/include/pubsub_serializer.h
index 565bac4..f2df075 100644
--- a/pubsub/pubsub_common/public/include/pubsub_serializer.h
+++ b/pubsub/pubsub_common/public/include/pubsub_serializer.h
@@ -19,30 +19,38 @@
 /*
  * pubsub_serializer.h
  *
- *  \date       Dec 7, 2016
+ *  \date       Mar 24, 2017
  *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
  *  \copyright	Apache License, Version 2.0
  */
 
-#ifndef PUBSUB_SERIALIZER_H
-#define PUBSUB_SERIALIZER_H
+#ifndef PUBSUB_SERIALIZER_H_
+#define PUBSUB_SERIALIZER_H_
 
-#include "bundle.h"
-#include "hash_map.h"
-#include "celix_errno.h"
+#include "service_reference.h"
+
+#include "pubsub_common.h"
 
 typedef struct _pubsub_message_type pubsub_message_type;
 
-celix_status_t pubsubSerializer_serialize(pubsub_message_type *msgType, const void *input, void **output, int *outputLen);
-celix_status_t pubsubSerializer_deserialize(pubsub_message_type *msgType, const void *input, void **output);
+typedef struct pubsub_serializer *pubsub_serializer_pt;
+
+struct pubsub_serializer_service {
+
+	pubsub_serializer_pt serializer;
+
+	celix_status_t (*serialize)(pubsub_serializer_pt serializer, pubsub_message_type *msgType, const void *input, void **output, int *outputLen);
+	celix_status_t (*deserialize)(pubsub_serializer_pt serializer, pubsub_message_type *msgType, const void *input, void **output);
+
+	void (*fillMsgTypesMap)(pubsub_serializer_pt serializer, hash_map_pt msgTypesMap,bundle_pt bundle);
+	void (*emptyMsgTypesMap)(pubsub_serializer_pt serializer, hash_map_pt msgTypesMap);
 
-unsigned int pubsubSerializer_hashCode(const char *string);
-version_pt pubsubSerializer_getVersion(pubsub_message_type *msgType);
-char* pubsubSerializer_getName(pubsub_message_type *msgType);
+	version_pt (*getVersion)(pubsub_serializer_pt serializer, pubsub_message_type *msgType);
+	char* (*getName)(pubsub_serializer_pt serializer, pubsub_message_type *msgType);
+	void (*freeMsg)(pubsub_serializer_pt serializer, pubsub_message_type *msgType, void *msg);
 
-void pubsubSerializer_fillMsgTypesMap(hash_map_pt msgTypesMap,bundle_pt bundle);
-void pubsubSerializer_emptyMsgTypesMap(hash_map_pt msgTypesMap);
+};
 
-void pubsubSerializer_freeMsg(pubsub_message_type *msgType, void *msg);
+typedef struct pubsub_serializer_service *pubsub_serializer_service_pt;
 
-#endif
+#endif /* PUBSUB_SERIALIZER_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/a030567d/pubsub/pubsub_common/public/src/pubsub_serializer.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/src/pubsub_serializer.c b/pubsub/pubsub_common/public/src/pubsub_serializer.c
deleted file mode 100644
index bb6096a..0000000
--- a/pubsub/pubsub_common/public/src/pubsub_serializer.c
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * pubsub_serializer_json.c
- *
- *  \date       Dec 7, 2016
- *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright	Apache License, Version 2.0
- */
-
-#include "pubsub_serializer.h"
-
-#include "utils.h"
-#include "json_serializer.h"
-#include "dyn_msg_utils.h"
-#include "dyn_type.h"
-#include "string.h"
-#include "dyn_message.h"
-#include "dyn_common.h"
-
-struct _pubsub_message_type {	/* _dyn_message_type */
-	struct namvals_head header;
-	struct namvals_head annotations;
-	struct types_head types;
-	dyn_type *msgType;
-	version_pt msgVersion;
-};
-
-celix_status_t pubsubSerializer_serialize(pubsub_message_type *msgType, const void *input, void **output, int *outputLen){
-	celix_status_t status = CELIX_SUCCESS;
-
-	dyn_type *type = NULL;
-	dynMessage_getMessageType((dyn_message_type *) msgType, &type);
-
-	char *jsonOutput = NULL;
-	int rc = jsonSerializer_serialize(type, (void *) input, &jsonOutput);
-	if (rc != 0){
-		status = CELIX_BUNDLE_EXCEPTION;
-	}
-
-	*output = (void *) jsonOutput;
-	*outputLen = strlen(jsonOutput) + 1;
-
-	return status;
-}
-
-celix_status_t pubsubSerializer_deserialize(pubsub_message_type *msgType, const void *input, void **output){
-	celix_status_t status = CELIX_SUCCESS;
-
-	dyn_type *type = NULL;
-	dynMessage_getMessageType((dyn_message_type *) msgType, &type);
-
-	void *textOutput = NULL;
-	int rc = jsonSerializer_deserialize(type, (const char *) input, &textOutput);
-	if (rc != 0){
-		status = CELIX_BUNDLE_EXCEPTION;
-	}
-
-	*output = textOutput;
-
-	return status;
-}
-
-unsigned int pubsubSerializer_hashCode(const char *string){
-	return utils_stringHash(string);
-}
-
-version_pt pubsubSerializer_getVersion(pubsub_message_type *msgType){
-	version_pt msgVersion = NULL;
-	dynMessage_getVersion((dyn_message_type *) msgType, &msgVersion);
-	return msgVersion;
-}
-
-char* pubsubSerializer_getName(pubsub_message_type *msgType){
-	char *name = NULL;
-	dynMessage_getName((dyn_message_type *) msgType, &name);
-	return name;
-}
-
-void pubsubSerializer_fillMsgTypesMap(hash_map_pt msgTypesMap,bundle_pt bundle){
-	fillMsgTypesMap(msgTypesMap, bundle);
-}
-
-void pubsubSerializer_emptyMsgTypesMap(hash_map_pt msgTypesMap){
-	emptyMsgTypesMap(msgTypesMap);
-}
-
-void pubsubSerializer_freeMsg(pubsub_message_type *msgType, void *msg){
-	dyn_type *type = NULL;
-	dynMessage_getMessageType((dyn_message_type *) msgType, &type);
-	dynType_free(type, msg);
-}
-

http://git-wip-us.apache.org/repos/asf/celix/blob/a030567d/pubsub/pubsub_serializer_json/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_serializer_json/CMakeLists.txt b/pubsub/pubsub_serializer_json/CMakeLists.txt
new file mode 100644
index 0000000..61b1cd9
--- /dev/null
+++ b/pubsub/pubsub_serializer_json/CMakeLists.txt
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+find_package(Jansson REQUIRED)
+
+include_directories("private/include")
+include_directories("public/include")
+include_directories("${PROJECT_SOURCE_DIR}/utils/public/include")
+include_directories("${PROJECT_SOURCE_DIR}/log_service/public/include")
+include_directories("${PROJECT_SOURCE_DIR}/dfi/public/include")
+include_directories("${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/include")
+include_directories("${PROJECT_SOURCE_DIR}/pubsub/api/pubsub")
+include_directories("${JANSSON_INCLUDE_DIR}")
+
+add_bundle(org.apache.celix.pubsub_serializer.PubSubSerializerJson
+    BUNDLE_SYMBOLICNAME "apache_celix_pubsub_serializer_json"
+    VERSION "1.0.0"
+    SOURCES
+    	private/src/ps_activator.c
+    	private/src/pubsub_serializer_impl.c
+    	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/dyn_msg_utils.c
+    	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/log_helper.c
+    	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_utils.c
+)
+
+set_target_properties(org.apache.celix.pubsub_serializer.PubSubSerializerJson PROPERTIES INSTALL_RPATH "$ORIGIN")
+target_link_libraries(org.apache.celix.pubsub_serializer.PubSubSerializerJson celix_framework celix_utils celix_dfi ${JANSSON_LIBRARIES})
+
+install_bundle(org.apache.celix.pubsub_serializer.PubSubSerializerJson)
+

http://git-wip-us.apache.org/repos/asf/celix/blob/a030567d/pubsub/pubsub_serializer_json/private/include/pubsub_serializer_impl.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_serializer_json/private/include/pubsub_serializer_impl.h b/pubsub/pubsub_serializer_json/private/include/pubsub_serializer_impl.h
new file mode 100644
index 0000000..f026300
--- /dev/null
+++ b/pubsub/pubsub_serializer_json/private/include/pubsub_serializer_impl.h
@@ -0,0 +1,65 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_serializer_impl.h
+ *
+ *  \date       Mar 24, 2017
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#ifndef PUBSUB_SERIALIZER_IMPL_H_
+#define PUBSUB_SERIALIZER_IMPL_H_
+
+#include "dyn_common.h"
+#include "dyn_type.h"
+#include "dyn_message.h"
+#include "log_helper.h"
+
+#include "pubsub_serializer.h"
+
+struct _pubsub_message_type {	/* _dyn_message_type */
+	struct namvals_head header;
+	struct namvals_head annotations;
+	struct types_head types;
+	dyn_type *msgType;
+	version_pt msgVersion;
+};
+
+struct pubsub_serializer {
+	bundle_context_pt bundle_context;
+	log_helper_pt loghelper;
+};
+
+celix_status_t pubsubSerializer_create(bundle_context_pt context, pubsub_serializer_pt *serializer);
+celix_status_t pubsubSerializer_destroy(pubsub_serializer_pt serializer);
+
+/* Start of serializer specific functions */
+celix_status_t pubsubSerializer_serialize(pubsub_serializer_pt serializer, pubsub_message_type *msgType, const void *input, void **output, int *outputLen);
+celix_status_t pubsubSerializer_deserialize(pubsub_serializer_pt serializer, pubsub_message_type *msgType, const void *input, void **output);
+
+void pubsubSerializer_fillMsgTypesMap(pubsub_serializer_pt serializer, hash_map_pt msgTypesMap,bundle_pt bundle);
+void pubsubSerializer_emptyMsgTypesMap(pubsub_serializer_pt serializer, hash_map_pt msgTypesMap);
+
+version_pt pubsubSerializer_getVersion(pubsub_serializer_pt serializer, pubsub_message_type *msgType);
+char* pubsubSerializer_getName(pubsub_serializer_pt serializer, pubsub_message_type *msgType);
+void pubsubSerializer_freeMsg(pubsub_serializer_pt serializer, pubsub_message_type *msgType, void *msg);
+
+
+#endif /* PUBSUB_SERIALIZER_IMPL_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/a030567d/pubsub/pubsub_serializer_json/private/src/ps_activator.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_serializer_json/private/src/ps_activator.c b/pubsub/pubsub_serializer_json/private/src/ps_activator.c
new file mode 100644
index 0000000..b83c26b
--- /dev/null
+++ b/pubsub/pubsub_serializer_json/private/src/ps_activator.c
@@ -0,0 +1,111 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * ps_activator.c
+ *
+ *  \date       Mar 24, 2017
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#include <stdlib.h>
+
+#include "bundle_activator.h"
+#include "service_registration.h"
+
+#include "pubsub_serializer_impl.h"
+
+struct activator {
+	pubsub_serializer_pt serializer;
+	pubsub_serializer_service_pt serializerService;
+	service_registration_pt registration;
+};
+
+celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) {
+	celix_status_t status = CELIX_SUCCESS;
+	struct activator *activator;
+
+	activator = calloc(1, sizeof(*activator));
+	if (!activator) {
+		status = CELIX_ENOMEM;
+	}
+	else{
+		*userData = activator;
+		status = pubsubSerializer_create(context, &(activator->serializer));
+	}
+
+	return status;
+}
+
+celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) {
+	celix_status_t status = CELIX_SUCCESS;
+	struct activator *activator = userData;
+	pubsub_serializer_service_pt pubsubSerializerSvc = calloc(1, sizeof(*pubsubSerializerSvc));
+
+	if (!pubsubSerializerSvc) {
+		status = CELIX_ENOMEM;
+	}
+	else{
+		pubsubSerializerSvc->serializer = activator->serializer;
+
+		pubsubSerializerSvc->serialize = pubsubSerializer_serialize;
+		pubsubSerializerSvc->deserialize = pubsubSerializer_deserialize;
+
+		pubsubSerializerSvc->fillMsgTypesMap = pubsubSerializer_fillMsgTypesMap;
+		pubsubSerializerSvc->emptyMsgTypesMap = pubsubSerializer_emptyMsgTypesMap;
+
+		pubsubSerializerSvc->getVersion = pubsubSerializer_getVersion;
+		pubsubSerializerSvc->getName = pubsubSerializer_getName;
+		pubsubSerializerSvc->freeMsg = pubsubSerializer_freeMsg;
+
+		activator->serializerService = pubsubSerializerSvc;
+
+		status = bundleContext_registerService(context, PUBSUB_SERIALIZER_SERVICE, pubsubSerializerSvc, NULL, &activator->registration);
+
+	}
+
+	return status;
+}
+
+celix_status_t bundleActivator_stop(void * userData, bundle_context_pt context) {
+	celix_status_t status = CELIX_SUCCESS;
+	struct activator *activator = userData;
+
+	serviceRegistration_unregister(activator->registration);
+	activator->registration = NULL;
+
+	free(activator->serializerService);
+	activator->serializerService = NULL;
+
+	return status;
+}
+
+celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt context) {
+	celix_status_t status = CELIX_SUCCESS;
+	struct activator *activator = userData;
+
+	pubsubSerializer_destroy(activator->serializer);
+	activator->serializer = NULL;
+
+	free(activator);
+
+	return status;
+}
+
+

http://git-wip-us.apache.org/repos/asf/celix/blob/a030567d/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c b/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c
new file mode 100644
index 0000000..60d5c98
--- /dev/null
+++ b/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c
@@ -0,0 +1,143 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_serializer_impl.c
+ *
+ *  \date       Mar 24, 2017
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+
+#include "constants.h"
+#include "utils.h"
+#include "hash_map.h"
+#include "array_list.h"
+#include "bundle_context.h"
+#include "bundle.h"
+#include "service_reference.h"
+#include "service_registration.h"
+#include "log_helper.h"
+#include "log_service.h"
+#include "service_factory.h"
+
+#include "json_serializer.h"
+#include "dyn_msg_utils.h"
+
+#include "pubsub_serializer_impl.h"
+
+celix_status_t pubsubSerializer_create(bundle_context_pt context, pubsub_serializer_pt *serializer) {
+	celix_status_t status = CELIX_SUCCESS;
+
+	*serializer = calloc(1, sizeof(**serializer));
+
+	if (!*serializer) {
+		status = CELIX_ENOMEM;
+	}
+	else{
+
+		(*serializer)->bundle_context= context;
+
+		if (logHelper_create(context, &(*serializer)->loghelper) == CELIX_SUCCESS) {
+			logHelper_start((*serializer)->loghelper);
+		}
+
+	}
+
+	return status;
+}
+
+celix_status_t pubsubSerializer_destroy(pubsub_serializer_pt serializer) {
+	celix_status_t status = CELIX_SUCCESS;
+
+	logHelper_stop(serializer->loghelper);
+	logHelper_destroy(&serializer->loghelper);
+
+	free(serializer);
+
+	return status;
+}
+
+celix_status_t pubsubSerializer_serialize(pubsub_serializer_pt serializer, pubsub_message_type *msgType, const void *input, void **output, int *outputLen){
+	celix_status_t status = CELIX_SUCCESS;
+
+	dyn_type *type = NULL;
+	dynMessage_getMessageType((dyn_message_type *) msgType, &type);
+
+	char *jsonOutput = NULL;
+	int rc = jsonSerializer_serialize(type, (void *) input, &jsonOutput);
+	if (rc != 0){
+		status = CELIX_BUNDLE_EXCEPTION;
+	}
+
+	*output = (void *) jsonOutput;
+	*outputLen = strlen(jsonOutput) + 1;
+
+	return status;
+}
+
+celix_status_t pubsubSerializer_deserialize(pubsub_serializer_pt serializer, pubsub_message_type *msgType, const void *input, void **output){
+	celix_status_t status = CELIX_SUCCESS;
+
+	dyn_type *type = NULL;
+	dynMessage_getMessageType((dyn_message_type *) msgType, &type);
+
+	void *textOutput = NULL;
+	int rc = jsonSerializer_deserialize(type, (const char *) input, &textOutput);
+	if (rc != 0){
+		status = CELIX_BUNDLE_EXCEPTION;
+	}
+
+	*output = textOutput;
+
+	return status;
+}
+
+void pubsubSerializer_fillMsgTypesMap(pubsub_serializer_pt serializer, hash_map_pt msgTypesMap, bundle_pt bundle){
+	fillMsgTypesMap(msgTypesMap, bundle);
+}
+
+void pubsubSerializer_emptyMsgTypesMap(pubsub_serializer_pt serializer, hash_map_pt msgTypesMap){
+	emptyMsgTypesMap(msgTypesMap);
+}
+
+version_pt pubsubSerializer_getVersion(pubsub_serializer_pt serializer, pubsub_message_type *msgType){
+	version_pt msgVersion = NULL;
+	dynMessage_getVersion((dyn_message_type *) msgType, &msgVersion);
+	return msgVersion;
+}
+
+char* pubsubSerializer_getName(pubsub_serializer_pt serializer, pubsub_message_type *msgType){
+	char *name = NULL;
+	dynMessage_getName((dyn_message_type *) msgType, &name);
+	return name;
+}
+
+void pubsubSerializer_freeMsg(pubsub_serializer_pt serializer, pubsub_message_type *msgType, void *msg){
+	dyn_type *type = NULL;
+	dynMessage_getMessageType((dyn_message_type *) msgType, &type);
+	dynType_free(type, msg);
+}

http://git-wip-us.apache.org/repos/asf/celix/blob/a030567d/pubsub/pubsub_topology_manager/private/include/pubsub_topology_manager.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_topology_manager/private/include/pubsub_topology_manager.h b/pubsub/pubsub_topology_manager/private/include/pubsub_topology_manager.h
index 2e940aa..c7cb100 100644
--- a/pubsub/pubsub_topology_manager/private/include/pubsub_topology_manager.h
+++ b/pubsub/pubsub_topology_manager/private/include/pubsub_topology_manager.h
@@ -41,6 +41,9 @@
 struct pubsub_topology_manager {
 	bundle_context_pt context;
 
+	celix_thread_mutex_t serializerListLock;
+	array_list_pt serializerList;
+
 	celix_thread_mutex_t psaListLock;
 	array_list_pt psaList;
 
@@ -62,6 +65,11 @@ celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_help
 celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_pt manager);
 celix_status_t pubsub_topologyManager_closeImports(pubsub_topology_manager_pt manager);
 
+celix_status_t pubsub_topologyManager_pubsubSerializerAdding(void *handle, service_reference_pt reference, void **service);
+celix_status_t pubsub_topologyManager_pubsubSerializerAdded(void *handle, service_reference_pt reference, void *service);
+celix_status_t pubsub_topologyManager_pubsubSerializerModified(void *handle, service_reference_pt reference, void *service);
+celix_status_t pubsub_topologyManager_pubsubSerializerRemoved(void *handle, service_reference_pt reference, void *service);
+
 celix_status_t pubsub_topologyManager_psaAdding(void *handle, service_reference_pt reference, void **service);
 celix_status_t pubsub_topologyManager_psaAdded(void *handle, service_reference_pt reference, void *service);
 celix_status_t pubsub_topologyManager_psaModified(void *handle, service_reference_pt reference, void *service);

http://git-wip-us.apache.org/repos/asf/celix/blob/a030567d/pubsub/pubsub_topology_manager/private/src/pstm_activator.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_topology_manager/private/src/pstm_activator.c b/pubsub/pubsub_topology_manager/private/src/pstm_activator.c
index a35d161..c202e7a 100644
--- a/pubsub/pubsub_topology_manager/private/src/pstm_activator.c
+++ b/pubsub/pubsub_topology_manager/private/src/pstm_activator.c
@@ -48,6 +48,7 @@ struct activator {
 
 	pubsub_topology_manager_pt manager;
 
+	service_tracker_pt pubsubSerializerTracker;
 	service_tracker_pt pubsubDiscoveryTracker;
 	service_tracker_pt pubsubAdminTracker;
 	service_tracker_pt pubsubSubscribersTracker;
@@ -61,12 +62,31 @@ struct activator {
 	log_helper_pt loghelper;
 };
 
-
+static celix_status_t bundleActivator_createPSSTracker(struct activator *activator, service_tracker_pt *tracker);
 static celix_status_t bundleActivator_createPSDTracker(struct activator *activator, service_tracker_pt *tracker);
 static celix_status_t bundleActivator_createPSATracker(struct activator *activator, service_tracker_pt *tracker);
 static celix_status_t bundleActivator_createPSSubTracker(struct activator *activator, service_tracker_pt *tracker);
 
 
+static celix_status_t bundleActivator_createPSSTracker(struct activator *activator, service_tracker_pt *tracker) {
+	celix_status_t status;
+
+	service_tracker_customizer_pt customizer = NULL;
+
+	status = serviceTrackerCustomizer_create(activator->manager,
+			pubsub_topologyManager_pubsubSerializerAdding,
+			pubsub_topologyManager_pubsubSerializerAdded,
+			pubsub_topologyManager_pubsubSerializerModified,
+			pubsub_topologyManager_pubsubSerializerRemoved,
+			&customizer);
+
+	if (status == CELIX_SUCCESS) {
+		status = serviceTracker_create(activator->context, (char *) PUBSUB_SERIALIZER_SERVICE, customizer, tracker);
+	}
+
+	return status;
+}
+
 static celix_status_t bundleActivator_createPSDTracker(struct activator *activator, service_tracker_pt *tracker) {
 	celix_status_t status;
 
@@ -143,14 +163,20 @@ celix_status_t bundleActivator_create(bundle_context_pt context, void **userData
 	if (status == CELIX_SUCCESS) {
 		status = bundleActivator_createPSDTracker(activator, &activator->pubsubDiscoveryTracker);
 		if (status == CELIX_SUCCESS) {
-			status = bundleActivator_createPSATracker(activator, &activator->pubsubAdminTracker);
-			if (status == CELIX_SUCCESS) {
-				status = bundleActivator_createPSSubTracker(activator, &activator->pubsubSubscribersTracker);
+			status = bundleActivator_createPSSTracker(activator, &activator->pubsubSerializerTracker);
+			if (status == CELIX_SUCCESS){
+				status = bundleActivator_createPSATracker(activator, &activator->pubsubAdminTracker);
 				if (status == CELIX_SUCCESS) {
-					*userData = activator;
+					status = bundleActivator_createPSSubTracker(activator, &activator->pubsubSubscribersTracker);
+					if (status == CELIX_SUCCESS) {
+						*userData = activator;
+					}
+					if (status != CELIX_SUCCESS){
+						serviceTracker_destroy(activator->pubsubAdminTracker);
+					}
 				}
 				if (status != CELIX_SUCCESS){
-					serviceTracker_destroy(activator->pubsubAdminTracker);
+					serviceTracker_destroy(activator->pubsubSerializerTracker);
 				}
 			}
 			if (status != CELIX_SUCCESS){
@@ -198,13 +224,12 @@ celix_status_t bundleActivator_start(void * userData, bundle_context_pt context)
 	properties_set(props, (char *) OSGI_RSA_SERVICE_EXPORTED_INTERFACES, (char *) PUBSUB_TOPIC_INFO_SERVICE);
 	status += bundleContext_registerService(context, (char *) PUBSUB_TOPIC_INFO_SERVICE, activator->topicInfo, props, &activator->topicInfoService);
 	*/
-	status += serviceTracker_open(activator->pubsubAdminTracker);
 
+	status += serviceTracker_open(activator->pubsubSerializerTracker);
+	status += serviceTracker_open(activator->pubsubAdminTracker);
 	status += serviceTracker_open(activator->pubsubDiscoveryTracker);
-
 	status += serviceTracker_open(activator->pubsubSubscribersTracker);
 
-
 	return status;
 }
 
@@ -214,6 +239,7 @@ celix_status_t bundleActivator_stop(void * userData, bundle_context_pt context)
 
 	serviceTracker_close(activator->pubsubSubscribersTracker);
 	serviceTracker_close(activator->pubsubDiscoveryTracker);
+	serviceTracker_close(activator->pubsubSerializerTracker);
 	serviceTracker_close(activator->pubsubAdminTracker);
 
 	serviceRegistration_unregister(activator->publisherEPDiscoverService);
@@ -235,6 +261,7 @@ celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt contex
 
 		serviceTracker_destroy(activator->pubsubSubscribersTracker);
 		serviceTracker_destroy(activator->pubsubDiscoveryTracker);
+		serviceTracker_destroy(activator->pubsubSerializerTracker);
 		serviceTracker_destroy(activator->pubsubAdminTracker);
 
 		logHelper_stop(activator->loghelper);


Mime
View raw message