celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From griccia...@apache.org
Subject [04/11] celix git commit: Refactored serializers management
Date Fri, 29 Sep 2017 13:34:20 GMT
http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c b/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
index bed5dfc..b85f0a9 100644
--- a/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
+++ b/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
@@ -7,7 +7,7 @@
  *"License"); you may not use this file except in compliance
  *with the License.  You may obtain a copy of the License at
  *
- *  http://www.apache.org/licenses/LICENSE-2.0
+ *  htPSA_UDP_MC_TP://www.apache.org/licenses/LICENSE-2.0
  *
  *Unless required by applicable law or agreed to in writing,
  *software distributed under the License is distributed on an
@@ -41,11 +41,13 @@
 #include "service_factory.h"
 #include "version.h"
 
-#include "pubsub_publish_service_private.h"
+#include "topic_publication.h"
 #include "pubsub_common.h"
 #include "publisher.h"
 #include "large_udp.h"
 
+#include "pubsub_serializer.h"
+
 #define EP_ADDRESS_LEN		32
 
 #define FIRST_SEND_DELAY	2
@@ -57,28 +59,26 @@ struct topic_publication {
 	array_list_pt pub_ep_list; //List<pubsub_endpoint>
 	hash_map_pt boundServices; //<bundle_pt,bound_service>
 	celix_thread_mutex_t tp_lock;
+	pubsub_serializer_service_t *serializer;
 	struct sockaddr_in destAddr;
-	pubsub_serializer_service_t* serializerSvc;
 };
 
 typedef struct publish_bundle_bound_service {
 	topic_publication_pt parent;
-	pubsub_publisher_t pubSvc;
+	pubsub_publisher_t service;
 	bundle_pt bundle;
-    char *scope;
+	char *scope;
 	char *topic;
+	hash_map_pt msgTypes;
 	unsigned short getCount;
 	celix_thread_mutex_t mp_lock;
-	bool mp_send_in_progress;
-	array_list_pt mp_parts;
 	largeUdp_pt largeUdpHandle;
-	pubsub_msg_serializer_map_t* map;
-} publish_bundle_bound_service_t;
+}* publish_bundle_bound_service_pt;
 
 typedef struct pubsub_msg{
 	pubsub_msg_header_pt header;
 	char* payload;
-	int payloadSize;
+	size_t payloadSize;
 } pubsub_msg_t;
 
 static unsigned int rand_range(unsigned int min, unsigned int max);
@@ -86,10 +86,10 @@ static unsigned int rand_range(unsigned int min, unsigned int max);
 static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service);
 static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service);
 
-static publish_bundle_bound_service_t* pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle);
-static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_t* boundSvc);
+static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle);
+static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc);
 
-static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *msg);
+static int pubsub_topicPublicationSend(void* handle,unsigned int msgTypeId, const void *msg);
 
 static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* msgTypeId);
 
@@ -97,12 +97,12 @@ static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsig
 static void delay_first_send_for_late_joiners(void);
 
 
-celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, char* bindIP, topic_publication_pt *out){
+celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, char* bindIP, topic_publication_pt *out){
 
-    char* ep = malloc(EP_ADDRESS_LEN);
-    memset(ep,0,EP_ADDRESS_LEN);
-    unsigned int port = pubEP->serviceID + rand_range(UDP_BASE_PORT+pubEP->serviceID+3, UDP_MAX_PORT);
-    snprintf(ep,EP_ADDRESS_LEN,"udp://%s:%u",bindIP,port);
+	char* ep = malloc(EP_ADDRESS_LEN);
+	memset(ep,0,EP_ADDRESS_LEN);
+	unsigned int port = pubEP->serviceID + rand_range(UDP_BASE_PORT+pubEP->serviceID+3, UDP_MAX_PORT);
+	snprintf(ep,EP_ADDRESS_LEN,"udp://%s:%u",bindIP,port);
 
 
 	topic_publication_pt pub = calloc(1,sizeof(*pub));
@@ -116,7 +116,8 @@ celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt
 	pub->destAddr.sin_family = AF_INET;
 	pub->destAddr.sin_addr.s_addr = inet_addr(bindIP);
 	pub->destAddr.sin_port = htons(port);
-	pub->serializerSvc = NULL;
+
+	pub->serializer = best_serializer;
 
 	pubsub_topicPublicationAddPublisherEP(pub,pubEP);
 
@@ -127,6 +128,7 @@ celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt
 
 celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub){
 	celix_status_t status = CELIX_SUCCESS;
+
 	celixThreadMutex_lock(&(pub->tp_lock));
 
 	free(pub->endpoint);
@@ -134,14 +136,18 @@ celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub){
 
 	hash_map_iterator_pt iter = hashMapIterator_create(pub->boundServices);
 	while(hashMapIterator_hasNext(iter)){
-		publish_bundle_bound_service_t* bound = hashMapIterator_nextValue(iter);
+		publish_bundle_bound_service_pt bound = hashMapIterator_nextValue(iter);
 		pubsub_destroyPublishBundleBoundService(bound);
 	}
 	hashMapIterator_destroy(iter);
 	hashMap_destroy(pub->boundServices,false,false);
 
 	pub->svcFactoryReg = NULL;
-	status = close(pub->sendSocket);
+	pub->serializer = NULL;
+
+	if(close(pub->sendSocket) != 0){
+		status = CELIX_FILE_IO_EXCEPTION;
+	}
 
 	celixThreadMutex_unlock(&(pub->tp_lock));
 
@@ -156,7 +162,6 @@ celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,top
 	celix_status_t status = CELIX_SUCCESS;
 
 	/* Let's register the new service */
-	//celixThreadMutex_lock(&(pub->tp_lock));
 
 	pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pub->pub_ep_list,0);
 
@@ -167,39 +172,29 @@ celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,top
 		factory->ungetService = pubsub_topicPublicationUngetService;
 
 		properties_pt props = properties_create();
-        properties_set(props,PUBSUB_PUBLISHER_SCOPE,pubEP->scope);
+		properties_set(props,PUBSUB_PUBLISHER_SCOPE,pubEP->scope);
 		properties_set(props,PUBSUB_PUBLISHER_TOPIC,pubEP->topic);
 
 		status = bundleContext_registerServiceFactory(bundle_context,PUBSUB_PUBLISHER_SERVICE_NAME,factory,props,&(pub->svcFactoryReg));
 
 		if(status != CELIX_SUCCESS){
 			properties_destroy(props);
-			printf("PSA: Cannot register ServiceFactory for topic %s, topic %s (bundle %ld).\n",pubEP->scope, pubEP->topic,pubEP->serviceID);
+			printf("PSA_UDP_MC_PSA_UDP_MC_TP: Cannot register ServiceFactory for topic %s, topic %s (bundle %ld).\n",pubEP->scope, pubEP->topic,pubEP->serviceID);
 		}
 		else{
 			*svcFactory = factory;
 		}
 	}
 	else{
-		printf("PSA: Cannot find pubsub_endpoint after adding it...Should never happen!\n");
+		printf("PSA_UDP_MC_PSA_UDP_MC_TP: Cannot find pubsub_endpoint after adding it...Should never happen!\n");
 		status = CELIX_SERVICE_EXCEPTION;
 	}
 
-	//celixThreadMutex_unlock(&(pub->tp_lock));
-
 	return status;
 }
 
 celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub){
-	celix_status_t status = CELIX_SUCCESS;
-
-	//celixThreadMutex_lock(&(pub->tp_lock));
-
-	status = serviceRegistration_unregister(pub->svcFactoryReg);
-
-	//celixThreadMutex_unlock(&(pub->tp_lock));
-
-	return status;
+	return serviceRegistration_unregister(pub->svcFactoryReg);
 }
 
 celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep){
@@ -221,66 +216,12 @@ celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub
 	return CELIX_SUCCESS;
 }
 
-celix_status_t pubsub_topicPublicationSetSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc){
-	celix_status_t status = CELIX_SUCCESS;
-
+array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt pub){
+	array_list_pt list = NULL;
 	celixThreadMutex_lock(&(pub->tp_lock));
-
-    //clear old serializer
-    if (pub->serializerSvc != NULL) {
-        hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices); //key = bundle_pt, publish_bundle_bound_service_t*
-        while (hashMapIterator_hasNext(&iter)) {
-            publish_bundle_bound_service_t* bound = hashMapIterator_nextValue(&iter);
-            celixThreadMutex_lock(&bound->mp_lock);
-			pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, bound->map);
-            celixThreadMutex_unlock(&bound->mp_lock);
-			bound->map = NULL;
-        }
-    }
-
-    //setup new serializer
-	pub->serializerSvc = serializerSvc;
-    if (pub->serializerSvc != NULL) {
-        hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices);
-        while (hashMapIterator_hasNext(&iter)) {
-            hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
-            bundle_pt bundle = hashMapEntry_getKey(entry);
-            publish_bundle_bound_service_t *bound = hashMapEntry_getValue(entry);
-            celixThreadMutex_lock(&bound->mp_lock);
-            pub->serializerSvc->createSerializerMap(pub->serializerSvc->handle, bundle, &bound->map);
-            celixThreadMutex_unlock(&bound->mp_lock);
-        }
-    }
-
+	list = arrayList_clone(pub->pub_ep_list);
 	celixThreadMutex_unlock(&(pub->tp_lock));
-
-	return status;
-}
-
-celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_t* svc){
-	celix_status_t status = CELIX_SUCCESS;
-
-	celixThreadMutex_lock(&(pub->tp_lock));
-    if (pub->serializerSvc == svc) {
-        hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices);
-        while (hashMapIterator_hasNext(&iter)) {
-            publish_bundle_bound_service_t* bound = hashMapIterator_nextValue(&iter);
-            celixThreadMutex_lock(&bound->mp_lock);
-            pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, bound->map);
-            celixThreadMutex_unlock(&bound->mp_lock);
-			bound->map = NULL;
-        }
-
-        pub->serializerSvc = NULL;
-    }
-
-	celixThreadMutex_unlock(&(pub->tp_lock));
-
-	return status;
-}
-
-array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt pub){
-	return pub->pub_ep_list;
+	return list;
 }
 
 
@@ -291,19 +232,19 @@ static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt
 
 	celixThreadMutex_lock(&(publish->tp_lock));
 
-	publish_bundle_bound_service_t* bound = hashMap_get(publish->boundServices, bundle);
-	if (bound == NULL) {
-		bound = pubsub_createPublishBundleBoundService(publish, bundle);
-		if (bound != NULL) {
-			hashMap_put(publish->boundServices, bundle, bound);
+	publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle);
+	if(bound==NULL){
+		bound = pubsub_createPublishBundleBoundService(publish,bundle);
+		if(bound!=NULL){
+			hashMap_put(publish->boundServices,bundle,bound);
 		}
 	}
-	else {
+	else{
 		bound->getCount++;
 	}
 
 	if (bound != NULL) {
-		*service = &bound->pubSvc;
+		*service = &bound->service;
 	}
 
 	celixThreadMutex_unlock(&(publish->tp_lock));
@@ -317,19 +258,20 @@ static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_p
 
 	celixThreadMutex_lock(&(publish->tp_lock));
 
-	publish_bundle_bound_service_t* bound = hashMap_get(publish->boundServices, bundle);
-	if (bound != NULL) {
+	publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle);
+	if(bound!=NULL){
 
 		bound->getCount--;
-		if (bound->getCount == 0) {
+		if(bound->getCount==0){
 			pubsub_destroyPublishBundleBoundService(bound);
 			hashMap_remove(publish->boundServices,bundle);
 		}
+
 	}
-	else {
+	else{
 		long bundleId = -1;
 		bundle_getBundleId(bundle,&bundleId);
-		printf("TP: Unexpected ungetService call for bundle %ld.\n", bundleId);
+		printf("PSA_UDP_MC_TP: Unexpected ungetService call for bundle %ld.\n", bundleId);
 	}
 
 	/* service should be never used for unget, so let's set the pointer to NULL */
@@ -340,7 +282,7 @@ static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_p
 	return CELIX_SUCCESS;
 }
 
-static bool send_pubsub_msg(publish_bundle_bound_service_t* bound, pubsub_msg_t* msg, bool last, pubsub_release_callback_t *releaseCallback){
+static bool send_pubsub_msg(publish_bundle_bound_service_pt bound, pubsub_msg_t* msg, bool last, pubsub_release_callback_t *releaseCallback){
 	const int iovec_len = 3; // header + size + payload
 	bool ret = true;
 
@@ -357,50 +299,36 @@ static bool send_pubsub_msg(publish_bundle_bound_service_t* bound, pubsub_msg_t*
 	delay_first_send_for_late_joiners();
 
 	if(largeUdp_sendmsg(bound->largeUdpHandle, bound->parent->sendSocket, msg_iovec, iovec_len, 0, &bound->parent->destAddr, sizeof(bound->parent->destAddr)) == -1) {
-	    fprintf(stderr, "Socket: %d, size: %i",bound->parent->sendSocket, compiledMsgSize);
-	    perror("send_pubsub_msg:sendSocket");
-	    ret = false;
+		fprintf(stderr, "Socket: %d, size: %i",bound->parent->sendSocket, compiledMsgSize);
+		perror("send_pubsub_msg:sendSocket");
+		ret = false;
 	}
 
 	if(releaseCallback) {
-	    releaseCallback->release(msg->payload, bound);
+		releaseCallback->release(msg->payload, bound);
 	}
 	return ret;
-}
 
+}
 
-static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *msg) {
-    int status = 0;
-    publish_bundle_bound_service_t* bound = handle;
 
-    celixThreadMutex_lock(&(bound->parent->tp_lock));
-    celixThreadMutex_lock(&(bound->mp_lock));
+static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg) {
+	int status = 0;
+	publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt) handle;
 
-    pubsub_msg_serializer_t *msgSer = NULL;
-    if (bound->map != NULL) {
-        msgSer = hashMap_get(bound->map->serializers, (void *)(uintptr_t)msgTypeId);
-    }
+	celixThreadMutex_lock(&(bound->parent->tp_lock));
+	celixThreadMutex_lock(&(bound->mp_lock));
 
-    if (bound->map == NULL) {
-        printf("TP: Serializer is not set!\n");
-        status = 1;
-    } else if (msgSer == NULL ){
-        printf("TP: No msg serializer available for msg type id %d\n", msgTypeId);
-        hash_map_iterator_t iter = hashMapIterator_construct(bound->map->serializers);
-        printf("Note supported messages:\n");
-        while (hashMapIterator_hasNext(&iter)) {
-            pubsub_msg_serializer_t *msgSer = hashMapIterator_nextValue(&iter);
-            printf("\tmsg %s with id %d\n", msgSer->msgName, msgSer->msgId);
-        }
-        status = 1;
-    }
+	pubsub_msg_serializer_t* msgSer = (pubsub_msg_serializer_t*)hashMap_get(bound->msgTypes, (void*)(uintptr_t)msgTypeId);
 
-    int major=0, minor=0;
+	if (msgSer != NULL) {
+		int major=0, minor=0;
 
-    if (status == 0 && msgSer != NULL) {
 		pubsub_msg_header_pt msg_hdr = calloc(1,sizeof(struct pubsub_msg_header));
 		strncpy(msg_hdr->topic,bound->topic,MAX_TOPIC_LEN-1);
 		msg_hdr->type = msgTypeId;
+
+
 		if (msgSer->msgVersion != NULL){
 			version_getMajor(msgSer->msgVersion, &major);
 			version_getMinor(msgSer->msgVersion, &minor);
@@ -408,15 +336,16 @@ static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, con
 			msg_hdr->minor = minor;
 		}
 
-		char* serializedOutput = NULL;
+		void* serializedOutput = NULL;
 		size_t serializedOutputLen = 0;
-        msgSer->serialize(msgSer->handle, msg, &serializedOutput, &serializedOutputLen);
+		msgSer->serialize(msgSer,inMsg,&serializedOutput, &serializedOutputLen);
 
-		pubsub_msg_t* msg = calloc(1,sizeof(struct pubsub_msg));
+		pubsub_msg_t *msg = calloc(1,sizeof(pubsub_msg_t));
 		msg->header = msg_hdr;
-		msg->payload = serializedOutput;
+		msg->payload = (char *)serializedOutput;
 		msg->payloadSize = serializedOutputLen;
 
+
 		if(send_pubsub_msg(bound, msg,true, NULL) == false) {
 			status = -1;
 		}
@@ -424,38 +353,21 @@ static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, con
 		free(msg);
 		free(serializedOutput);
 
-    }
 
-    celixThreadMutex_unlock(&(bound->mp_lock));
+	} else {
+		printf("PSA_UDP_MC_TP: No msg serializer available for msg type id %d\n", msgTypeId);
+		status=-1;
+	}
+
+	celixThreadMutex_unlock(&(bound->mp_lock));
 	celixThreadMutex_unlock(&(bound->parent->tp_lock));
 
-    return status;
+	return status;
 }
 
-static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* out){
-    publish_bundle_bound_service_t* bound = handle;
-    unsigned int msgTypeId = 0;
-
-    celixThreadMutex_lock(&bound->mp_lock);
-    if (bound->map != NULL) {
-        hash_map_iterator_t iter = hashMapIterator_construct(bound->map->serializers);
-        while (hashMapIterator_hasNext(&iter)) {
-            pubsub_msg_serializer_t* msgSer = hashMapIterator_nextValue(&iter);
-            if (strncmp(msgType, msgSer->msgName, 1024*1024) == 0) {
-                msgTypeId = msgSer->msgId;
-                break;
-            }
-        }
-    }
-    celixThreadMutex_unlock(&bound->mp_lock);
-
-    if (msgTypeId != 0) {
-        *out = msgTypeId;
-        return 0;
-    } else {
-        printf("TP: Cannot find msg type id for msg type %s\n", msgType);
-        return 1;
-    }
+static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* msgTypeId){
+	*msgTypeId = utils_stringHash(msgType);
+	return 0;
 }
 
 
@@ -466,58 +378,49 @@ static unsigned int rand_range(unsigned int min, unsigned int max){
 
 }
 
-static publish_bundle_bound_service_t* pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle) {
-    //PRECOND lock on publish->tp_lock
-    publish_bundle_bound_service_t* bound = calloc(1, sizeof(*bound));
+static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle){
+
+	publish_bundle_bound_service_pt bound = calloc(1, sizeof(*bound));
 
 	if (bound != NULL) {
+
 		bound->parent = tp;
 		bound->bundle = bundle;
 		bound->getCount = 1;
-		bound->mp_send_in_progress = false;
 		celixThreadMutex_create(&bound->mp_lock,NULL);
-		arrayList_create(&bound->mp_parts);
+
+		if(tp->serializer != NULL){
+			tp->serializer->createSerializerMap(tp->serializer->handle,bundle,&bound->msgTypes);
+		}
 
 		pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(bound->parent->pub_ep_list,0);
 		bound->scope=strdup(pubEP->scope);
 		bound->topic=strdup(pubEP->topic);
 		bound->largeUdpHandle = largeUdp_create(1);
-		bound->pubSvc.handle = bound;
-		bound->pubSvc.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForUUID;
-		bound->pubSvc.send = pubsub_topicPublicationSend;
-		bound->pubSvc.sendMultipart = NULL;  //Multipart not supported (jet) for UDP
 
-    	if (tp->serializerSvc != NULL) {
-            tp->serializerSvc->createSerializerMap(tp->serializerSvc->handle, bundle, &bound->map);
-		}
-	}
-	else
-	{
-		free(bound);
-		return NULL;
+		bound->service.handle = bound;
+		bound->service.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForUUID;
+		bound->service.send = pubsub_topicPublicationSend;
+		bound->service.sendMultipart = NULL;  //Multipart not supported for UDP
+
 	}
 
 	return bound;
 }
 
-static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_t* boundSvc) {
-    //PRECOND lock on publish->tp_lock
-	celixThreadMutex_lock(&boundSvc->mp_lock);
+static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc){
 
-    if (boundSvc->map != NULL && boundSvc->parent->serializerSvc != NULL) {
-    	boundSvc->parent->serializerSvc->destroySerializerMap(boundSvc->parent->serializerSvc->handle, boundSvc->map);
-    	boundSvc->map = NULL;
-    }
+	celixThreadMutex_lock(&boundSvc->mp_lock);
 
-	if (boundSvc->mp_parts!=NULL) {
-		arrayList_destroy(boundSvc->mp_parts);
+	if(boundSvc->parent->serializer != NULL && boundSvc->msgTypes != NULL){
+		boundSvc->parent->serializer->destroySerializerMap(boundSvc->parent->serializer->handle, boundSvc->msgTypes);
 	}
 
-    if (boundSvc->scope!=NULL) {
-        free(boundSvc->scope);
-    }
+	if(boundSvc->scope!=NULL){
+		free(boundSvc->scope);
+	}
 
-    if (boundSvc->topic!=NULL) {
+	if(boundSvc->topic!=NULL){
 		free(boundSvc->topic);
 	}
 
@@ -535,7 +438,7 @@ static void delay_first_send_for_late_joiners(){
 	static bool firstSend = true;
 
 	if(firstSend){
-		printf("TP: Delaying first send for late joiners...\n");
+		printf("PSA_UDP_MC_TP: Delaying first send for late joiners...\n");
 		sleep(FIRST_SEND_DELAY);
 		firstSend = false;
 	}

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c b/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
index 7a3f5a9..5896264 100644
--- a/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
+++ b/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
@@ -29,35 +29,31 @@
 #include <unistd.h>
 #include <signal.h>
 
+#include <sys/types.h>
 #include <sys/socket.h>
+#include <sys/epoll.h>
 #include <netinet/in.h>
 #include <arpa/inet.h>
 
-#if defined(__APPLE__) && defined(__MACH__)
-	#include <sys/event.h>
-	#include <sys/time.h>
-#else
-	#include <sys/epoll.h>
-#endif
-
 #include "utils.h"
 #include "celix_errno.h"
 #include "constants.h"
 #include "version.h"
 
 #include "topic_subscription.h"
+#include "topic_publication.h"
 #include "subscriber.h"
 #include "publisher.h"
-#include "pubsub_publish_service_private.h"
 #include "large_udp.h"
 
+#include "pubsub_serializer.h"
+
 #define MAX_EPOLL_EVENTS        10
 #define RECV_THREAD_TIMEOUT     5
 #define UDP_BUFFER_SIZE         65535
 #define MAX_UDP_SESSIONS        16
 
-struct topic_subscription {
-
+struct topic_subscription{
 	char* ifIpAddress;
 	service_tracker_pt tracker;
 	array_list_pt sub_ep_list;
@@ -65,25 +61,24 @@ struct topic_subscription {
 	bool running;
 	celix_thread_mutex_t ts_lock;
 	bundle_context_pt context;
-	int topicEpollFd; // EPOLL filedescriptor where the sockets are registered.
 
-    //NOTE. using a service ptr can be dangerous, because pointer can be reused.
-    //ensuring that pointer are removed before new (refurbish) pionter comes along is crucial!
-	hash_map_pt msgSerializerMapMap; // key = service ptr, value = pubsub_msg_serializer_map_t*
-    hash_map_pt bundleMap; //key = service ptr, value = bundle_pt
+	pubsub_serializer_service_t *serializer;
 
+	int topicEpollFd; // EPOLL filedescriptor where the sockets are registered.
+	hash_map_pt servicesMap; // key = service, value = msg types map
 	hash_map_pt socketMap; // key = URL, value = listen-socket
+
+	celix_thread_mutex_t pendingConnections_lock;
+	array_list_pt pendingConnections;
+
+	array_list_pt pendingDisconnections;
+	celix_thread_mutex_t pendingDisconnections_lock;
+
+	//array_list_pt rawServices;
 	unsigned int nrSubscribers;
 	largeUdp_pt largeUdpHandle;
-	pubsub_serializer_service_t* serializerSvc;
-
 };
 
-typedef struct mp_handle{
-	hash_map_pt svc_msg_db;
-	hash_map_pt rcv_msg_map;
-}* mp_handle_pt;
-
 typedef struct msg_map_entry{
 	bool retain;
 	void* msgInst;
@@ -95,9 +90,11 @@ static void* udp_recv_thread_func(void* arg);
 static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr);
 static void sigusr1_sighandler(int signo);
 static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId);
+static void connectPendingPublishers(topic_subscription_pt sub);
+static void disconnectPendingPublishers(topic_subscription_pt sub);
 
 
-celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt bundle_context, pubsub_serializer_service_t* serializer, char* scope, char* topic,topic_subscription_pt* out){
+celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, char* ifIp,char* scope, char* topic ,pubsub_serializer_service_t *best_serializer, topic_subscription_pt* out){
 	celix_status_t status = CELIX_SUCCESS;
 
 	topic_subscription_pt ts = (topic_subscription_pt) calloc(1,sizeof(*ts));
@@ -109,35 +106,39 @@ celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt bundl
 	ts->topicEpollFd = epoll_create1(0);
 #endif
 	if(ts->topicEpollFd == -1) {
-	    status += CELIX_SERVICE_EXCEPTION;
+		status += CELIX_SERVICE_EXCEPTION;
 	}
 
 	ts->running = false;
 	ts->nrSubscribers = 0;
-	ts->serializerSvc = NULL;
+	ts->serializer = best_serializer;
 
 	celixThreadMutex_create(&ts->ts_lock,NULL);
 	arrayList_create(&ts->sub_ep_list);
-	ts->msgSerializerMapMap = hashMap_create(NULL, NULL, NULL, NULL);
-    ts->bundleMap = hashMap_create(NULL, NULL, NULL, NULL);
+	ts->servicesMap = hashMap_create(NULL, NULL, NULL, NULL);
 	ts->socketMap =  hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 
+	arrayList_create(&ts->pendingConnections);
+	arrayList_create(&ts->pendingDisconnections);
+	celixThreadMutex_create(&ts->pendingConnections_lock, NULL);
+	celixThreadMutex_create(&ts->pendingDisconnections_lock, NULL);
+
 	ts->largeUdpHandle = largeUdp_create(MAX_UDP_SESSIONS);
 
 	char filter[128];
 	memset(filter,0,128);
 	if(strncmp(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, scope, strlen(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT)) == 0) {
-        // default scope, means that subscriber has not defined a scope property
-        snprintf(filter, 128, "(&(%s=%s)(%s=%s))",
-                (char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_SUBSCRIBER_SERVICE_NAME,
-                PUBSUB_SUBSCRIBER_TOPIC,topic);
-
-    } else {
-        snprintf(filter, 128, "(&(%s=%s)(%s=%s)(%s=%s))",
-                (char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_SUBSCRIBER_SERVICE_NAME,
-                PUBSUB_SUBSCRIBER_TOPIC,topic,
-                PUBSUB_SUBSCRIBER_SCOPE,scope);
-    }
+		// default scope, means that subscriber has not defined a scope property
+		snprintf(filter, 128, "(&(%s=%s)(%s=%s))",
+				(char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_SUBSCRIBER_SERVICE_NAME,
+				PUBSUB_SUBSCRIBER_TOPIC,topic);
+
+	} else {
+		snprintf(filter, 128, "(&(%s=%s)(%s=%s)(%s=%s))",
+				(char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_SUBSCRIBER_SERVICE_NAME,
+				PUBSUB_SUBSCRIBER_TOPIC,topic,
+				PUBSUB_SUBSCRIBER_SCOPE,scope);
+	}
 
 	service_tracker_customizer_pt customizer = NULL;
 	status += serviceTrackerCustomizer_create(ts,NULL,topicsub_subscriberTracked,NULL,topicsub_subscriberUntracked,&customizer);
@@ -151,10 +152,9 @@ celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt bundl
 
 	sigaction(SIGUSR1,&actions,NULL);
 
-    if (status == CELIX_SUCCESS) {
-        *out=ts;
-        pubsub_topicSubscriptionSetSerializer(ts, serializer);
-    }
+	if (status == CELIX_SUCCESS) {
+		*out=ts;
+	}
 
 	return status;
 }
@@ -168,10 +168,20 @@ celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){
 	serviceTracker_destroy(ts->tracker);
 	arrayList_clear(ts->sub_ep_list);
 	arrayList_destroy(ts->sub_ep_list);
-	hashMap_destroy(ts->msgSerializerMapMap, false, false);
-    hashMap_destroy(ts->bundleMap, false, false);
+	hashMap_destroy(ts->servicesMap,false,false);
+
+	hashMap_destroy(ts->socketMap,true,true);
+
+	celixThreadMutex_lock(&ts->pendingConnections_lock);
+	arrayList_destroy(ts->pendingConnections);
+	celixThreadMutex_unlock(&ts->pendingConnections_lock);
+	celixThreadMutex_destroy(&ts->pendingConnections_lock);
+
+	celixThreadMutex_lock(&ts->pendingDisconnections_lock);
+	arrayList_destroy(ts->pendingDisconnections);
+	celixThreadMutex_unlock(&ts->pendingDisconnections_lock);
+	celixThreadMutex_destroy(&ts->pendingDisconnections_lock);
 
-	hashMap_destroy(ts->socketMap,false,false);
 	largeUdp_destroy(ts->largeUdpHandle);
 #if defined(__APPLE__) && defined(__MACH__)
 	//TODO: Use kqueue for OSX
@@ -211,15 +221,16 @@ celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts){
 
 	celixThread_join(ts->recv_thread,NULL);
 
-    status = serviceTracker_close(ts->tracker);
+	status = serviceTracker_close(ts->tracker);
 
-    hash_map_iterator_pt it = hashMapIterator_create(ts->socketMap);
-    while(hashMapIterator_hasNext(it)) {
-        hash_map_entry_pt entry = hashMapIterator_nextEntry(it);
-        char *url = hashMapEntry_getKey(entry);
-        pubsub_topicSubscriptionDisconnectPublisher(ts, url);
-    }
-    hashMapIterator_destroy(it);
+	hash_map_iterator_pt it = hashMapIterator_create(ts->socketMap);
+	while(hashMapIterator_hasNext(it)) {
+		hash_map_entry_pt entry = hashMapIterator_nextEntry(it);
+		char *url = hashMapEntry_getKey(entry);
+		pubsub_topicSubscriptionDisconnectPublisher(ts, url);
+		free(url);
+	}
+	hashMapIterator_destroy(it);
 
 
 	return status;
@@ -227,108 +238,126 @@ celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts){
 
 celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts, char* pubURL) {
 
-    printf("pubsub_topicSubscriptionConnectPublisher : pubURL = %s\n", pubURL);
+	printf("pubsub_topicSubscriptionConnectPublisher : pubURL = %s\n", pubURL);
 
-    celix_status_t status = CELIX_SUCCESS;
+	celix_status_t status = CELIX_SUCCESS;
+	celixThreadMutex_lock(&ts->ts_lock);
 
-    if (!hashMap_containsKey(ts->socketMap, pubURL)){
+	if(hashMap_containsKey(ts->socketMap, pubURL)){
+		printf("PSA_UDM_MC_TS: PubURL %s already existing!\n",pubURL);
+		celixThreadMutex_unlock(&ts->ts_lock);
+		return CELIX_SERVICE_EXCEPTION;
+	}
 
-		celixThreadMutex_lock(&ts->ts_lock);
+	int *recvSocket = calloc(sizeof(int), 1);
+	*recvSocket = socket(AF_INET, SOCK_DGRAM, 0);
+	if (*recvSocket < 0) {
+		perror("pubsub_topicSubscriptionCreate:socket");
+		status = CELIX_SERVICE_EXCEPTION;
+	}
 
-		int *recvSocket = calloc(sizeof(int), 1);
-		*recvSocket = socket(AF_INET, SOCK_DGRAM, 0);
-		if (*recvSocket < 0) {
-			perror("pubsub_topicSubscriptionCreate:socket");
+	if (status == CELIX_SUCCESS){
+		int reuse = 1;
+		if (setsockopt(*recvSocket, SOL_SOCKET, SO_REUSEADDR, (char*) &reuse, sizeof(reuse)) != 0) {
+			perror("setsockopt() SO_REUSEADDR");
+			status = CELIX_SERVICE_EXCEPTION;
+		}
+	}
+
+	if(status == CELIX_SUCCESS){
+		// TODO Check if there is a better way to parse the URL to IP/Portnr
+		//replace ':' by spaces
+		char *url = strdup(pubURL);
+		char *pt = url;
+		while((pt=strchr(pt, ':')) != NULL) {
+			*pt = ' ';
+		}
+		char mcIp[100];
+		unsigned short mcPort;
+		sscanf(url, "udp //%s %hu", mcIp, &mcPort);
+		free (url);
+
+		printf("pubsub_topicSubscriptionConnectPublisher : IP = %s, Port = %hu\n", mcIp, mcPort);
+
+		struct ip_mreq mc_addr;
+		mc_addr.imr_multiaddr.s_addr = inet_addr(mcIp);
+		mc_addr.imr_interface.s_addr = inet_addr(ts->ifIpAddress);
+		printf("Adding MC %s at interface %s\n", mcIp, ts->ifIpAddress);
+		if (setsockopt(*recvSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char*) &mc_addr, sizeof(mc_addr)) != 0) {
+			perror("setsockopt() IP_ADD_MEMBERSHIP");
 			status = CELIX_SERVICE_EXCEPTION;
 		}
 
 		if (status == CELIX_SUCCESS){
-			int reuse = 1;
-			if (setsockopt(*recvSocket, SOL_SOCKET, SO_REUSEADDR, (char*) &reuse, sizeof(reuse)) != 0) {
-				perror("setsockopt() SO_REUSEADDR");
+			struct sockaddr_in mcListenAddr;
+			mcListenAddr.sin_family = AF_INET;
+			mcListenAddr.sin_addr.s_addr = INADDR_ANY;
+			mcListenAddr.sin_port = htons(mcPort);
+			if(bind(*recvSocket, (struct sockaddr*)&mcListenAddr, sizeof(mcListenAddr)) != 0) {
+				perror("bind()");
 				status = CELIX_SERVICE_EXCEPTION;
 			}
 		}
 
 		if (status == CELIX_SUCCESS){
-			// TODO Check if there is a better way to parse the URL to IP/Portnr
-			//replace ':' by spaces
-			char *url = strdup(pubURL);
-			char *pt = url;
-			while((pt=strchr(pt, ':')) != NULL) {
-				*pt = ' ';
-			}
-			char mcIp[100];
-			unsigned short mcPort;
-			sscanf(url, "udp //%s %hu", mcIp, &mcPort);
-			free (url);
-
-			printf("pubsub_topicSubscriptionConnectPublisher : IP = %s, Port = %hu\n", mcIp, mcPort);
-
-			struct ip_mreq mc_addr;
-			mc_addr.imr_multiaddr.s_addr = inet_addr(mcIp);
-			mc_addr.imr_interface.s_addr = inet_addr(ts->ifIpAddress);
-			printf("Adding MC %s at interface %s\n", mcIp, ts->ifIpAddress);
-			if (setsockopt(*recvSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char*) &mc_addr, sizeof(mc_addr)) != 0) {
-				perror("setsockopt() IP_ADD_MEMBERSHIP");
+#if defined(__APPLE__) && defined(__MACH__)
+			//TODO: Use kqueue for OSX
+#else
+			struct epoll_event ev;
+			memset(&ev, 0, sizeof(ev));
+			ev.events = EPOLLIN;
+			ev.data.fd = *recvSocket;
+			if(epoll_ctl(ts->topicEpollFd, EPOLL_CTL_ADD, *recvSocket, &ev) == -1) {
+				perror("epoll_ctl() EPOLL_CTL_ADD");
 				status = CELIX_SERVICE_EXCEPTION;
 			}
+#endif
+		}
 
-			if (status == CELIX_SUCCESS){
-				struct sockaddr_in mcListenAddr;
-				mcListenAddr.sin_family = AF_INET;
-				mcListenAddr.sin_addr.s_addr = INADDR_ANY;
-				mcListenAddr.sin_port = htons(mcPort);
-				if(bind(*recvSocket, (struct sockaddr*)&mcListenAddr, sizeof(mcListenAddr)) != 0) {
-					perror("bind()");
-					status = CELIX_SERVICE_EXCEPTION;
-				}
-			}
-
-			if (status == CELIX_SUCCESS){
-				#if defined(__APPLE__) && defined(__MACH__)
-					//TODO: Use kqueue for OSX
-				#else
-					struct epoll_event ev;
-					memset(&ev, 0, sizeof(ev));
-					ev.events = EPOLLIN;
-					ev.data.fd = *recvSocket;
-					if(epoll_ctl(ts->topicEpollFd, EPOLL_CTL_ADD, *recvSocket, &ev) == -1) {
-						perror("epoll_ctl() EPOLL_CTL_ADD");
-						status = CELIX_SERVICE_EXCEPTION;
-					}
-				#endif
-			}
+	}
 
-		}
+	if (status == CELIX_SUCCESS){
+		hashMap_put(ts->socketMap, strdup(pubURL), (void*)recvSocket);
+	}else{
+		free(recvSocket);
+	}
 
-		if (status == CELIX_SUCCESS){
-			hashMap_put(ts->socketMap, pubURL, (void*)recvSocket);
-		}else{
-			free(recvSocket);
-		}
+	celixThreadMutex_unlock(&ts->ts_lock);
 
-		celixThreadMutex_unlock(&ts->ts_lock);
+	return status;
+}
 
-    }
+celix_status_t pubsub_topicSubscriptionAddConnectPublisherToPendingList(topic_subscription_pt ts, char* pubURL) {
+	celix_status_t status = CELIX_SUCCESS;
+	char *url = strdup(pubURL);
+	celixThreadMutex_lock(&ts->pendingConnections_lock);
+	arrayList_add(ts->pendingConnections, url);
+	celixThreadMutex_unlock(&ts->pendingConnections_lock);
+	return status;
+}
 
-    return status;
+celix_status_t pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(topic_subscription_pt ts, char* pubURL) {
+	celix_status_t status = CELIX_SUCCESS;
+	char *url = strdup(pubURL);
+	celixThreadMutex_lock(&ts->pendingDisconnections_lock);
+	arrayList_add(ts->pendingDisconnections, url);
+	celixThreadMutex_unlock(&ts->pendingDisconnections_lock);
+	return status;
 }
 
 celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt ts, char* pubURL){
-    printf("pubsub_topicSubscriptionDisconnectPublisher : pubURL = %s\n", pubURL);
-    celix_status_t status = CELIX_SUCCESS;
+	printf("pubsub_topicSubscriptionDisconnectPublisher : pubURL = %s\n", pubURL);
+	celix_status_t status = CELIX_SUCCESS;
+	struct epoll_event ev;
+	memset(&ev, 0, sizeof(ev));
 
-    if (hashMap_containsKey(ts->socketMap, pubURL)){
+	celixThreadMutex_lock(&ts->ts_lock);
+
+	if (hashMap_containsKey(ts->socketMap, pubURL)){
 
 #if defined(__APPLE__) && defined(__MACH__)
-    //TODO: Use kqueue for OSX
+		//TODO: Use kqueue for OSX
 #else
-		struct epoll_event ev;
-		memset(&ev, 0, sizeof(ev));
-
-		celixThreadMutex_lock(&ts->ts_lock);
-
 		int *s = hashMap_remove(ts->socketMap, pubURL);
 		if(epoll_ctl(ts->topicEpollFd, EPOLL_CTL_DEL, *s, &ev) == -1) {
 			printf("in if error()\n");
@@ -336,11 +365,11 @@ celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt
 			status = CELIX_SERVICE_EXCEPTION;
 		}
 		free(s);
-
-		celixThreadMutex_unlock(&ts->ts_lock);
 #endif
 
-    }
+	}
+
+	celixThreadMutex_unlock(&ts->ts_lock);
 
 	return status;
 }
@@ -349,9 +378,7 @@ celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, p
 	celix_status_t status = CELIX_SUCCESS;
 
 	celixThreadMutex_lock(&ts->ts_lock);
-
 	arrayList_add(ts->sub_ep_list,subEP);
-
 	celixThreadMutex_unlock(&ts->ts_lock);
 
 	return status;
@@ -362,9 +389,7 @@ celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt ts) {
 	celix_status_t status = CELIX_SUCCESS;
 
 	celixThreadMutex_lock(&ts->ts_lock);
-
 	ts->nrSubscribers++;
-
 	celixThreadMutex_unlock(&ts->ts_lock);
 
 	return status;
@@ -374,22 +399,17 @@ celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts
 	celix_status_t status = CELIX_SUCCESS;
 
 	celixThreadMutex_lock(&ts->ts_lock);
-
 	arrayList_removeElement(ts->sub_ep_list,subEP);
-
 	celixThreadMutex_unlock(&ts->ts_lock);
 
 	return status;
-
 }
 
 celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt ts) {
 	celix_status_t status = CELIX_SUCCESS;
 
 	celixThreadMutex_lock(&ts->ts_lock);
-
 	ts->nrSubscribers--;
-
 	celixThreadMutex_unlock(&ts->ts_lock);
 
 	return status;
@@ -399,153 +419,118 @@ unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt ts) {
 	return ts->nrSubscribers;
 }
 
-celix_status_t pubsub_topicSubscriptionSetSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc) {
-	celix_status_t status = CELIX_SUCCESS;
-
-	celixThreadMutex_lock(&ts->ts_lock);
-    //clear old
-    if (ts->serializerSvc != NULL) {
-        hash_map_iterator_t iter = hashMapIterator_construct(ts->msgSerializerMapMap);
-        while (hashMapIterator_hasNext(&iter)) {
-            hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
-            pubsub_subscriber_t* subsvc = hashMapEntry_getKey(entry);
-            pubsub_msg_serializer_map_t* map = hashMapEntry_getValue(entry);
-            ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map);
-            hashMap_put(ts->msgSerializerMapMap, subsvc, NULL);
-
-        }
-    }
-	ts->serializerSvc = serializerSvc;
-    //init new
-    if (ts->serializerSvc != NULL) {
-        hash_map_iterator_t iter = hashMapIterator_construct(ts->msgSerializerMapMap);
-        while (hashMapIterator_hasNext(&iter)) {
-            pubsub_subscriber_t* subsvc = hashMapIterator_nextKey(&iter);
-            bundle_pt bundle = hashMap_get(ts->bundleMap, subsvc);
-            pubsub_msg_serializer_map_t* map = NULL;
-            ts->serializerSvc->createSerializerMap(ts->serializerSvc->handle, bundle, &map);
-            hashMap_put(ts->msgSerializerMapMap, subsvc, map);
-        }
-    }
-	celixThreadMutex_unlock(&ts->ts_lock);
-
-	return status;
+array_list_pt pubsub_topicSubscriptionGetSubscribersList(topic_subscription_pt sub){
+	return sub->sub_ep_list;
 }
 
-celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc){
-	celix_status_t status = CELIX_SUCCESS;
-
-	celixThreadMutex_lock(&ts->ts_lock);
-	if (ts->serializerSvc == serializerSvc) { //only act if svc removed is services used
-		hash_map_iterator_t iter = hashMapIterator_construct(ts->msgSerializerMapMap);
-		while (hashMapIterator_hasNext(&iter)) {
-            hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
-            pubsub_subscriber_t* subsvc = hashMapEntry_getKey(entry);
-            pubsub_msg_serializer_map_t* map = hashMapEntry_getValue(entry);
-            ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map);
-            hashMap_put(ts->msgSerializerMapMap, subsvc, NULL);
-		}
-		ts->serializerSvc = NULL;
-	}
-	celixThreadMutex_unlock(&ts->ts_lock);
-
-	return status;
-}
 
-static celix_status_t topicsub_subscriberTracked(void * handle, service_reference_pt reference, void* svc){
+static celix_status_t topicsub_subscriberTracked(void * handle, service_reference_pt reference, void * service){
 	celix_status_t status = CELIX_SUCCESS;
 	topic_subscription_pt ts = handle;
 
 	celixThreadMutex_lock(&ts->ts_lock);
-	if (!hashMap_containsKey(ts->msgSerializerMapMap, svc)) {
+	if (!hashMap_containsKey(ts->servicesMap, service)) {
 		bundle_pt bundle = NULL;
+		hash_map_pt msgTypes = NULL;
+
 		serviceReference_getBundle(reference, &bundle);
 
-		if (ts->serializerSvc != NULL) {
-            pubsub_msg_serializer_map_t* map = NULL;
-            ts->serializerSvc->createSerializerMap(ts->serializerSvc->handle, bundle, &map);
-            if (map != NULL) {
-                hashMap_put(ts->msgSerializerMapMap, svc, map);
-                hashMap_put(ts->bundleMap, svc, bundle);
-            }
+		if(ts->serializer != NULL && bundle!=NULL){
+			ts->serializer->createSerializerMap(ts->serializer->handle,bundle,&msgTypes);
+			if(msgTypes != NULL){
+				hashMap_put(ts->servicesMap, service, msgTypes);
+				printf("PSA_UDP_MC_TS: New subscriber registered.\n");
+			}
+		}
+		else{
+			printf("PSA_UDP_MC_TS: Cannot register new subscriber.\n");
+			status = CELIX_SERVICE_EXCEPTION;
 		}
 	}
 	celixThreadMutex_unlock(&ts->ts_lock);
-	printf("TS: New subscriber registered.\n");
+
 	return status;
 
 }
 
-static celix_status_t topicsub_subscriberUntracked(void * handle, service_reference_pt reference, void* svc){
+static celix_status_t topicsub_subscriberUntracked(void * handle, service_reference_pt reference, void * service){
 	celix_status_t status = CELIX_SUCCESS;
 	topic_subscription_pt ts = handle;
 
-
-    celixThreadMutex_lock(&ts->ts_lock);
-	if (hashMap_containsKey(ts->msgSerializerMapMap, svc)) {
-		pubsub_msg_serializer_map_t* map = hashMap_remove(ts->msgSerializerMapMap, svc);
-		if (ts->serializerSvc != NULL){
-			ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map);
-            hashMap_remove(ts->bundleMap, svc);
-            hashMap_remove(ts->msgSerializerMapMap, svc);
+	celixThreadMutex_lock(&ts->ts_lock);
+	if (hashMap_containsKey(ts->servicesMap, service)) {
+		hash_map_pt msgTypes = hashMap_remove(ts->servicesMap, service);
+		if(msgTypes!=NULL && ts->serializer!=NULL){
+			ts->serializer->destroySerializerMap(ts->serializer->handle,msgTypes);
+			printf("PSA_ZMQ_TS: Subscriber unregistered.\n");
+		}
+		else{
+			printf("PSA_ZMQ_TS: Cannot unregister subscriber.\n");
+			status = CELIX_SERVICE_EXCEPTION;
 		}
 	}
 	celixThreadMutex_unlock(&ts->ts_lock);
 
-	printf("TS: Subscriber unregistered.\n");
+	printf("PSA_UDP_MC_TS: Subscriber unregistered.\n");
 	return status;
 }
 
 
-static void process_msg(topic_subscription_pt sub, pubsub_udp_msg_t* msg){
+static void process_msg(topic_subscription_pt sub,pubsub_udp_msg_t *msg){
 
-	hash_map_iterator_t iter = hashMapIterator_construct(sub->msgSerializerMapMap);
 	celixThreadMutex_lock(&sub->ts_lock);
-	while (hashMapIterator_hasNext(&iter)) {
-		hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
+	hash_map_iterator_pt iter = hashMapIterator_create(sub->servicesMap);
+	while (hashMapIterator_hasNext(iter)) {
+		hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
 		pubsub_subscriber_pt subsvc = hashMapEntry_getKey(entry);
-		pubsub_msg_serializer_map_t* map = hashMapEntry_getValue(entry);
-
-		pubsub_msg_serializer_t* msgSer = hashMap_get(map->serializers, (void *)(uintptr_t )msg->header.type);
+		hash_map_pt msgTypes = hashMapEntry_getValue(entry);
 
+		pubsub_msg_serializer_t *msgSer = hashMap_get(msgTypes,(void*)(uintptr_t )msg->header.type);
 		if (msgSer == NULL) {
-			printf("TS: Primary message %d not supported. NOT receiving any part of the whole message.\n",msg->header.type);
-		} else {
+			printf("PSA_UDP_MC_TS: Serializer not available for message %d.\n",msg->header.type);
+		}
+		else{
 			void *msgInst = NULL;
-			bool validVersion = checkVersion(msgSer->msgVersion, &msg->header);
+			bool validVersion = checkVersion(msgSer->msgVersion,&msg->header);
+
 			if(validVersion){
-                celix_status_t status = msgSer->deserialize(msgSer->handle, msg->payload, 0, &msgInst);
+
+				celix_status_t status = msgSer->deserialize(msgSer, (const void *) msg->payload, 0, &msgInst);
+
 				if (status == CELIX_SUCCESS) {
 					bool release = true;
 					pubsub_multipart_callbacks_t mp_callbacks;
-					mp_callbacks.handle = map;
+					mp_callbacks.handle = sub;
 					mp_callbacks.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForMsgType;
 					mp_callbacks.getMultipart = NULL;
 
 					subsvc->receive(subsvc->handle, msgSer->msgName, msg->header.type, msgInst, &mp_callbacks, &release);
-					if (release) {
-                        msgSer->freeMsg(msgSer->handle, msgInst);
+
+					if(release){
+						msgSer->freeMsg(msgSer,msgInst);
 					}
 				}
 				else{
-					printf("TS: Cannot deserialize msgType %s.\n", msgSer->msgName);
+					printf("PSA_UDP_MC_TS: Cannot deserialize msgType %s.\n",msgSer->msgName);
 				}
 
 			}
-			else {
+			else{
 				int major=0,minor=0;
-				version_getMajor(msgSer->msgVersion, &major);
-				version_getMinor(msgSer->msgVersion, &minor);
-				printf("TS: Version mismatch for primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole message.\n",
-                       msgSer->msgName, major, minor, msg->header.major, msg->header.minor);
+				version_getMajor(msgSer->msgVersion,&major);
+				version_getMinor(msgSer->msgVersion,&minor);
+				printf("PSA_UDP_MC_TS: Version mismatch for primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole message.\n",
+						msgSer->msgName,major,minor,msg->header.major,msg->header.minor);
 			}
+
 		}
 	}
+	hashMapIterator_destroy(iter);
 	celixThreadMutex_unlock(&sub->ts_lock);
 }
 
 static void* udp_recv_thread_func(void * arg) {
-    topic_subscription_pt sub = (topic_subscription_pt) arg;
+	topic_subscription_pt sub = (topic_subscription_pt) arg;
 
 #if defined(__APPLE__) && defined(__MACH__)
     //TODO: use kqueue for OSX
@@ -558,52 +543,68 @@ static void* udp_recv_thread_func(void * arg) {
 		}
     }
 #else
+	struct epoll_event events[MAX_EPOLL_EVENTS];
+
+	while (sub->running) {
+		int nfds = epoll_wait(sub->topicEpollFd, events, MAX_EPOLL_EVENTS, RECV_THREAD_TIMEOUT * 1000);
+		int i;
+		for(i = 0; i < nfds; i++ ) {
+			unsigned int index;
+			unsigned int size;
+			if(largeUdp_dataAvailable(sub->largeUdpHandle, events[i].data.fd, &index, &size) == true) {
+				// Handle data
+				pubsub_udp_msg_t *udpMsg = NULL;
+				if(largeUdp_read(sub->largeUdpHandle, index, (void**)&udpMsg, size) != 0) {
+					printf("PSA_UDP_MC_TS: ERROR largeUdp_read with index %d\n", index);
+					continue;
+				}
 
-    struct epoll_event events[MAX_EPOLL_EVENTS];
+				process_msg(sub, udpMsg);
 
-    while (sub->running) {
-        int nfds = epoll_wait(sub->topicEpollFd, events, MAX_EPOLL_EVENTS, RECV_THREAD_TIMEOUT * 1000);
-        int i;
-        for(i = 0; i < nfds; i++ ) {
-            unsigned int index;
-            unsigned int size;
-            if(largeUdp_dataAvailable(sub->largeUdpHandle, events[i].data.fd, &index, &size) == true) {
-                // Handle data
-                pubsub_udp_msg_t* udpMsg = NULL;
-                if(largeUdp_read(sub->largeUdpHandle, index, (void**)&udpMsg, size) != 0) {
-                	printf("TS: ERROR largeUdp_read with index %d\n", index);
-                	continue;
-                }
-
-                if (udpMsg->header.type == 0){
-                	//Raw msg, since raw messages are not supported, don't do anything.
-                }else{
-                	process_msg(sub, udpMsg);
-                }
-
-                free(udpMsg);
-            }
-        }
-    }
+				free(udpMsg);
+			}
+		}
+		connectPendingPublishers(sub);
+		disconnectPendingPublishers(sub);
+	}
 #endif
 
-    return NULL;
+	return NULL;
 }
 
+static void connectPendingPublishers(topic_subscription_pt sub) {
+	celixThreadMutex_lock(&sub->pendingConnections_lock);
+	while(!arrayList_isEmpty(sub->pendingConnections)) {
+		char * pubEP = arrayList_remove(sub->pendingConnections, 0);
+		pubsub_topicSubscriptionConnectPublisher(sub, pubEP);
+		free(pubEP);
+	}
+	celixThreadMutex_unlock(&sub->pendingConnections_lock);
+}
 
-static void sigusr1_sighandler(int signo) {
-	printf("TS: Topic subscription being shut down...\n");
+static void disconnectPendingPublishers(topic_subscription_pt sub) {
+	celixThreadMutex_lock(&sub->pendingDisconnections_lock);
+	while(!arrayList_isEmpty(sub->pendingDisconnections)) {
+		char * pubEP = arrayList_remove(sub->pendingDisconnections, 0);
+		pubsub_topicSubscriptionDisconnectPublisher(sub, pubEP);
+		free(pubEP);
+	}
+	celixThreadMutex_unlock(&sub->pendingDisconnections_lock);
+}
+
+static void sigusr1_sighandler(int signo){
+	printf("PSA_UDP_MC_TS: Topic subscription being shut down...\n");
 	return;
 }
 
-static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr) {
+static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr){
 	bool check=false;
 	int major=0,minor=0;
 
-	if (msgVersion!=NULL) {
+	if(msgVersion!=NULL){
 		version_getMajor(msgVersion,&major);
 		version_getMinor(msgVersion,&minor);
-		if (hdr->major==((unsigned char)major)) { /* Different major means incompatible */
+		if(hdr->major==((unsigned char)major)){ /* Different major means incompatible */
 			check = (hdr->minor>=((unsigned char)minor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */
 		}
 	}
@@ -611,24 +612,7 @@ static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr) {
 	return check;
 }
 
-static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* out) {
-    pubsub_msg_serializer_map_t* map = handle;
-    hash_map_iterator_t iter = hashMapIterator_construct(map->serializers);
-    unsigned int msgTypeId = 0;
-    while (hashMapIterator_hasNext(&iter)) {
-        pubsub_msg_serializer_t* msgSer = hashMapIterator_nextValue(&iter);
-        if (strncmp(msgSer->msgName, msgType, 1024 * 1024) == 0) {
-            msgTypeId = msgSer->msgId;
-            break;
-        }
-    }
-
-    if (msgTypeId == 0) {
-        printf("Cannot find msg type id for msgType %s\n", msgType);
-        return -1;
-    } else {
-        *out = msgTypeId;
-        return 0;
-    }
+static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId){
+	*msgTypeId = utils_stringHash(msgType);
+	return 0;
 }
-

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_admin_zmq/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/CMakeLists.txt b/pubsub/pubsub_admin_zmq/CMakeLists.txt
index 49eba87..8c3c727 100644
--- a/pubsub/pubsub_admin_zmq/CMakeLists.txt
+++ b/pubsub/pubsub_admin_zmq/CMakeLists.txt
@@ -53,10 +53,11 @@ if (BUILD_PUBSUB_PSA_ZMQ)
 	    	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_endpoint.c
 	    	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/log_helper.c
 	    	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_utils.c
+    	   ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_admin_match.c
 	)
 
 	set_target_properties(org.apache.celix.pubsub_admin.PubSubAdminZmq PROPERTIES INSTALL_RPATH "$ORIGIN")
-	target_link_libraries(org.apache.celix.pubsub_admin.PubSubAdminZmq celix_framework celix_utils celix_dfi ${ZMQ_LIBRARIES} ${CZMQ_LIBRARIES} ${JANSSON_LIBRARIES} ${OPENSSL_CRYPTO_LIBRARY})
+	target_link_libraries(org.apache.celix.pubsub_admin.PubSubAdminZmq celix_framework celix_utils celix_dfi ${ZMQ_LIBRARIES} ${CZMQ_LIBRARIES} ${OPENSSL_CRYPTO_LIBRARY})
 	install_bundle(org.apache.celix.pubsub_admin.PubSubAdminZmq)
 
 endif()

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h b/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h
index 7e7ac42..3a39a93 100644
--- a/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h
+++ b/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h
@@ -24,8 +24,8 @@
  *  \copyright	Apache License, Version 2.0
  */
 
-#ifndef PUBSUB_ADMIN_IMPL_H_
-#define PUBSUB_ADMIN_IMPL_H_
+#ifndef PUBSUB_ADMIN_ZMQ_IMPL_H_
+#define PUBSUB_ADMIN_ZMQ_IMPL_H_
 
 #include <czmq.h>
 /* The following undefs prevent the collision between:
@@ -38,7 +38,7 @@
 #undef LOG_WARNING
 
 #include "pubsub_admin.h"
-#include "pubsub_serializer.h"
+#include "pubsub_admin_match.h"
 #include "log_helper.h"
 
 #define PSA_ZMQ_BASE_PORT "PSA_ZMQ_BASE_PORT"
@@ -47,13 +47,17 @@
 #define PSA_ZMQ_DEFAULT_BASE_PORT 5501
 #define PSA_ZMQ_DEFAULT_MAX_PORT 6000
 
-struct pubsub_admin {
+#define PUBSUB_ADMIN_TYPE	"zmq"
 
-	pubsub_serializer_service_t* serializerSvc;
+struct pubsub_admin {
 
 	bundle_context_pt bundle_context;
 	log_helper_pt loghelper;
 
+	/* List of the available serializers */
+	celix_thread_mutex_t serializerListLock; // List<serializers>
+	array_list_pt serializerList;
+
 	celix_thread_mutex_t localPublicationsLock;
 	hash_map_pt localPublications;//<topic(string),service_factory_pt>
 
@@ -64,9 +68,17 @@ struct pubsub_admin {
 	hash_map_pt subscriptions; //<topic(string),topic_subscription>
 
 	celix_thread_mutex_t pendingSubscriptionsLock;
-	celix_thread_mutexattr_t pendingSubscriptionsAttr;
 	hash_map_pt pendingSubscriptions; //<topic(string),List<pubsub_ep>>
 
+	/* Those are used to keep track of valid subscriptions/publications that still have no valid serializer */
+	celix_thread_mutex_t noSerializerPendingsLock;
+	array_list_pt noSerializerSubscriptions; // List<pubsub_ep>
+	array_list_pt noSerializerPublications; // List<pubsub_ep>
+
+	celix_thread_mutex_t usedSerializersLock;
+	hash_map_pt topicSubscriptionsPerSerializer; // <serializer,List<topicSubscription>>
+	hash_map_pt topicPublicationsPerSerializer; // <serializer,List<topicPublications>>
+
 	char* ipAddress;
 
 	zactor_t* zmq_auth;
@@ -75,11 +87,6 @@ struct pubsub_admin {
     unsigned int maxPort;
 };
 
-/* Note: correct locking order is
- * 1. subscriptionsLock
- * 2. publications locks
- */
-
 celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *admin);
 celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin);
 
@@ -92,10 +99,9 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
 celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin,char* scope, char* topic);
 celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* scope,char* topic);
 
-celix_status_t pubsubAdmin_matchPublisher(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP, double* score);
-celix_status_t pubsubAdmin_matchSubscriber(pubsub_admin_pt admin, pubsub_endpoint_pt subEP, double* score);
+celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt reference, void * service);
+celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt reference, void * service);
 
-celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc);
-celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc);
+celix_status_t pubsubAdmin_matchEndpoint(pubsub_admin_pt admin, pubsub_endpoint_pt endpoint, double* score);
 
-#endif /* PUBSUB_ADMIN_IMPL_H_ */
+#endif /* PUBSUB_ADMIN_ZMQ_IMPL_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h b/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h
deleted file mode 100644
index dbd2ff1..0000000
--- a/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * pubsub_publish_service_private.h
- *
- *  \date       Sep 24, 2015
- *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright	Apache License, Version 2.0
- */
-
-#ifndef PUBSUB_PUBLISH_SERVICE_PRIVATE_H_
-#define PUBSUB_PUBLISH_SERVICE_PRIVATE_H_
-
-#include "publisher.h"
-#include "pubsub_endpoint.h"
-#include "pubsub_common.h"
-#include "pubsub_serializer.h"
-
-typedef struct topic_publication *topic_publication_pt;
-
-celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context,pubsub_endpoint_pt pubEP, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out);
-celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub);
-
-celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);
-celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);
-
-celix_status_t pubsub_topicPublicationSetSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc);
-celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc);
-
-celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory);
-celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub);
-
-array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt pub);
-
-#endif /* PUBSUB_PUBLISH_SERVICE_PRIVATE_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_admin_zmq/private/include/topic_publication.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/include/topic_publication.h b/pubsub/pubsub_admin_zmq/private/include/topic_publication.h
new file mode 100644
index 0000000..3457263
--- /dev/null
+++ b/pubsub/pubsub_admin_zmq/private/include/topic_publication.h
@@ -0,0 +1,49 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * topic_publication.h
+ *
+ *  \date       Sep 24, 2015
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#ifndef TOPIC_PUBLICATION_H_
+#define TOPIC_PUBLICATION_H_
+
+#include "publisher.h"
+#include "pubsub_endpoint.h"
+#include "pubsub_common.h"
+
+#include "pubsub_serializer.h"
+
+typedef struct topic_publication *topic_publication_pt;
+
+celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context,pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out);
+celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub);
+
+celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);
+celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);
+
+celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory);
+celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub);
+
+array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt pub);
+
+#endif /* TOPIC_PUBLICATION_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h b/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h
index c1e78c3..7267103 100644
--- a/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h
+++ b/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h
@@ -38,22 +38,21 @@
 
 typedef struct topic_subscription* topic_subscription_pt;
 
-celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt subEP, pubsub_serializer_service_t* serializer, char* scope, char* topic,topic_subscription_pt* out);
+celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context,char* scope, char* topic, pubsub_serializer_service_t *best_serializer, topic_subscription_pt* out);
 celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts);
 celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts);
 celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts);
 
 celix_status_t pubsub_topicSubscriptionAddConnectPublisherToPendingList(topic_subscription_pt ts, char* pubURL);
 celix_status_t pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(topic_subscription_pt ts, char* pubURL);
+
 celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts, char* pubURL);
 celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt ts, char* pubURL);
 
 celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP);
 celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP);
 
-celix_status_t pubsub_topicSubscriptionSetSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc);
-celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc);
-
+array_list_pt pubsub_topicSubscriptionGetSubscribersList(topic_subscription_pt sub);
 celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt subscription);
 celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt subscription);
 unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt subscription);

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_admin_zmq/private/src/psa_activator.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/src/psa_activator.c b/pubsub/pubsub_admin_zmq/private/src/psa_activator.c
index cfe2c2e..fd07310 100644
--- a/pubsub/pubsub_admin_zmq/private/src/psa_activator.c
+++ b/pubsub/pubsub_admin_zmq/private/src/psa_activator.c
@@ -28,6 +28,7 @@
 
 #include "bundle_activator.h"
 #include "service_registration.h"
+#include "service_tracker.h"
 
 #include "pubsub_admin_impl.h"
 
@@ -36,6 +37,7 @@ struct activator {
 	pubsub_admin_pt admin;
 	pubsub_admin_service_pt adminService;
 	service_registration_pt registration;
+	service_tracker_pt serializerTracker;
 };
 
 celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) {
@@ -48,7 +50,28 @@ celix_status_t bundleActivator_create(bundle_context_pt context, void **userData
 	}
 	else{
 		*userData = activator;
+
 		status = pubsubAdmin_create(context, &(activator->admin));
+
+		if(status == CELIX_SUCCESS){
+			service_tracker_customizer_pt customizer = NULL;
+			status = serviceTrackerCustomizer_create(activator->admin,
+					NULL,
+					pubsubAdmin_serializerAdded,
+					NULL,
+					pubsubAdmin_serializerRemoved,
+					&customizer);
+			if(status == CELIX_SUCCESS){
+				status = serviceTracker_create(context, PUBSUB_SERIALIZER_SERVICE, customizer, &(activator->serializerTracker));
+				if(status != CELIX_SUCCESS){
+					serviceTrackerCustomizer_destroy(customizer);
+					pubsubAdmin_destroy(activator->admin);
+				}
+			}
+			else{
+				pubsubAdmin_destroy(activator->admin);
+			}
+		}
 	}
 
 	return status;
@@ -74,16 +97,14 @@ celix_status_t bundleActivator_start(void * userData, bundle_context_pt context)
 		pubsubAdminSvc->closeAllPublications = pubsubAdmin_closeAllPublications;
 		pubsubAdminSvc->closeAllSubscriptions = pubsubAdmin_closeAllSubscriptions;
 
-		pubsubAdminSvc->matchPublisher = pubsubAdmin_matchPublisher;
-		pubsubAdminSvc->matchSubscriber = pubsubAdmin_matchSubscriber;
-
-		pubsubAdminSvc->setSerializer = pubsubAdmin_setSerializer;
-		pubsubAdminSvc->removeSerializer = pubsubAdmin_removeSerializer;
+		pubsubAdminSvc->matchEndpoint = pubsubAdmin_matchEndpoint;
 
 		activator->adminService = pubsubAdminSvc;
 
 		status = bundleContext_registerService(context, PUBSUB_ADMIN_SERVICE, pubsubAdminSvc, NULL, &activator->registration);
 
+		status += serviceTracker_open(activator->serializerTracker);
+
 	}
 
 
@@ -94,7 +115,9 @@ celix_status_t bundleActivator_stop(void * userData, bundle_context_pt context)
 	celix_status_t status = CELIX_SUCCESS;
 	struct activator *activator = userData;
 
-	serviceRegistration_unregister(activator->registration);
+	status += serviceTracker_close(activator->serializerTracker);
+	status += serviceRegistration_unregister(activator->registration);
+
 	activator->registration = NULL;
 
 	free(activator->adminService);
@@ -107,6 +130,7 @@ celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt contex
 	celix_status_t status = CELIX_SUCCESS;
 	struct activator *activator = userData;
 
+	serviceTracker_destroy(activator->serializerTracker);
 	pubsubAdmin_destroy(activator->admin);
 	activator->admin = NULL;
 


Mime
View raw message