celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From griccia...@apache.org
Subject [02/11] celix git commit: Refactored serializers management
Date Fri, 29 Sep 2017 13:34:18 GMT
http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c b/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
index 409c7a5..cf51ed9 100644
--- a/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
+++ b/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
@@ -49,51 +49,49 @@
 #include "pubsub_utils.h"
 
 #ifdef BUILD_WITH_ZMQ_SECURITY
-	#include "zmq_crypto.h"
+#include "zmq_crypto.h"
 
-	#define MAX_CERT_PATH_LENGTH 512
+#define MAX_CERT_PATH_LENGTH 512
 #endif
 
 #define POLL_TIMEOUT  	250
 #define ZMQ_POLL_TIMEOUT_MS_ENV 	"ZMQ_POLL_TIMEOUT_MS"
 
-struct topic_subscription {
+struct topic_subscription{
+
 	zsock_t* zmq_socket;
 	zcert_t * zmq_cert;
 	zcert_t * zmq_pub_cert;
-	pthread_mutex_t socket_lock; //Protects zmq_socket access
+	pthread_mutex_t socket_lock;
 	service_tracker_pt tracker;
 	array_list_pt sub_ep_list;
 	celix_thread_t recv_thread;
 	bool running;
-	celix_thread_mutex_t ts_lock; //Protects topic_subscription data structure access
+	celix_thread_mutex_t ts_lock;
 	bundle_context_pt context;
 
-	hash_map_pt msgSerializerMapMap; // key = service ptr, value = pubsub_msg_serializer_map_t*
-    hash_map_pt bundleMap; //key = service ptr, value = bundle_pt
-	array_list_pt pendingConnections;
-	array_list_pt pendingDisconnections;
+	pubsub_serializer_service_t *serializer;
+
+	hash_map_pt servicesMap; // key = service, value = msg types map
 
 	celix_thread_mutex_t pendingConnections_lock;
+	array_list_pt pendingConnections;
+
+	array_list_pt pendingDisconnections;
 	celix_thread_mutex_t pendingDisconnections_lock;
+
 	unsigned int nrSubscribers;
-	pubsub_serializer_service_t* serializerSvc;
 };
 
-/* Note: correct locking order is
- * 1. socket_lock
- * 2. ts_lock
- */
-
-typedef struct complete_zmq_msg {
+typedef struct complete_zmq_msg{
 	zframe_t* header;
 	zframe_t* payload;
 }* complete_zmq_msg_pt;
 
-typedef struct mp_handle {
-	pubsub_msg_serializer_map_t* map;
+typedef struct mp_handle{
+	hash_map_pt svc_msg_db;
 	hash_map_pt rcv_msg_map;
-} mp_handle_t;
+}* mp_handle_pt;
 
 typedef struct msg_map_entry{
 	bool retain;
@@ -107,100 +105,66 @@ static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr);
 static void sigusr1_sighandler(int signo);
 static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId);
 static int pubsub_getMultipart(void *handle, unsigned int msgTypeId, bool retain, void **part);
-static mp_handle_t* create_mp_handle(topic_subscription_pt sub, pubsub_msg_serializer_map_t* map, array_list_pt rcv_msg_list);
-static void destroy_mp_handle(mp_handle_t* mp_handle);
+static mp_handle_pt create_mp_handle(hash_map_pt svc_msg_db,array_list_pt rcv_msg_list);
+static void destroy_mp_handle(mp_handle_pt mp_handle);
 static void connectPendingPublishers(topic_subscription_pt sub);
 static void disconnectPendingPublishers(topic_subscription_pt sub);
 
-celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt subEP, pubsub_serializer_service_t* serializer, char* scope, char* topic,topic_subscription_pt* out){
+celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, char* scope, char* topic, pubsub_serializer_service_t *best_serializer, topic_subscription_pt* out){
 	celix_status_t status = CELIX_SUCCESS;
 
 #ifdef BUILD_WITH_ZMQ_SECURITY
-	if(strcmp(topic,PUBSUB_ANY_SUB_TOPIC) != 0){
-		char* secure_topics = NULL;
-		bundleContext_getProperty(bundle_context, "SECURE_TOPICS", (const char **) &secure_topics);
-
-		if (secure_topics){
-			array_list_pt secure_topics_list = pubsub_getTopicsFromString(secure_topics);
-
-			int i;
-			int secure_topics_size = arrayList_size(secure_topics_list);
-			for (i = 0; i < secure_topics_size; i++){
-				char* top = arrayList_get(secure_topics_list, i);
-				if (strcmp(topic, top) == 0){
-					printf("TS: Secure topic: '%s'\n", top);
-					subEP->is_secure = true;
-				}
-				free(top);
-				top = NULL;
-			}
-
-			arrayList_destroy(secure_topics_list);
-		}
+	char* keys_bundle_dir = pubsub_getKeysBundleDir(bundle_context);
+	if (keys_bundle_dir == NULL){
+		return CELIX_SERVICE_EXCEPTION;
 	}
 
-	zcert_t* sub_cert = NULL;
-	zcert_t* pub_cert = NULL;
-	const char* pub_key = NULL;
-	if (subEP->is_secure){
-		char* keys_bundle_dir = pubsub_getKeysBundleDir(bundle_context);
-		if (keys_bundle_dir == NULL){
-			return CELIX_SERVICE_EXCEPTION;
-		}
-
-		const char* keys_file_path = NULL;
-		const char* keys_file_name = NULL;
-		bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_PATH, &keys_file_path);
-		bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_NAME, &keys_file_name);
+	const char* keys_file_path = NULL;
+	const char* keys_file_name = NULL;
+	bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_PATH, &keys_file_path);
+	bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_NAME, &keys_file_name);
 
-		char sub_cert_path[MAX_CERT_PATH_LENGTH];
-		char pub_cert_path[MAX_CERT_PATH_LENGTH];
+	char sub_cert_path[MAX_CERT_PATH_LENGTH];
+	char pub_cert_path[MAX_CERT_PATH_LENGTH];
 
-		//certificate path ".cache/bundle{id}/version0.0/./META-INF/keys/subscriber/private/sub_{topic}.key.enc"
-		snprintf(sub_cert_path, MAX_CERT_PATH_LENGTH, "%s/META-INF/keys/subscriber/private/sub_%s.key.enc", keys_bundle_dir, topic);
-		snprintf(pub_cert_path, MAX_CERT_PATH_LENGTH, "%s/META-INF/keys/publisher/public/pub_%s.pub", keys_bundle_dir, topic);
-		free(keys_bundle_dir);
+	//certificate path ".cache/bundle{id}/version0.0/./META-INF/keys/subscriber/private/sub_{topic}.key.enc"
+	snprintf(sub_cert_path, MAX_CERT_PATH_LENGTH, "%s/META-INF/keys/subscriber/private/sub_%s.key.enc", keys_bundle_dir, topic);
+	snprintf(pub_cert_path, MAX_CERT_PATH_LENGTH, "%s/META-INF/keys/publisher/public/pub_%s.pub", keys_bundle_dir, topic);
+	free(keys_bundle_dir);
 
-		printf("TS: Loading subscriber key '%s'\n", sub_cert_path);
-		printf("TS: Loading publisher key '%s'\n", pub_cert_path);
-
-		sub_cert = get_zcert_from_encoded_file((char *) keys_file_path, (char *) keys_file_name, sub_cert_path);
-		if (sub_cert == NULL){
-			printf("TS: Cannot load key '%s'\n", sub_cert_path);
-			printf("TS: Topic '%s' NOT SECURED !\n", topic);
-			subEP->is_secure = false;
-		}
+	printf("PSA_ZMQ_PSA_ZMQ_TS: Loading subscriber key '%s'\n", sub_cert_path);
+	printf("PSA_ZMQ_PSA_ZMQ_TS: Loading publisher key '%s'\n", pub_cert_path);
 
-		pub_cert = zcert_load(pub_cert_path);
-		if (sub_cert != NULL && pub_cert == NULL){
-			zcert_destroy(&sub_cert);
-			printf("TS: Cannot load key '%s'\n", pub_cert_path);
-			printf("TS: Topic '%s' NOT SECURED !\n", topic);
-			subEP->is_secure = false;
-		}
+	zcert_t* sub_cert = get_zcert_from_encoded_file((char *) keys_file_path, (char *) keys_file_name, sub_cert_path);
+	if (sub_cert == NULL){
+		printf("PSA_ZMQ_PSA_ZMQ_TS: Cannot load key '%s'\n", sub_cert_path);
+		return CELIX_SERVICE_EXCEPTION;
+	}
 
-		pub_key = zcert_public_txt(pub_cert);
+	zcert_t* pub_cert = zcert_load(pub_cert_path);
+	if (pub_cert == NULL){
+		zcert_destroy(&sub_cert);
+		printf("PSA_ZMQ_PSA_ZMQ_TS: Cannot load key '%s'\n", pub_cert_path);
+		return CELIX_SERVICE_EXCEPTION;
 	}
+
+	const char* pub_key = zcert_public_txt(pub_cert);
 #endif
 
 	zsock_t* zmq_s = zsock_new (ZMQ_SUB);
 	if(zmq_s==NULL){
-		#ifdef BUILD_WITH_ZMQ_SECURITY
-		if (subEP->is_secure){
-			zcert_destroy(&sub_cert);
-			zcert_destroy(&pub_cert);
-		}
-		#endif
+#ifdef BUILD_WITH_ZMQ_SECURITY
+		zcert_destroy(&sub_cert);
+		zcert_destroy(&pub_cert);
+#endif
 
 		return CELIX_SERVICE_EXCEPTION;
 	}
 
-	#ifdef BUILD_WITH_ZMQ_SECURITY
-	if (subEP->is_secure){
-		zcert_apply (sub_cert, zmq_s);
-		zsock_set_curve_serverkey (zmq_s, pub_key); //apply key of publisher to socket of subscriber
-	}
-	#endif
+#ifdef BUILD_WITH_ZMQ_SECURITY
+	zcert_apply (sub_cert, zmq_s);
+	zsock_set_curve_serverkey (zmq_s, pub_key); //apply key of publisher to socket of subscriber
+#endif
 
 	if(strcmp(topic,PUBSUB_ANY_SUB_TOPIC)==0){
 		zsock_set_subscribe (zmq_s, "");
@@ -214,20 +178,18 @@ celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context,
 	ts->zmq_socket = zmq_s;
 	ts->running = false;
 	ts->nrSubscribers = 0;
-	ts->serializerSvc = NULL;
+	ts->serializer = best_serializer;
 
-	#ifdef BUILD_WITH_ZMQ_SECURITY
-	if (subEP->is_secure){
-		ts->zmq_cert = sub_cert;
-		ts->zmq_pub_cert = pub_cert;
-	}
-	#endif
+#ifdef BUILD_WITH_ZMQ_SECURITY
+	ts->zmq_cert = sub_cert;
+	ts->zmq_pub_cert = pub_cert;
+#endif
 
 	celixThreadMutex_create(&ts->socket_lock, NULL);
 	celixThreadMutex_create(&ts->ts_lock,NULL);
 	arrayList_create(&ts->sub_ep_list);
-	ts->msgSerializerMapMap = hashMap_create(NULL, NULL, NULL, NULL);
-    ts->bundleMap = hashMap_create(NULL, NULL, NULL, NULL);
+	ts->servicesMap = hashMap_create(NULL, NULL, NULL, NULL);
+
 	arrayList_create(&ts->pendingConnections);
 	arrayList_create(&ts->pendingDisconnections);
 	celixThreadMutex_create(&ts->pendingConnections_lock, NULL);
@@ -236,17 +198,17 @@ celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context,
 	char filter[128];
 	memset(filter,0,128);
 	if(strncmp(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT,scope,strlen(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT)) == 0) {
-	        // default scope, means that subscriber has not defined a scope property
-	        snprintf(filter, 128, "(&(%s=%s)(%s=%s))",
-	                (char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_SUBSCRIBER_SERVICE_NAME,
-	                PUBSUB_SUBSCRIBER_TOPIC,topic);
+		// default scope, means that subscriber has not defined a scope property
+		snprintf(filter, 128, "(&(%s=%s)(%s=%s))",
+				(char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_SUBSCRIBER_SERVICE_NAME,
+				PUBSUB_SUBSCRIBER_TOPIC,topic);
 
 	} else {
-        snprintf(filter, 128, "(&(%s=%s)(%s=%s)(%s=%s))",
-                (char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_SUBSCRIBER_SERVICE_NAME,
-                PUBSUB_SUBSCRIBER_TOPIC,topic,
-                PUBSUB_SUBSCRIBER_SCOPE,scope);
-    }
+		snprintf(filter, 128, "(&(%s=%s)(%s=%s)(%s=%s))",
+				(char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_SUBSCRIBER_SERVICE_NAME,
+				PUBSUB_SUBSCRIBER_TOPIC,topic,
+				PUBSUB_SUBSCRIBER_SCOPE,scope);
+	}
 	service_tracker_customizer_pt customizer = NULL;
 	status += serviceTrackerCustomizer_create(ts,NULL,topicsub_subscriberTracked,NULL,topicsub_subscriberUntracked,&customizer);
 	status += serviceTracker_createWithFilter(bundle_context, filter, customizer, &ts->tracker);
@@ -259,10 +221,7 @@ celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context,
 
 	sigaction(SIGUSR1,&actions,NULL);
 
-	if (status == CELIX_SUCCESS) {
-		*out=ts;
-		pubsub_topicSubscriptionSetSerializer(ts, serializer);
-	}
+	*out=ts;
 
 	return status;
 }
@@ -276,8 +235,8 @@ celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){
 	serviceTracker_destroy(ts->tracker);
 	arrayList_clear(ts->sub_ep_list);
 	arrayList_destroy(ts->sub_ep_list);
-	hashMap_destroy(ts->msgSerializerMapMap,false,false);
-    hashMap_destroy(ts->bundleMap,false,false);
+	/* TODO: Destroy all the serializer maps? */
+	hashMap_destroy(ts->servicesMap,false,false);
 
 	celixThreadMutex_lock(&ts->pendingConnections_lock);
 	arrayList_destroy(ts->pendingConnections);
@@ -289,18 +248,17 @@ celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){
 	celixThreadMutex_unlock(&ts->pendingDisconnections_lock);
 	celixThreadMutex_destroy(&ts->pendingDisconnections_lock);
 
-	#ifdef BUILD_WITH_ZMQ_SECURITY
-	zcert_destroy(&(ts->zmq_cert));
-	zcert_destroy(&(ts->zmq_pub_cert));
-	#endif
-
-	celixThreadMutex_unlock(&ts->ts_lock);
-
 	celixThreadMutex_lock(&ts->socket_lock);
 	zsock_destroy(&(ts->zmq_socket));
+#ifdef BUILD_WITH_ZMQ_SECURITY
+	zcert_destroy(&(ts->zmq_cert));
+	zcert_destroy(&(ts->zmq_pub_cert));
+#endif
 	celixThreadMutex_unlock(&ts->socket_lock);
 	celixThreadMutex_destroy(&ts->socket_lock);
 
+	celixThreadMutex_unlock(&ts->ts_lock);
+
 
 	free(ts);
 
@@ -310,8 +268,6 @@ celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){
 celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts){
 	celix_status_t status = CELIX_SUCCESS;
 
-	//celixThreadMutex_lock(&ts->ts_lock);
-
 	status = serviceTracker_open(ts->tracker);
 
 	ts->running = true;
@@ -320,16 +276,12 @@ celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts){
 		status=celixThread_create(&ts->recv_thread,NULL,zmq_recv_thread_func,ts);
 	}
 
-	//celixThreadMutex_unlock(&ts->ts_lock);
-
 	return status;
 }
 
 celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts){
 	celix_status_t status = CELIX_SUCCESS;
 
-	//celixThreadMutex_lock(&ts->ts_lock);
-
 	ts->running = false;
 
 	pthread_kill(ts->recv_thread.thread,SIGUSR1);
@@ -338,15 +290,13 @@ celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts){
 
 	status = serviceTracker_close(ts->tracker);
 
-	//celixThreadMutex_unlock(&ts->ts_lock);
-
 	return status;
 }
 
 celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts, char* pubURL){
 	celix_status_t status = CELIX_SUCCESS;
 	celixThreadMutex_lock(&ts->socket_lock);
-	if(!zsock_is(ts->zmq_socket) || zsock_connect(ts->zmq_socket, "%s", pubURL) != 0){
+	if(!zsock_is(ts->zmq_socket) || zsock_connect(ts->zmq_socket,"%s",pubURL) != 0){
 		status = CELIX_SERVICE_EXCEPTION;
 	}
 	celixThreadMutex_unlock(&ts->socket_lock);
@@ -355,28 +305,28 @@ celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts
 }
 
 celix_status_t pubsub_topicSubscriptionAddConnectPublisherToPendingList(topic_subscription_pt ts, char* pubURL) {
-    celix_status_t status = CELIX_SUCCESS;
-    char *url = strdup(pubURL);
-    celixThreadMutex_lock(&ts->pendingConnections_lock);
-    arrayList_add(ts->pendingConnections, url);
-    celixThreadMutex_unlock(&ts->pendingConnections_lock);
-    return status;
+	celix_status_t status = CELIX_SUCCESS;
+	char *url = strdup(pubURL);
+	celixThreadMutex_lock(&ts->pendingConnections_lock);
+	arrayList_add(ts->pendingConnections, url);
+	celixThreadMutex_unlock(&ts->pendingConnections_lock);
+	return status;
 }
 
 celix_status_t pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(topic_subscription_pt ts, char* pubURL) {
-     celix_status_t status = CELIX_SUCCESS;
-    char *url = strdup(pubURL);
-    celixThreadMutex_lock(&ts->pendingDisconnections_lock);
-    arrayList_add(ts->pendingDisconnections, url);
-    celixThreadMutex_unlock(&ts->pendingDisconnections_lock);
-    return status;
+	celix_status_t status = CELIX_SUCCESS;
+	char *url = strdup(pubURL);
+	celixThreadMutex_lock(&ts->pendingDisconnections_lock);
+	arrayList_add(ts->pendingDisconnections, url);
+	celixThreadMutex_unlock(&ts->pendingDisconnections_lock);
+	return status;
 }
 
 celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt ts, char* pubURL){
 	celix_status_t status = CELIX_SUCCESS;
 
 	celixThreadMutex_lock(&ts->socket_lock);
-	if(!zsock_is(ts->zmq_socket) || zsock_disconnect(ts->zmq_socket, "%s", pubURL) != 0){
+	if(!zsock_is(ts->zmq_socket) || zsock_disconnect(ts->zmq_socket,"%s",pubURL) != 0){
 		status = CELIX_SERVICE_EXCEPTION;
 	}
 	celixThreadMutex_unlock(&ts->socket_lock);
@@ -388,9 +338,7 @@ celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, p
 	celix_status_t status = CELIX_SUCCESS;
 
 	celixThreadMutex_lock(&ts->ts_lock);
-
 	arrayList_add(ts->sub_ep_list,subEP);
-
 	celixThreadMutex_unlock(&ts->ts_lock);
 
 	return status;
@@ -401,9 +349,7 @@ celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt ts) {
 	celix_status_t status = CELIX_SUCCESS;
 
 	celixThreadMutex_lock(&ts->ts_lock);
-
 	ts->nrSubscribers++;
-
 	celixThreadMutex_unlock(&ts->ts_lock);
 
 	return status;
@@ -413,22 +359,17 @@ celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts
 	celix_status_t status = CELIX_SUCCESS;
 
 	celixThreadMutex_lock(&ts->ts_lock);
-
 	arrayList_removeElement(ts->sub_ep_list,subEP);
-
 	celixThreadMutex_unlock(&ts->ts_lock);
 
 	return status;
-
 }
 
 celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt ts) {
 	celix_status_t status = CELIX_SUCCESS;
 
 	celixThreadMutex_lock(&ts->ts_lock);
-
 	ts->nrSubscribers--;
-
 	celixThreadMutex_unlock(&ts->ts_lock);
 
 	return status;
@@ -438,152 +379,114 @@ unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt ts) {
 	return ts->nrSubscribers;
 }
 
-celix_status_t pubsub_topicSubscriptionSetSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc) {
-	celix_status_t status = CELIX_SUCCESS;
-
-	celixThreadMutex_lock(&ts->ts_lock);
-    //clear old
-    if (ts->serializerSvc != NULL) {
-        hash_map_iterator_t iter = hashMapIterator_construct(ts->msgSerializerMapMap);
-        while (hashMapIterator_hasNext(&iter)) {
-            hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
-            pubsub_subscriber_t* subsvc = hashMapEntry_getKey(entry);
-            pubsub_msg_serializer_map_t* map = hashMapEntry_getValue(entry);
-            ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map);
-            hashMap_put(ts->msgSerializerMapMap, subsvc, NULL);
-
-        }
-    }
-    ts->serializerSvc = serializerSvc;
-    //init new
-    if (ts->serializerSvc != NULL) {
-        hash_map_iterator_t iter = hashMapIterator_construct(ts->msgSerializerMapMap);
-        while (hashMapIterator_hasNext(&iter)) {
-            pubsub_subscriber_t* subsvc = hashMapIterator_nextKey(&iter);
-            bundle_pt bundle = hashMap_get(ts->bundleMap, subsvc);
-            pubsub_msg_serializer_map_t* map = NULL;
-            ts->serializerSvc->createSerializerMap(ts->serializerSvc->handle, bundle, &map);
-            hashMap_put(ts->msgSerializerMapMap, subsvc, map);
-        }
-    }
-	celixThreadMutex_unlock(&ts->ts_lock);
-
-	return status;
+array_list_pt pubsub_topicSubscriptionGetSubscribersList(topic_subscription_pt sub){
+	return sub->sub_ep_list;
 }
 
-celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* svc){
+static celix_status_t topicsub_subscriberTracked(void * handle, service_reference_pt reference, void * service){
 	celix_status_t status = CELIX_SUCCESS;
+	topic_subscription_pt ts = handle;
 
 	celixThreadMutex_lock(&ts->ts_lock);
-    if (ts->serializerSvc == svc) { //only act if svc removed is services used
-        hash_map_iterator_t iter = hashMapIterator_construct(ts->msgSerializerMapMap);
-        while (hashMapIterator_hasNext(&iter)) {
-            hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
-            pubsub_subscriber_t* subsvc = hashMapEntry_getKey(entry);
-            pubsub_msg_serializer_map_t* map = hashMapEntry_getValue(entry);
-            ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map);
-            hashMap_put(ts->msgSerializerMapMap, subsvc, NULL);
-        }
-        ts->serializerSvc = NULL;
-    }
-	celixThreadMutex_unlock(&ts->ts_lock);
-
-	return status;
-}
+	if (!hashMap_containsKey(ts->servicesMap, service)) {
+		bundle_pt bundle = NULL;
+		hash_map_pt msgTypes = NULL;
 
-static celix_status_t topicsub_subscriberTracked(void * handle, service_reference_pt reference, void* svc) {
-	celix_status_t status = CELIX_SUCCESS;
-	topic_subscription_pt ts = handle;
+		serviceReference_getBundle(reference, &bundle);
 
-	celixThreadMutex_lock(&ts->ts_lock);
-    if (!hashMap_containsKey(ts->msgSerializerMapMap, svc)) {
-        bundle_pt bundle = NULL;
-        serviceReference_getBundle(reference, &bundle);
-
-        if (ts->serializerSvc != NULL) {
-            pubsub_msg_serializer_map_t* map = NULL;
-            ts->serializerSvc->createSerializerMap(ts->serializerSvc->handle, bundle, &map);
-            if (map != NULL) {
-                hashMap_put(ts->msgSerializerMapMap, svc, map);
-                hashMap_put(ts->bundleMap, svc, bundle);
-            }
-        }
-    }
+		if(ts->serializer != NULL && bundle!=NULL){
+			ts->serializer->createSerializerMap(ts->serializer->handle,bundle,&msgTypes);
+			if(msgTypes != NULL){
+				hashMap_put(ts->servicesMap, service, msgTypes);
+				printf("PSA_ZMQ_TS: New subscriber registered.\n");
+			}
+		}
+		else{
+			printf("PSA_ZMQ_TS: Cannot register new subscriber.\n");
+			status = CELIX_SERVICE_EXCEPTION;
+		}
+	}
 	celixThreadMutex_unlock(&ts->ts_lock);
-	printf("TS: New subscriber registered.\n");
-	return status;
 
+	return status;
 }
 
-static celix_status_t topicsub_subscriberUntracked(void * handle, service_reference_pt reference, void* svc) {
+static celix_status_t topicsub_subscriberUntracked(void * handle, service_reference_pt reference, void * service){
 	celix_status_t status = CELIX_SUCCESS;
 	topic_subscription_pt ts = handle;
 
 	celixThreadMutex_lock(&ts->ts_lock);
-    if (hashMap_containsKey(ts->msgSerializerMapMap, svc)) {
-        pubsub_msg_serializer_map_t* map = hashMap_remove(ts->msgSerializerMapMap, svc);
-        if (ts->serializerSvc != NULL){
-            ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map);
-            hashMap_remove(ts->bundleMap, svc);
-            hashMap_remove(ts->msgSerializerMapMap, svc);
-        }
-    }
+	if (hashMap_containsKey(ts->servicesMap, service)) {
+		hash_map_pt msgTypes = hashMap_remove(ts->servicesMap, service);
+		if(msgTypes!=NULL && ts->serializer!=NULL){
+			ts->serializer->destroySerializerMap(ts->serializer->handle,msgTypes);
+			printf("PSA_ZMQ_TS: Subscriber unregistered.\n");
+		}
+		else{
+			printf("PSA_ZMQ_TS: Cannot unregister subscriber.\n");
+			status = CELIX_SERVICE_EXCEPTION;
+		}
+	}
 	celixThreadMutex_unlock(&ts->ts_lock);
 
-	printf("TS: Subscriber unregistered.\n");
 	return status;
 }
 
 
-static void process_msg(topic_subscription_pt sub, array_list_pt msg_list) {
+static void process_msg(topic_subscription_pt sub,array_list_pt msg_list){
 
 	pubsub_msg_header_pt first_msg_hdr = (pubsub_msg_header_pt)zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->header);
 
-	hash_map_iterator_t iter = hashMapIterator_construct(sub->msgSerializerMapMap);
-	while (hashMapIterator_hasNext(&iter)) {
-		hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
+	hash_map_iterator_pt iter = hashMapIterator_create(sub->servicesMap);
+	while (hashMapIterator_hasNext(iter)) {
+		hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
 		pubsub_subscriber_pt subsvc = hashMapEntry_getKey(entry);
-		pubsub_msg_serializer_map_t* map = hashMapEntry_getValue(entry);
+		hash_map_pt msgTypes = hashMapEntry_getValue(entry);
 
-		pubsub_msg_serializer_t* msgSer = hashMap_get(map->serializers, (void*)(uintptr_t )first_msg_hdr->type);
+		pubsub_msg_serializer_t *msgSer = hashMap_get(msgTypes,(void*)(uintptr_t )first_msg_hdr->type);
 		if (msgSer == NULL) {
-			printf("TS: Primary message %d not supported. NOT sending any part of the whole message.\n", first_msg_hdr->type);
-		} else {
+			printf("PSA_ZMQ_TS: Primary message %d not supported. NOT sending any part of the whole message.\n",first_msg_hdr->type);
+		}
+		else{
 			void *msgInst = NULL;
-			bool validVersion = checkVersion(msgSer->msgVersion, first_msg_hdr);
+			bool validVersion = checkVersion(msgSer->msgVersion,first_msg_hdr);
+
 			if(validVersion){
-				celix_status_t status = msgSer->deserialize(msgSer->handle, (const char*)zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->payload), 0, &msgInst);
+
+				celix_status_t status = msgSer->deserialize(msgSer, (const void *) zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->payload), 0, &msgInst);
 
 				if (status == CELIX_SUCCESS) {
 					bool release = true;
-
-					mp_handle_t* mp_handle = create_mp_handle(sub, map, msg_list);
+					mp_handle_pt mp_handle = create_mp_handle(msgTypes,msg_list);
 					pubsub_multipart_callbacks_t mp_callbacks;
 					mp_callbacks.handle = mp_handle;
 					mp_callbacks.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForMsgType;
 					mp_callbacks.getMultipart = pubsub_getMultipart;
 					subsvc->receive(subsvc->handle, msgSer->msgName, first_msg_hdr->type, msgInst, &mp_callbacks, &release);
 
-					if (release) {
-						msgSer->freeMsg(msgSer->handle, msgInst);
+					if(release){
+						msgSer->freeMsg(msgSer,msgInst); // pubsubSerializer_freeMsg(msgType, msgInst);
 					}
-					if (mp_handle!=NULL) {
+					if(mp_handle!=NULL){
 						destroy_mp_handle(mp_handle);
 					}
 				}
 				else{
-					printf("TS: Cannot deserialize msgType %s.\n", msgSer->msgName);
+					printf("PSA_ZMQ_TS: Cannot deserialize msgType %s.\n",msgSer->msgName);
 				}
 
-			} else {
+			}
+			else{
 				int major=0,minor=0;
-				version_getMajor(msgSer->msgVersion, &major);
-				version_getMinor(msgSer->msgVersion, &minor);
-				printf("TS: Version mismatch for primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole message.\n",
-					   msgSer->msgName, major, minor, first_msg_hdr->major, first_msg_hdr->minor);
+				version_getMajor(msgSer->msgVersion,&major);
+				version_getMinor(msgSer->msgVersion,&minor);
+				printf("PSA_ZMQ_TS: Version mismatch for primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole message.\n",
+						msgSer->msgName,major,minor,first_msg_hdr->major,first_msg_hdr->minor);
 			}
+
 		}
 	}
+	hashMapIterator_destroy(iter);
 
 	int i = 0;
 	for(;i<arrayList_size(msg_list);i++){
@@ -598,21 +501,21 @@ static void process_msg(topic_subscription_pt sub, array_list_pt msg_list) {
 }
 
 static void* zmq_recv_thread_func(void * arg) {
-    topic_subscription_pt sub = (topic_subscription_pt) arg;
+	topic_subscription_pt sub = (topic_subscription_pt) arg;
 
-    while (sub->running) {
+	while (sub->running) {
 
-        celixThreadMutex_lock(&sub->socket_lock);
+		celixThreadMutex_lock(&sub->socket_lock);
 
-        zframe_t* headerMsg = zframe_recv(sub->zmq_socket);
-        if (headerMsg == NULL) {
-            if (errno == EINTR) {
-                //It means we got a signal and we have to exit...
-                printf("TS: header_recv thread for topic got a signal and will exit.\n");
-            } else {
-                perror("TS: header_recv thread");
-            }
-        } else {
+		zframe_t* headerMsg = zframe_recv(sub->zmq_socket);
+		if (headerMsg == NULL) {
+			if (errno == EINTR) {
+				//It means we got a signal and we have to exit...
+				printf("PSA_ZMQ_TS: header_recv thread for topic got a signal and will exit.\n");
+			} else {
+				perror("PSA_ZMQ_TS: header_recv thread");
+			}
+		} else {
 
 			pubsub_msg_header_pt hdr = (pubsub_msg_header_pt) zframe_data(headerMsg);
 
@@ -622,9 +525,9 @@ static void* zmq_recv_thread_func(void * arg) {
 				if (payloadMsg == NULL) {
 					if (errno == EINTR) {
 						//It means we got a signal and we have to exit...
-						printf("TS: payload_recv thread for topic got a signal and will exit.\n");
+						printf("PSA_ZMQ_TS: payload_recv thread for topic got a signal and will exit.\n");
 					} else {
-						perror("TS: payload_recv");
+						perror("PSA_ZMQ_TS: payload_recv");
 					}
 					zframe_destroy(&headerMsg);
 				} else {
@@ -644,9 +547,9 @@ static void* zmq_recv_thread_func(void * arg) {
 						if (h_msg == NULL) {
 							if (errno == EINTR) {
 								//It means we got a signal and we have to exit...
-								printf("TS: h_recv thread for topic got a signal and will exit.\n");
+								printf("PSA_ZMQ_TS: h_recv thread for topic got a signal and will exit.\n");
 							} else {
-								perror("TS: h_recv");
+								perror("PSA_ZMQ_TS: h_recv");
 							}
 							break;
 						}
@@ -655,9 +558,9 @@ static void* zmq_recv_thread_func(void * arg) {
 						if (p_msg == NULL) {
 							if (errno == EINTR) {
 								//It means we got a signal and we have to exit...
-								printf("TS: p_recv thread for topic got a signal and will exit.\n");
+								printf("PSA_ZMQ_TS: p_recv thread for topic got a signal and will exit.\n");
 							} else {
-								perror("TS: p_recv");
+								perror("PSA_ZMQ_TS: p_recv");
 							}
 							zframe_destroy(&h_msg);
 							break;
@@ -682,16 +585,16 @@ static void* zmq_recv_thread_func(void * arg) {
 			} //zframe_more(headerMsg)
 			else {
 				free(headerMsg);
-				printf("TS: received message %u for topic %s without payload!\n", hdr->type, hdr->topic);
+				printf("PSA_ZMQ_TS: received message %u for topic %s without payload!\n", hdr->type, hdr->topic);
 			}
 
-        } // headerMsg != NULL
-        celixThreadMutex_unlock(&sub->socket_lock);
-        connectPendingPublishers(sub);
-        disconnectPendingPublishers(sub);
-    } // while
+		} // headerMsg != NULL
+		celixThreadMutex_unlock(&sub->socket_lock);
+		connectPendingPublishers(sub);
+		disconnectPendingPublishers(sub);
+	} // while
 
-    return NULL;
+	return NULL;
 }
 
 static void connectPendingPublishers(topic_subscription_pt sub) {
@@ -715,7 +618,7 @@ static void disconnectPendingPublishers(topic_subscription_pt sub) {
 }
 
 static void sigusr1_sighandler(int signo){
-	printf("TS: Topic subscription being shut down...\n");
+	printf("PSA_ZMQ_TS: Topic subscription being shut down...\n");
 	return;
 }
 
@@ -746,7 +649,7 @@ static int pubsub_getMultipart(void *handle, unsigned int msgTypeId, bool retain
 		return -1;
 	}
 
-	mp_handle_t* mp_handle = handle;
+	mp_handle_pt mp_handle = (mp_handle_pt)handle;
 	msg_map_entry_pt entry = hashMap_get(mp_handle->rcv_msg_map,&msgTypeId);
 	if(entry!=NULL){
 		entry->retain = retain;
@@ -762,59 +665,66 @@ static int pubsub_getMultipart(void *handle, unsigned int msgTypeId, bool retain
 
 }
 
-static mp_handle_t* create_mp_handle(topic_subscription_pt sub, pubsub_msg_serializer_map_t* map, array_list_pt rcv_msg_list) {
+static mp_handle_pt create_mp_handle(hash_map_pt svc_msg_db,array_list_pt rcv_msg_list){
 
 	if(arrayList_size(rcv_msg_list)==1){ //Means it's not a multipart message
 		return NULL;
 	}
 
-	mp_handle_t* mp_handle = calloc(1,sizeof(struct mp_handle));
-	mp_handle->map = map;
+	mp_handle_pt mp_handle = calloc(1,sizeof(struct mp_handle));
+	mp_handle->svc_msg_db = svc_msg_db;
 	mp_handle->rcv_msg_map = hashMap_create(NULL, NULL, NULL, NULL);
 
-	int i; //We skip the first message, it will be handle differently
-	for (i=1 ; i<arrayList_size(rcv_msg_list) ; i++) {
-		complete_zmq_msg_pt c_msg = arrayList_get(rcv_msg_list,i);
+	int i=1; //We skip the first message, it will be handle differently
+	for(;i<arrayList_size(rcv_msg_list);i++){
+		complete_zmq_msg_pt c_msg = (complete_zmq_msg_pt)arrayList_get(rcv_msg_list,i);
 		pubsub_msg_header_pt header = (pubsub_msg_header_pt)zframe_data(c_msg->header);
 
-		pubsub_msg_serializer_t* msgSer = hashMap_get(map->serializers, (void*)(uintptr_t)(header->type));
-		if (msgSer != NULL) {
+		pubsub_msg_serializer_t* msgSer = hashMap_get(svc_msg_db, (void*)(uintptr_t)(header->type));
+
+		if (msgSer!= NULL) {
 			void *msgInst = NULL;
-			bool validVersion = checkVersion(msgSer->msgVersion, header);
+
+			bool validVersion = checkVersion(msgSer->msgVersion,header);
+
 			if(validVersion){
-				//TODO make the getMultipart lazy?
-				celix_status_t status = msgSer->deserialize(msgSer->handle, (const char*)zframe_data(c_msg->payload), 0, &msgInst);
+				celix_status_t status = msgSer->deserialize(msgSer->handle, (const void*)zframe_data(c_msg->payload), 0, &msgInst);
 
 				if(status == CELIX_SUCCESS){
 					msg_map_entry_pt entry = calloc(1,sizeof(struct msg_map_entry));
 					entry->msgInst = msgInst;
-					hashMap_put(mp_handle->rcv_msg_map, (void*)(uintptr_t)(header->type), entry);
+					hashMap_put(mp_handle->rcv_msg_map, (void*)(uintptr_t)header->type,entry);
 				}
 			}
 		}
 	}
+
 	return mp_handle;
+
 }
 
-static void destroy_mp_handle(mp_handle_t* mp_handle){
+static void destroy_mp_handle(mp_handle_pt mp_handle){
 
 	hash_map_iterator_pt iter = hashMapIterator_create(mp_handle->rcv_msg_map);
 	while(hashMapIterator_hasNext(iter)){
 		hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
 		unsigned int msgId = (unsigned int)(uintptr_t)hashMapEntry_getKey(entry);
 		msg_map_entry_pt msgEntry = hashMapEntry_getValue(entry);
-		pubsub_msg_serializer_t* msgSer = hashMap_get(mp_handle->map->serializers, (void*)(uintptr_t)msgId);
-		if (msgSer != NULL) {
-			if (!msgEntry->retain) {
-				msgSer->freeMsg(msgSer->handle, msgEntry->msgInst);
+		pubsub_msg_serializer_t* msgSer = hashMap_get(mp_handle->svc_msg_db, (void*)(uintptr_t)msgId);
+
+		if(msgSer!=NULL){
+			if(!msgEntry->retain){
+				msgSer->freeMsg(msgSer->handle,msgEntry->msgInst);
 			}
 		}
 		else{
-			printf("TS: ERROR: Cannot find pubsub_message_type for msg %u, so cannot destroy it!\n", msgId);
+			printf("PSA_ZMQ_TS: ERROR: Cannot find messageSerializer for msg %u, so cannot destroy it!\n",msgId);
 		}
+
+		free(msgEntry);
 	}
 	hashMapIterator_destroy(iter);
 
-	hashMap_destroy(mp_handle->rcv_msg_map,true,true);
+	hashMap_destroy(mp_handle->rcv_msg_map,false,false);
 	free(mp_handle);
 }

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_common/public/include/pubsub_admin.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/include/pubsub_admin.h b/pubsub/pubsub_common/public/include/pubsub_admin.h
index f7ab7e0..f24d825 100644
--- a/pubsub/pubsub_common/public/include/pubsub_admin.h
+++ b/pubsub/pubsub_common/public/include/pubsub_admin.h
@@ -31,13 +31,12 @@
 
 #include "pubsub_common.h"
 #include "pubsub_endpoint.h"
-#include "pubsub_serializer.h"
 
 #define PSA_IP 	"PSA_IP"
 #define PSA_ITF	"PSA_INTERFACE"
 #define PSA_MULTICAST_IP_PREFIX "PSA_MC_PREFIX"
 
-#define PSA_DEFAULT "zmq"
+#define PUBSUB_ADMIN_TYPE_KEY	"pubsub_admin.type"
 
 typedef struct pubsub_admin *pubsub_admin_pt;
 
@@ -53,12 +52,19 @@ struct pubsub_admin_service {
 	celix_status_t (*closeAllPublications)(pubsub_admin_pt admin,char* scope, char* topic);
 	celix_status_t (*closeAllSubscriptions)(pubsub_admin_pt admin,char* scope, char* topic);
 
-	celix_status_t (*matchPublisher)(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP, double* score);
-	celix_status_t (*matchSubscriber)(pubsub_admin_pt admin, pubsub_endpoint_pt subEP, double* score);
-
-	celix_status_t (*setSerializer)(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc);
-	celix_status_t (*removeSerializer)(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc);
-
+	/* Match principle:
+	 * - A full matching pubsub_admin gives 200 points
+	 * - A full matching serializer gives 100 points
+	 * - If QoS = sample
+	 * 		- fallback pubsub_admin order of selection is: udp_mc, zmq. Points allocation is 100,75.
+	 * 		- fallback serializers order of selection is: json, void. Points allocation is 30,20.
+	 * - If QoS = control
+	 * 		- fallback pubsub_admin order of selection is: zmq,udp_mc. Points allocation is 100,75.
+	 * 		- fallback serializers order of selection is: json, void. Points allocation is 30,20.
+	 * - If nothing is specified, QoS = sample is assumed, so the same score applies, just divided by two.
+	 *
+	 */
+	celix_status_t (*matchEndpoint)(pubsub_admin_pt admin, pubsub_endpoint_pt endpoint, double* score);
 };
 
 typedef struct pubsub_admin_service *pubsub_admin_service_pt;

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_common/public/include/pubsub_admin_match.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/include/pubsub_admin_match.h b/pubsub/pubsub_common/public/include/pubsub_admin_match.h
new file mode 100644
index 0000000..a366c34
--- /dev/null
+++ b/pubsub/pubsub_common/public/include/pubsub_admin_match.h
@@ -0,0 +1,27 @@
+/*
+ * pubsub_admin_match.h
+ *
+ *  Created on: Sep 4, 2017
+ *      Author: dn234
+ */
+
+#ifndef PUBSUB_ADMIN_MATCH_H_
+#define PUBSUB_ADMIN_MATCH_H_
+
+#include "celix_errno.h"
+#include "properties.h"
+#include "array_list.h"
+
+#include "pubsub_serializer.h"
+
+#define QOS_ATTRIBUTE_KEY	"attribute.qos"
+#define QOS_TYPE_SAMPLE		"sample"	/* A.k.a. unreliable connection */
+#define QOS_TYPE_CONTROL	"control"	/* A.k.a. reliable connection */
+
+#define PUBSUB_ADMIN_FULL_MATCH_SCORE	200.0F
+#define SERIALIZER_FULL_MATCH_SCORE		100.0F
+
+celix_status_t pubsub_admin_match(properties_pt endpoint_props, const char *pubsub_admin_type, array_list_pt serializerList, double *score);
+celix_status_t pubsub_admin_get_best_serializer(properties_pt endpoint_props, array_list_pt serializerList, pubsub_serializer_service_t **serSvc);
+
+#endif /* PUBSUB_ADMIN_MATCH_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_common/public/include/pubsub_common.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/include/pubsub_common.h b/pubsub/pubsub_common/public/include/pubsub_common.h
index 46abd72..5dfd8fd 100644
--- a/pubsub/pubsub_common/public/include/pubsub_common.h
+++ b/pubsub/pubsub_common/public/include/pubsub_common.h
@@ -32,12 +32,12 @@
 #define PUBSUB_DISCOVERY_SERVICE		"pubsub_discovery"
 #define PUBSUB_TM_ANNOUNCE_PUBLISHER_SERVICE    "pubsub_tm_announce_publisher"
 
-#define PUBSUB_ANY_SUB_TOPIC			"any"
+#define PUBSUB_ANY_SUB_TOPIC		        "any"
 
-#define PUBSUB_BUNDLE_ID				"bundle.id"
+#define	PUBSUB_BUNDLE_ID			"bundle.id"
 
-#define MAX_SCOPE_LEN					1024
-#define MAX_TOPIC_LEN					1024
+#define MAX_SCOPE_LEN                           1024
+#define MAX_TOPIC_LEN				1024
 
 struct pubsub_msg_header{
 	char topic[MAX_TOPIC_LEN];

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_common/public/include/pubsub_endpoint.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/include/pubsub_endpoint.h b/pubsub/pubsub_common/public/include/pubsub_endpoint.h
index 193b3fd..8a979eb 100644
--- a/pubsub/pubsub_common/public/include/pubsub_endpoint.h
+++ b/pubsub/pubsub_common/public/include/pubsub_endpoint.h
@@ -28,6 +28,11 @@
 #define PUBSUB_ENDPOINT_H_
 
 #include "service_reference.h"
+#include "listener_hook_service.h"
+#include "properties.h"
+
+#include "publisher.h"
+#include "subscriber.h"
 
 struct pubsub_endpoint {
     char *frameworkUUID;
@@ -36,12 +41,15 @@ struct pubsub_endpoint {
     long serviceID;
     char* endpoint;
     bool is_secure;
+    properties_pt topic_props;
 };
 
 typedef struct pubsub_endpoint *pubsub_endpoint_pt;
 
-celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, long serviceId,const char* endpoint,pubsub_endpoint_pt* psEp);
-celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt reference,pubsub_endpoint_pt* psEp);
+celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, long serviceId,const char* endpoint,properties_pt topic_props,pubsub_endpoint_pt* psEp);
+celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt reference,pubsub_endpoint_pt* psEp, bool isPublisher);
+celix_status_t pubsubEndpoint_createFromListenerHookInfo(listener_hook_info_pt info,pubsub_endpoint_pt* psEp, bool isPublisher);
+celix_status_t pubsubEndpoint_clone(pubsub_endpoint_pt in, pubsub_endpoint_pt *out);
 celix_status_t pubsubEndpoint_destroy(pubsub_endpoint_pt psEp);
 bool pubsubEndpoint_equals(pubsub_endpoint_pt psEp1,pubsub_endpoint_pt psEp2);
 

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_common/public/include/pubsub_serializer.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/include/pubsub_serializer.h b/pubsub/pubsub_common/public/include/pubsub_serializer.h
index e9f9f6c..4489fa4 100644
--- a/pubsub/pubsub_common/public/include/pubsub_serializer.h
+++ b/pubsub/pubsub_common/public/include/pubsub_serializer.h
@@ -28,9 +28,12 @@
 #define PUBSUB_SERIALIZER_SERVICE_H_
 
 #include "service_reference.h"
+#include "hash_map.h"
 
 #include "pubsub_common.h"
 
+#define PUBSUB_SERIALIZER_TYPE_KEY	"pubsub_serializer.type"
+
 /**
  * There should be a pubsub_serializer_t
  * per msg type (msg id) per bundle
@@ -39,28 +42,24 @@
  * a serializer_map per bundle. Potentially using
  * the extender pattern.
  */
+
 typedef struct pubsub_msg_serializer {
-    void* handle;
-    unsigned int msgId;
-    const char* msgName;
-    version_pt msgVersion;
+	void* handle;
+	unsigned int msgId;
+	const char* msgName;
+	version_pt msgVersion;
 
-    celix_status_t (*serialize)(void* handle, const void* input, char** out, size_t* outLen);
-    celix_status_t (*deserialize)(void* handle, const char* input, size_t inputLen, void** out); //note inputLen can be 0 if predefined size is not needed
+	celix_status_t (*serialize)(void* handle, const void* input, void** out, size_t* outLen);
+	celix_status_t (*deserialize)(void* handle, const void* input, size_t inputLen, void** out); //note inputLen can be 0 if predefined size is not needed
+	void (*freeMsg)(void* handle, void* msg);
 
-    void (*freeMsg)(void* handle, void* msg);
 } pubsub_msg_serializer_t;
 
-typedef struct pubsub_msg_serializer_map {
-    bundle_pt bundle;
-    hash_map_pt serializers; //key = msg id (unsigned int), value = pubsub_serializer_t*
-} pubsub_msg_serializer_map_t;
-
 typedef struct pubsub_serializer_service {
 	void* handle;
 
-	celix_status_t (*createSerializerMap)(void* handle, bundle_pt bundle, pubsub_msg_serializer_map_t** out);
-    celix_status_t (*destroySerializerMap)(void* handle, pubsub_msg_serializer_map_t* map);
+	celix_status_t (*createSerializerMap)(void* handle, bundle_pt bundle, hash_map_pt* serializerMap);
+	celix_status_t (*destroySerializerMap)(void* handle, hash_map_pt serializerMap);
 
 } pubsub_serializer_service_t;
 

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_common/public/src/log_helper.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/src/log_helper.c b/pubsub/pubsub_common/public/src/log_helper.c
index dbd1cc3..7a63363 100644
--- a/pubsub/pubsub_common/public/src/log_helper.c
+++ b/pubsub/pubsub_common/public/src/log_helper.c
@@ -149,6 +149,9 @@ celix_status_t logHelper_destroy(log_helper_pt* loghelper) {
         return status;
 }
 
+
+
+
 celix_status_t logHelper_log(log_helper_pt loghelper, log_level_t level, char* message, ... )
 {
     celix_status_t status = CELIX_SUCCESS;
@@ -166,6 +169,7 @@ celix_status_t logHelper_log(log_helper_pt loghelper, log_level_t level, char* m
 		int i = 0;
 
 		for (; i < arrayList_size(loghelper->logServices); i++) {
+
 			log_service_pt logService = arrayList_get(loghelper->logServices, i);
 
 			if (logService != NULL) {
@@ -175,31 +179,31 @@ celix_status_t logHelper_log(log_helper_pt loghelper, log_level_t level, char* m
 		}
 
 		pthread_mutex_unlock(&loghelper->logListLock);
+	}
 
-		if (!logged && loghelper->stdOutFallback) {
-			char *levelStr = NULL;
-
-			switch (level) {
-				case OSGI_LOGSERVICE_ERROR:
-					levelStr = "ERROR";
-					break;
-				case OSGI_LOGSERVICE_WARNING:
-					levelStr = "WARNING";
-					break;
-				case OSGI_LOGSERVICE_INFO:
-					levelStr = "INFO";
-					break;
-				case OSGI_LOGSERVICE_DEBUG:
-				default:
-					levelStr = "DEBUG";
-					break;
-			}
 
-			printf("%s: %s\n", levelStr, msg);
-		}
-	}
+    if (!logged && loghelper->stdOutFallback) {
+        char *levelStr = NULL;
+
+        switch (level) {
+            case OSGI_LOGSERVICE_ERROR:
+                levelStr = "ERROR";
+                break;
+            case OSGI_LOGSERVICE_WARNING:
+                levelStr = "WARNING";
+                break;
+            case OSGI_LOGSERVICE_INFO:
+                levelStr = "INFO";
+                break;
+            case OSGI_LOGSERVICE_DEBUG:
+            default:
+                levelStr = "DEBUG";
+                break;
+        }
+
+        printf("%s: %s\n", levelStr, msg);
+    }
 
-	va_end(listPointer);
 
 	return status;
 }

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_common/public/src/pubsub_admin_match.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/src/pubsub_admin_match.c b/pubsub/pubsub_common/public/src/pubsub_admin_match.c
new file mode 100644
index 0000000..bb555b7
--- /dev/null
+++ b/pubsub/pubsub_common/public/src/pubsub_admin_match.c
@@ -0,0 +1,303 @@
+/*
+ * pubsub_admin_match.c
+
+ *
+ *  Created on: Sep 4, 2017
+ *      Author: dn234
+ */
+
+#include <string.h>
+#include "service_reference.h"
+
+#include "pubsub_admin.h"
+
+#include "pubsub_admin_match.h"
+
+#define KNOWN_PUBSUB_ADMIN_NUM	2
+#define KNOWN_SERIALIZER_NUM	2
+
+static char* qos_sample_pubsub_admin_prio_list[KNOWN_PUBSUB_ADMIN_NUM] = {"udp_mc","zmq"};
+static char* qos_sample_serializer_prio_list[KNOWN_SERIALIZER_NUM] = {"json","void"};
+
+static char* qos_control_pubsub_admin_prio_list[KNOWN_PUBSUB_ADMIN_NUM] = {"zmq","udp_mc"};
+static char* qos_control_serializer_prio_list[KNOWN_SERIALIZER_NUM] = {"json","void"};
+
+static double qos_pubsub_admin_score[KNOWN_PUBSUB_ADMIN_NUM] = {100.0F,75.0F};
+static double qos_serializer_score[KNOWN_SERIALIZER_NUM] = {30.0F,20.0F};
+
+static void get_serializer_type(service_reference_pt svcRef, char **serializerType);
+static void manage_service_from_reference(service_reference_pt svcRef, void **svc, bool getService);
+
+celix_status_t pubsub_admin_match(properties_pt endpoint_props, const char *pubsub_admin_type, array_list_pt serializerList, double *score){
+
+	celix_status_t status = CELIX_SUCCESS;
+	double final_score = 0;
+	int i = 0, j = 0;
+
+	const char *requested_admin_type 		= NULL;
+	const char *requested_serializer_type 	= NULL;
+	const char *requested_qos_type			= NULL;
+
+	if(endpoint_props!=NULL){
+		requested_admin_type 		= properties_get(endpoint_props,PUBSUB_ADMIN_TYPE_KEY);
+		requested_serializer_type 	= properties_get(endpoint_props,PUBSUB_SERIALIZER_TYPE_KEY);
+		requested_qos_type			= properties_get(endpoint_props,QOS_ATTRIBUTE_KEY);
+	}
+
+	/* Analyze the pubsub_admin */
+	if(requested_admin_type != NULL){ /* We got precise specification on the pubsub_admin we want */
+		if(strncmp(requested_admin_type,pubsub_admin_type,strlen(pubsub_admin_type))==0){ //Full match
+			final_score += PUBSUB_ADMIN_FULL_MATCH_SCORE;
+		}
+	}
+	else if(requested_qos_type != NULL){ /* We got QoS specification that will determine the selected PSA */
+		if(strncmp(requested_qos_type,QOS_TYPE_SAMPLE,strlen(QOS_TYPE_SAMPLE))==0){
+			for(i=0;i<KNOWN_PUBSUB_ADMIN_NUM;i++){
+				if(strncmp(qos_sample_pubsub_admin_prio_list[i],pubsub_admin_type,strlen(pubsub_admin_type))==0){
+					final_score += qos_pubsub_admin_score[i];
+					break;
+				}
+			}
+		}
+		else if(strncmp(requested_qos_type,QOS_TYPE_CONTROL,strlen(QOS_TYPE_CONTROL))==0){
+			for(i=0;i<KNOWN_PUBSUB_ADMIN_NUM;i++){
+				if(strncmp(qos_control_pubsub_admin_prio_list[i],pubsub_admin_type,strlen(pubsub_admin_type))==0){
+					final_score += qos_pubsub_admin_score[i];
+					break;
+				}
+			}
+		}
+		else{
+			printf("Unknown QoS type '%s'\n",requested_qos_type);
+			status = CELIX_ILLEGAL_ARGUMENT;
+		}
+	}
+	else{ /* We got no specification: fallback to Qos=Sample, but count half the score */
+		for(i=0;i<KNOWN_PUBSUB_ADMIN_NUM;i++){
+			if(strncmp(qos_sample_pubsub_admin_prio_list[i],pubsub_admin_type,strlen(pubsub_admin_type))==0){
+				final_score += (qos_pubsub_admin_score[i]/2);
+				break;
+			}
+		}
+	}
+
+	char *serializer_type = NULL;
+	/* Analyze the serializers */
+	if(requested_serializer_type != NULL){ /* We got precise specification on the serializer we want */
+		for(i=0;i<arrayList_size(serializerList);i++){
+			service_reference_pt svcRef = (service_reference_pt)arrayList_get(serializerList,i);
+			get_serializer_type(svcRef, &serializer_type);
+			if(serializer_type != NULL){
+				if(strncmp(requested_serializer_type,serializer_type,strlen(serializer_type))==0){
+					final_score += SERIALIZER_FULL_MATCH_SCORE;
+					break;
+				}
+			}
+		}
+	}
+	else if(requested_qos_type != NULL){ /* We got QoS specification that will determine the selected serializer */
+		if(strncmp(requested_qos_type,QOS_TYPE_SAMPLE,strlen(QOS_TYPE_SAMPLE))==0){
+			bool ser_found = false;
+			for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){
+				for(j=0;j<arrayList_size(serializerList) && !ser_found;j++){
+					service_reference_pt svcRef = (service_reference_pt)arrayList_get(serializerList,j);
+					get_serializer_type(svcRef, &serializer_type);
+					if(serializer_type != NULL){
+						if(strncmp(qos_sample_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){
+							ser_found = true;
+						}
+					}
+				}
+				if(ser_found){
+					final_score += qos_serializer_score[i];
+				}
+			}
+		}
+		else if(strncmp(requested_qos_type,QOS_TYPE_CONTROL,strlen(QOS_TYPE_CONTROL))==0){
+			bool ser_found = false;
+			for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){
+				for(j=0;j<arrayList_size(serializerList) && !ser_found;j++){
+					service_reference_pt svcRef = (service_reference_pt)arrayList_get(serializerList,j);
+					get_serializer_type(svcRef, &serializer_type);
+					if(serializer_type != NULL){
+						if(strncmp(qos_control_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){
+							ser_found = true;
+						}
+					}
+				}
+				if(ser_found){
+					final_score += qos_serializer_score[i];
+				}
+			}
+		}
+		else{
+			printf("Unknown QoS type '%s'\n",requested_qos_type);
+			status = CELIX_ILLEGAL_ARGUMENT;
+		}
+	}
+	else{ /* We got no specification: fallback to Qos=Sample, but count half the score */
+		bool ser_found = false;
+		for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){
+			for(j=0;j<arrayList_size(serializerList) && !ser_found;j++){
+				service_reference_pt svcRef = (service_reference_pt)arrayList_get(serializerList,j);
+				get_serializer_type(svcRef, &serializer_type);
+				if(serializer_type != NULL){
+					if(strncmp(qos_sample_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){
+						ser_found = true;
+					}
+				}
+			}
+			if(ser_found){
+				final_score += (qos_serializer_score[i]/2);
+			}
+		}
+	}
+
+	*score = final_score;
+
+	printf("Score for pair <%s,%s> = %f\n",pubsub_admin_type,serializer_type,final_score);
+
+	return status;
+}
+
+celix_status_t pubsub_admin_get_best_serializer(properties_pt endpoint_props, array_list_pt serializerList, pubsub_serializer_service_t **serSvc){
+	celix_status_t status = CELIX_SUCCESS;
+
+	int i = 0, j = 0;
+
+	const char *requested_serializer_type 	= properties_get(endpoint_props,PUBSUB_SERIALIZER_TYPE_KEY);
+	const char *requested_qos_type			= properties_get(endpoint_props,QOS_ATTRIBUTE_KEY);
+
+	service_reference_pt svcRef = NULL;
+	void *svc = NULL;
+
+	/* Analyze the serializers */
+	if(requested_serializer_type != NULL){ /* We got precise specification on the serializer we want */
+		for(i=0;i<arrayList_size(serializerList);i++){
+			svcRef = (service_reference_pt)arrayList_get(serializerList,i);
+			char *serializer_type = NULL;
+			get_serializer_type(svcRef, &serializer_type);
+			if(serializer_type != NULL){
+				if(strncmp(requested_serializer_type,serializer_type,strlen(serializer_type))==0){
+					manage_service_from_reference(svcRef, &svc,true);
+					if(svc==NULL){
+						printf("Cannot get pubsub_serializer_service from serviceReference %p\n",svcRef);
+						status = CELIX_SERVICE_EXCEPTION;
+					}
+					*serSvc = svc;
+					break;
+				}
+			}
+		}
+	}
+	else if(requested_qos_type != NULL){ /* We got QoS specification that will determine the selected serializer */
+		if(strncmp(requested_qos_type,QOS_TYPE_SAMPLE,strlen(QOS_TYPE_SAMPLE))==0){
+			bool ser_found = false;
+			for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){
+				for(j=0;j<arrayList_size(serializerList) && !ser_found;j++){
+					svcRef = (service_reference_pt)arrayList_get(serializerList,j);
+					char *serializer_type = NULL;
+					get_serializer_type(svcRef, &serializer_type);
+					if(serializer_type != NULL){
+						if(strncmp(qos_sample_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){
+							manage_service_from_reference(svcRef, &svc,true);
+							if(svc==NULL){
+								printf("Cannot get pubsub_serializer_service from serviceReference %p\n",svcRef);
+								status = CELIX_SERVICE_EXCEPTION;
+							}
+							else{
+								*serSvc = svc;
+								ser_found = true;
+								printf("Selected %s serializer as best for QoS=%s\n",qos_sample_serializer_prio_list[i],QOS_TYPE_SAMPLE);
+							}
+						}
+					}
+				}
+			}
+		}
+		else if(strncmp(requested_qos_type,QOS_TYPE_CONTROL,strlen(QOS_TYPE_CONTROL))==0){
+			bool ser_found = false;
+			for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){
+				for(j=0;j<arrayList_size(serializerList) && !ser_found;j++){
+					svcRef = (service_reference_pt)arrayList_get(serializerList,j);
+					char *serializer_type = NULL;
+					get_serializer_type(svcRef, &serializer_type);
+					if(serializer_type != NULL){
+						if(strncmp(qos_control_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){
+							manage_service_from_reference(svcRef, &svc,true);
+							if(svc==NULL){
+								printf("Cannot get pubsub_serializer_service from serviceReference %p\n",svcRef);
+								status = CELIX_SERVICE_EXCEPTION;
+							}
+							else{
+								*serSvc = svc;
+								ser_found = true;
+								printf("Selected %s serializer as best for QoS=%s\n",qos_control_serializer_prio_list[i],QOS_TYPE_CONTROL);
+							}
+						}
+					}
+				}
+			}
+		}
+		else{
+			printf("Unknown QoS type '%s'\n",requested_qos_type);
+			status = CELIX_ILLEGAL_ARGUMENT;
+		}
+	}
+	else{ /* We got no specification: fallback to Qos=Sample, but count half the score */
+		bool ser_found = false;
+		for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){
+			for(j=0;j<arrayList_size(serializerList) && !ser_found;j++){
+				svcRef = (service_reference_pt)arrayList_get(serializerList,j);
+				char *serializer_type = NULL;
+				get_serializer_type(svcRef, &serializer_type);
+				if(serializer_type != NULL){
+					if(strncmp(qos_sample_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){
+						manage_service_from_reference(svcRef, &svc,true);
+						if(svc==NULL){
+							printf("Cannot get pubsub_serializer_service from serviceReference %p\n",svcRef);
+							status = CELIX_SERVICE_EXCEPTION;
+						}
+						else{
+							*serSvc = svc;
+							ser_found = true;
+							printf("Selected %s serializer as best without any specification\n",qos_sample_serializer_prio_list[i]);
+						}
+					}
+				}
+			}
+		}
+	}
+
+	if(svc!=NULL && svcRef!=NULL){
+		manage_service_from_reference(svcRef, svc, false);
+	}
+
+	return status;
+}
+
+static void get_serializer_type(service_reference_pt svcRef, char **serializerType){
+
+	const char *serType = NULL;
+	serviceReference_getProperty(svcRef, PUBSUB_SERIALIZER_TYPE_KEY,&serType);
+	if(serType != NULL){
+		*serializerType = (char*)serType;
+	}
+	else{
+		printf("Serializer serviceReference %p has no pubsub_serializer.type property specified\n",svcRef);
+		*serializerType = NULL;
+	}
+}
+
+static void manage_service_from_reference(service_reference_pt svcRef, void **svc, bool getService){
+	bundle_context_pt context = NULL;
+	bundle_pt bundle = NULL;
+	serviceReference_getBundle(svcRef, &bundle);
+	bundle_getContext(bundle, &context);
+	if(getService){
+		bundleContext_getService(context, svcRef, svc);
+	}
+	else{
+		bundleContext_ungetService(context, svcRef, NULL);
+	}
+}

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_common/public/src/pubsub_endpoint.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/src/pubsub_endpoint.c b/pubsub/pubsub_common/public/src/pubsub_endpoint.c
index ebb330e..f6776d5 100644
--- a/pubsub/pubsub_common/public/src/pubsub_endpoint.c
+++ b/pubsub/pubsub_common/public/src/pubsub_endpoint.c
@@ -33,41 +33,101 @@
 #include "pubsub_common.h"
 #include "pubsub_endpoint.h"
 #include "constants.h"
-#include "subscriber.h"
 
-celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, long serviceId, const char* endpoint, pubsub_endpoint_pt* out) {
-    celix_status_t status = CELIX_SUCCESS;
+#include "pubsub_utils.h"
 
-    pubsub_endpoint_pt psEp = calloc(1, sizeof(*psEp));
 
-    if (fwUUID != NULL) {
-        psEp->frameworkUUID = strdup(fwUUID);
-    }
+static void pubsubEndpoint_setFields(pubsub_endpoint_pt psEp, const char* fwUUID, const char* scope, const char* topic, long serviceId,const char* endpoint,properties_pt topic_props, bool cloneProps);
+static properties_pt pubsubEndpoint_getTopicProperties(bundle_pt bundle, const char *topic, bool isPublisher);
 
-    if (scope != NULL) {
-        psEp->scope = strdup(scope);
-    }
+static void pubsubEndpoint_setFields(pubsub_endpoint_pt psEp, const char* fwUUID, const char* scope, const char* topic, long serviceId,const char* endpoint,properties_pt topic_props, bool cloneProps){
 
-    if (topic != NULL) {
-        psEp->topic = strdup(topic);
-    }
+	if (fwUUID != NULL) {
+		psEp->frameworkUUID = strdup(fwUUID);
+	}
+
+	if (scope != NULL) {
+		psEp->scope = strdup(scope);
+	}
+
+	if (topic != NULL) {
+		psEp->topic = strdup(topic);
+	}
+
+	psEp->serviceID = serviceId;
+
+	if(endpoint != NULL) {
+		psEp->endpoint = strdup(endpoint);
+	}
+
+	if(topic_props != NULL){
+		if(cloneProps){
+			properties_copy(topic_props, &(psEp->topic_props));
+		}
+		else{
+			psEp->topic_props = topic_props;
+		}
+	}
+}
+
+static properties_pt pubsubEndpoint_getTopicProperties(bundle_pt bundle, const char *topic, bool isPublisher){
+
+	properties_pt topic_props = NULL;
+
+	bool isSystemBundle = false;
+	bundle_isSystemBundle(bundle, &isSystemBundle);
+	long bundleId = -1;
+	bundle_isSystemBundle(bundle, &isSystemBundle);
+	bundle_getBundleId(bundle,&bundleId);
+
+	if(isSystemBundle == false) {
 
-    psEp->serviceID = serviceId;
+		char *bundleRoot = NULL;
+		char* topicPropertiesPath = NULL;
+		bundle_getEntry(bundle, ".", &bundleRoot);
 
-    if (endpoint != NULL) {
-        psEp->endpoint = strdup(endpoint);
-    }
+		if(bundleRoot != NULL){
 
-	*out = psEp;
+			asprintf(&topicPropertiesPath, "%s/META-INF/topics/%s/%s.properties", bundleRoot, isPublisher?"pub":"sub", topic);
+			topic_props = properties_load(topicPropertiesPath);
+			if(topic_props==NULL){
+				printf("PSEP: Could not load properties for %s on topic %s, bundleId=%ld\n", isPublisher?"publication":"subscription", topic,bundleId);
+			}
 
-    return status;
+			free(topicPropertiesPath);
+			free(bundleRoot);
+		}
+	}
+
+	return topic_props;
+}
+
+celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, long serviceId,const char* endpoint,properties_pt topic_props,pubsub_endpoint_pt* psEp){
+	celix_status_t status = CELIX_SUCCESS;
+
+	*psEp = calloc(1, sizeof(**psEp));
+
+	pubsubEndpoint_setFields(*psEp, fwUUID, scope, topic, serviceId, endpoint, topic_props, true);
+
+	return status;
 
 }
 
-celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt reference, pubsub_endpoint_pt* out){
+celix_status_t pubsubEndpoint_clone(pubsub_endpoint_pt in, pubsub_endpoint_pt *out){
 	celix_status_t status = CELIX_SUCCESS;
 
-	pubsub_endpoint_pt psEp = calloc(1,sizeof(*psEp));
+	*out = calloc(1,sizeof(**out));
+
+	pubsubEndpoint_setFields(*out, in->frameworkUUID, in->scope, in->topic, in->serviceID, in->endpoint, in->topic_props, true);
+
+	return status;
+
+}
+
+celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt reference, pubsub_endpoint_pt* psEp, bool isPublisher){
+	celix_status_t status = CELIX_SUCCESS;
+
+	*psEp = calloc(1,sizeof(**psEp));
 
 	bundle_pt bundle = NULL;
 	bundle_context_pt ctxt = NULL;
@@ -85,49 +145,86 @@ celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt re
 	const char* serviceId = NULL;
 	serviceReference_getProperty(reference,(char*)OSGI_FRAMEWORK_SERVICE_ID,&serviceId);
 
-	if(fwUUID!=NULL){
-		psEp->frameworkUUID = strdup(fwUUID);
-	}
+	/* TODO: is topic_props==NULL a fatal error such that EP cannot be created? */
+	properties_pt topic_props = pubsubEndpoint_getTopicProperties(bundle, topic, isPublisher);
 
-	if(scope!=NULL){
-		psEp->scope = strdup(scope);
-	} else {
-	    psEp->scope = strdup(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT);
-	}
+	pubsubEndpoint_setFields(*psEp, fwUUID, scope!=NULL?scope:PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, topic, strtol(serviceId,NULL,10), NULL, topic_props, false);
 
-	if(topic!=NULL){
-		psEp->topic = strdup(topic);
+	if (!(*psEp)->frameworkUUID || !(*psEp)->serviceID || !(*psEp)->scope || !(*psEp)->topic) {
+		fw_log(logger, OSGI_FRAMEWORK_LOG_ERROR, "PUBSUB_ENDPOINT: incomplete description!.");
+		status = CELIX_BUNDLE_EXCEPTION;
 	}
 
-	if(serviceId!=NULL){
-		psEp->serviceID = strtol(serviceId,NULL,10);
-	}
+	return status;
 
-	if (!psEp->frameworkUUID || !psEp->serviceID || !psEp->scope || !psEp->topic) {
-		fw_log(logger, OSGI_FRAMEWORK_LOG_ERROR, "PUBSUB_ENDPOINT: incomplete description!.");
-		status = CELIX_BUNDLE_EXCEPTION;
+}
+
+celix_status_t pubsubEndpoint_createFromListenerHookInfo(listener_hook_info_pt info,pubsub_endpoint_pt* psEp, bool isPublisher){
+	celix_status_t status = CELIX_SUCCESS;
+
+	const char* fwUUID=NULL;
+	bundleContext_getProperty(info->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
+
+	char* topic = pubsub_getTopicFromFilter(info->filter);
+	if(topic==NULL || fwUUID==NULL){
+		return CELIX_BUNDLE_EXCEPTION;
 	}
 
-	if (status != CELIX_SUCCESS) {
-		pubsubEndpoint_destroy(psEp);
-	} else {
-		*out = psEp;
+	*psEp = calloc(1, sizeof(**psEp));
+
+	char* scope = pubsub_getScopeFromFilter(info->filter);
+	if(scope == NULL) {
+		scope = strdup(PUBSUB_PUBLISHER_SCOPE_DEFAULT);
 	}
 
-	return status;
+	bundle_pt bundle = NULL;
+	long bundleId = -1;
+	bundleContext_getBundle(info->context,&bundle);
 
+	bundle_getBundleId(bundle,&bundleId);
+
+	properties_pt topic_props = pubsubEndpoint_getTopicProperties(bundle, topic, isPublisher);
+
+	/* TODO: is topic_props==NULL a fatal error such that EP cannot be created? */
+	pubsubEndpoint_setFields(*psEp, fwUUID, scope!=NULL?scope:PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, topic, bundleId, NULL, topic_props, false);
+
+	free(topic);
+	free(scope);
+
+
+	return status;
 }
 
 celix_status_t pubsubEndpoint_destroy(pubsub_endpoint_pt psEp){
-	if (psEp != NULL) {
+
+	if(psEp->frameworkUUID!=NULL){
 		free(psEp->frameworkUUID);
+		psEp->frameworkUUID = NULL;
+	}
+
+	if(psEp->scope!=NULL){
 		free(psEp->scope);
+		psEp->scope = NULL;
+	}
+
+	if(psEp->topic!=NULL){
 		free(psEp->topic);
+		psEp->topic = NULL;
+	}
+
+	if(psEp->endpoint!=NULL){
 		free(psEp->endpoint);
+		psEp->endpoint = NULL;
 	}
+
+	if(psEp->topic_props != NULL){
+		properties_destroy(psEp->topic_props);
+	}
+
 	free(psEp);
 
 	return CELIX_SUCCESS;
+
 }
 
 bool pubsubEndpoint_equals(pubsub_endpoint_pt psEp1,pubsub_endpoint_pt psEp2){
@@ -138,7 +235,6 @@ bool pubsubEndpoint_equals(pubsub_endpoint_pt psEp1,pubsub_endpoint_pt psEp2){
 			(psEp1->serviceID == psEp2->serviceID) /*&&
 			((psEp1->endpoint==NULL && psEp2->endpoint==NULL)||(strcmp(psEp1->endpoint,psEp2->endpoint)==0))*/
 	);
-
 }
 
 char *createScopeTopicKey(const char* scope, const char* topic) {

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_discovery/private/include/pubsub_discovery_impl.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/private/include/pubsub_discovery_impl.h b/pubsub/pubsub_discovery/private/include/pubsub_discovery_impl.h
index d5be8d6..676a6ab 100644
--- a/pubsub/pubsub_discovery/private/include/pubsub_discovery_impl.h
+++ b/pubsub/pubsub_discovery/private/include/pubsub_discovery_impl.h
@@ -58,7 +58,6 @@ celix_status_t pubsub_discovery_stop(pubsub_discovery_pt node_discovery);
 celix_status_t pubsub_discovery_addNode(pubsub_discovery_pt node_discovery, pubsub_endpoint_pt pubEP);
 celix_status_t pubsub_discovery_removeNode(pubsub_discovery_pt node_discovery, pubsub_endpoint_pt pubEP);
 
-celix_status_t pubsub_discovery_tmPublisherAnnounceAdding(void * handle, service_reference_pt reference, void **service);
 celix_status_t pubsub_discovery_tmPublisherAnnounceAdded(void * handle, service_reference_pt reference, void * service);
 celix_status_t pubsub_discovery_tmPublisherAnnounceModified(void * handle, service_reference_pt reference, void * service);
 celix_status_t pubsub_discovery_tmPublisherAnnounceRemoved(void * handle, service_reference_pt reference, void * service);

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_discovery/private/src/etcd_common.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/private/src/etcd_common.c b/pubsub/pubsub_discovery/private/src/etcd_common.c
index a53a844..c757801 100644
--- a/pubsub/pubsub_discovery/private/src/etcd_common.c
+++ b/pubsub/pubsub_discovery/private/src/etcd_common.c
@@ -32,6 +32,7 @@
 #include "pubsub_discovery.h"
 #include "pubsub_discovery_impl.h"
 
+
 #define MAX_ROOTNODE_LENGTH		128
 #define MAX_LOCALNODE_LENGTH 	4096
 #define MAX_FIELD_LENGTH		128

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_discovery/private/src/etcd_watcher.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/private/src/etcd_watcher.c b/pubsub/pubsub_discovery/private/src/etcd_watcher.c
index 13ba3aa..3c3a5a8 100644
--- a/pubsub/pubsub_discovery/private/src/etcd_watcher.c
+++ b/pubsub/pubsub_discovery/private/src/etcd_watcher.c
@@ -57,17 +57,17 @@ struct etcd_watcher {
 	celix_thread_mutex_t watcherLock;
 	celix_thread_t watcherThread;
 
-    char *scope;
+	char *scope;
 	char *topic;
 	volatile bool running;
 };
 
 struct etcd_writer {
-    pubsub_discovery_pt pubsub_discovery;
-    celix_thread_mutex_t localPubsLock;
-    array_list_pt localPubs;
-    volatile bool running;
-    celix_thread_t writerThread;
+	pubsub_discovery_pt pubsub_discovery;
+	celix_thread_mutex_t localPubsLock;
+	array_list_pt localPubs;
+	volatile bool running;
+	celix_thread_t writerThread;
 };
 
 
@@ -77,41 +77,41 @@ static celix_status_t etcdWatcher_getTopicRootPath(bundle_context_pt context, co
 	const char* rootPath = NULL;
 
 	if (((bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &rootPath)) != CELIX_SUCCESS) || (!rootPath)) {
-	    snprintf(rootNode, rootNodeLen, "%s/%s/%s", DEFAULT_ETCD_ROOTPATH, scope, topic);
+		snprintf(rootNode, rootNodeLen, "%s/%s/%s", DEFAULT_ETCD_ROOTPATH, scope, topic);
 	} else {
-        snprintf(rootNode, rootNodeLen, "%s/%s/%s", rootPath, scope, topic);
+		snprintf(rootNode, rootNodeLen, "%s/%s/%s", rootPath, scope, topic);
 	}
 
 	return status;
 }
 
 static celix_status_t etcdWatcher_getRootPath(bundle_context_pt context, char* rootNode) {
-    celix_status_t status = CELIX_SUCCESS;
-    const char* rootPath = NULL;
+	celix_status_t status = CELIX_SUCCESS;
+	const char* rootPath = NULL;
 
-    if (((bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &rootPath)) != CELIX_SUCCESS) || (!rootPath)) {
-        strncpy(rootNode, DEFAULT_ETCD_ROOTPATH, MAX_ROOTNODE_LENGTH);
-    } else {
-        strncpy(rootNode, rootPath, MAX_ROOTNODE_LENGTH);
-    }
+	if (((bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &rootPath)) != CELIX_SUCCESS) || (!rootPath)) {
+		strncpy(rootNode, DEFAULT_ETCD_ROOTPATH, MAX_ROOTNODE_LENGTH);
+	} else {
+		strncpy(rootNode, rootPath, MAX_ROOTNODE_LENGTH);
+	}
 
-    return status;
+	return status;
 }
 
 
 static void add_node(const char *key, const char *value, void* arg) {
-    pubsub_discovery_pt ps_discovery = (pubsub_discovery_pt) arg;
-    pubsub_endpoint_pt pubEP = NULL;
-    celix_status_t status = etcdWatcher_getPublisherEndpointFromKey(ps_discovery, key, value, &pubEP);
-    if(!status && pubEP) {
-        pubsub_discovery_addNode(ps_discovery, pubEP);
-    }
+	pubsub_discovery_pt ps_discovery = (pubsub_discovery_pt) arg;
+	pubsub_endpoint_pt pubEP = NULL;
+	celix_status_t status = etcdWatcher_getPublisherEndpointFromKey(ps_discovery, key, value, &pubEP);
+	if(!status && pubEP) {
+		pubsub_discovery_addNode(ps_discovery, pubEP);
+	}
 }
 
 static celix_status_t etcdWatcher_addAlreadyExistingPublishers(pubsub_discovery_pt ps_discovery, const char *rootPath, long long * highestModified) {
 	celix_status_t status = CELIX_SUCCESS;
 	if(etcd_get_directory(rootPath, add_node, ps_discovery, highestModified)) {
-	    status = CELIX_ILLEGAL_ARGUMENT;
+		status = CELIX_ILLEGAL_ARGUMENT;
 	}
 	return status;
 }
@@ -137,14 +137,14 @@ celix_status_t etcdWatcher_getPublisherEndpointFromKey(pubsub_discovery_pt pubsu
 
 	asprintf(&expr, "/%s/%%[^/]/%%[^/]/%%[^/]/%%[^/].*", rootPath);
 	if(expr) {
-            int foundItems = sscanf(etcdKey, expr, scope, topic, fwUUID, serviceId);
-            free(expr);
-            if (foundItems != 4) { // Could happen when a directory is removed, just don't process this.
-                    status = CELIX_ILLEGAL_STATE;
-            }
-            else{
-                    status = pubsubEndpoint_create(fwUUID,scope,topic,strtol(serviceId,NULL,10),etcdValue,pubEP);
-            }
+		int foundItems = sscanf(etcdKey, expr, scope, topic, fwUUID, serviceId);
+		free(expr);
+		if (foundItems != 4) { // Could happen when a directory is removed, just don't process this.
+			status = CELIX_ILLEGAL_STATE;
+		}
+		else{
+			status = pubsubEndpoint_create(fwUUID,scope,topic,strtol(serviceId,NULL,10),etcdValue,NULL,pubEP);
+		}
 	}
 	return status;
 }
@@ -154,75 +154,75 @@ celix_status_t etcdWatcher_getPublisherEndpointFromKey(pubsub_discovery_pt pubsu
  * changing discovery endpoint information within etcd.
  */
 static void* etcdWatcher_run(void* data) {
-    etcd_watcher_pt watcher = (etcd_watcher_pt) data;
-    time_t timeBeforeWatch = time(NULL);
-    char rootPath[MAX_ROOTNODE_LENGTH];
-    long long highestModified = 0;
-
-    pubsub_discovery_pt ps_discovery = watcher->pubsub_discovery;
-    bundle_context_pt context = ps_discovery->context;
-
-    memset(rootPath, 0, MAX_ROOTNODE_LENGTH);
-
-    //TODO: add topic to etcd key
-    etcdWatcher_getTopicRootPath(context, watcher->scope, watcher->topic, rootPath, MAX_ROOTNODE_LENGTH);
-    etcdWatcher_addAlreadyExistingPublishers(ps_discovery, rootPath, &highestModified);
-
-    while ((celixThreadMutex_lock(&watcher->watcherLock) == CELIX_SUCCESS) && watcher->running) {
-
-        char *rkey = NULL;
-        char *value = NULL;
-        char *preValue = NULL;
-        char *action = NULL;
-        long long modIndex;
-
-        celixThreadMutex_unlock(&watcher->watcherLock);
-
-        if (etcd_watch(rootPath, highestModified + 1, &action, &preValue, &value, &rkey, &modIndex) == 0 && action != NULL) {
-            pubsub_endpoint_pt pubEP = NULL;
-            if ((strcmp(action, "set") == 0) || (strcmp(action, "create") == 0)) {
-                if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, value, &pubEP) == CELIX_SUCCESS) {
-                    pubsub_discovery_addNode(ps_discovery, pubEP);
-                }
-            } else if (strcmp(action, "delete") == 0) {
-                if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, preValue, &pubEP) == CELIX_SUCCESS) {
-                    pubsub_discovery_removeNode(ps_discovery, pubEP);
-                }
-            } else if (strcmp(action, "expire") == 0) {
-                if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, preValue, &pubEP) == CELIX_SUCCESS) {
-                    pubsub_discovery_removeNode(ps_discovery, pubEP);
-                }
-            } else if (strcmp(action, "update") == 0) {
-                if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, value, &pubEP) == CELIX_SUCCESS) {
-                    pubsub_discovery_addNode(ps_discovery, pubEP);
-                }
-            } else {
-                fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "Unexpected action: %s", action);
-            }
-            highestModified = modIndex;
-        } else if (time(NULL) - timeBeforeWatch <= (DEFAULT_ETCD_TTL / 4)) {
-            sleep(DEFAULT_ETCD_TTL / 4);
-        }
-
-        FREE_MEM(action);
-        FREE_MEM(value);
-        FREE_MEM(preValue);
-        FREE_MEM(rkey);
-
-        /* prevent busy waiting, in case etcd_watch returns false */
-
-
-        if (time(NULL) - timeBeforeWatch > (DEFAULT_ETCD_TTL / 4)) {
-            timeBeforeWatch = time(NULL);
-        }
-
-    }
-
-    if (watcher->running == false) {
-        celixThreadMutex_unlock(&watcher->watcherLock);
-    }
-
-    return NULL;
+	etcd_watcher_pt watcher = (etcd_watcher_pt) data;
+	time_t timeBeforeWatch = time(NULL);
+	char rootPath[MAX_ROOTNODE_LENGTH];
+	long long highestModified = 0;
+
+	pubsub_discovery_pt ps_discovery = watcher->pubsub_discovery;
+	bundle_context_pt context = ps_discovery->context;
+
+	memset(rootPath, 0, MAX_ROOTNODE_LENGTH);
+
+	//TODO: add topic to etcd key
+	etcdWatcher_getTopicRootPath(context, watcher->scope, watcher->topic, rootPath, MAX_ROOTNODE_LENGTH);
+	etcdWatcher_addAlreadyExistingPublishers(ps_discovery, rootPath, &highestModified);
+
+	while ((celixThreadMutex_lock(&watcher->watcherLock) == CELIX_SUCCESS) && watcher->running) {
+
+		char *rkey = NULL;
+		char *value = NULL;
+		char *preValue = NULL;
+		char *action = NULL;
+		long long modIndex;
+
+		celixThreadMutex_unlock(&watcher->watcherLock);
+
+		if (etcd_watch(rootPath, highestModified + 1, &action, &preValue, &value, &rkey, &modIndex) == 0 && action != NULL) {
+			pubsub_endpoint_pt pubEP = NULL;
+			if ((strcmp(action, "set") == 0) || (strcmp(action, "create") == 0)) {
+				if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, value, &pubEP) == CELIX_SUCCESS) {
+					pubsub_discovery_addNode(ps_discovery, pubEP);
+				}
+			} else if (strcmp(action, "delete") == 0) {
+				if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, preValue, &pubEP) == CELIX_SUCCESS) {
+					pubsub_discovery_removeNode(ps_discovery, pubEP);
+				}
+			} else if (strcmp(action, "expire") == 0) {
+				if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, preValue, &pubEP) == CELIX_SUCCESS) {
+					pubsub_discovery_removeNode(ps_discovery, pubEP);
+				}
+			} else if (strcmp(action, "update") == 0) {
+				if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, value, &pubEP) == CELIX_SUCCESS) {
+					pubsub_discovery_addNode(ps_discovery, pubEP);
+				}
+			} else {
+				fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "Unexpected action: %s", action);
+			}
+			highestModified = modIndex;
+		} else if (time(NULL) - timeBeforeWatch <= (DEFAULT_ETCD_TTL / 4)) {
+			sleep(DEFAULT_ETCD_TTL / 4);
+		}
+
+		FREE_MEM(action);
+		FREE_MEM(value);
+		FREE_MEM(preValue);
+		FREE_MEM(rkey);
+
+		/* prevent busy waiting, in case etcd_watch returns false */
+
+
+		if (time(NULL) - timeBeforeWatch > (DEFAULT_ETCD_TTL / 4)) {
+			timeBeforeWatch = time(NULL);
+		}
+
+	}
+
+	if (watcher->running == false) {
+		celixThreadMutex_unlock(&watcher->watcherLock);
+	}
+
+	return NULL;
 }
 
 celix_status_t etcdWatcher_create(pubsub_discovery_pt pubsub_discovery, bundle_context_pt context, const char *scope, const char *topic, etcd_watcher_pt *watcher) {
@@ -243,16 +243,18 @@ celix_status_t etcdWatcher_create(pubsub_discovery_pt pubsub_discovery, bundle_c
 	(*watcher)->scope = strdup(scope);
 	(*watcher)->topic = strdup(topic);
 
-    celixThreadMutex_create(&(*watcher)->watcherLock, NULL);
 
-    celixThreadMutex_lock(&(*watcher)->watcherLock);
+	celixThreadMutex_create(&(*watcher)->watcherLock, NULL);
+
+	celixThreadMutex_lock(&(*watcher)->watcherLock);
+
+	status = celixThread_create(&(*watcher)->watcherThread, NULL, etcdWatcher_run, *watcher);
+	if (status == CELIX_SUCCESS) {
+		(*watcher)->running = true;
+	}
 
-    status = celixThread_create(&(*watcher)->watcherThread, NULL, etcdWatcher_run, *watcher);
-    if (status == CELIX_SUCCESS) {
-    	(*watcher)->running = true;
-    }
+	celixThreadMutex_unlock(&(*watcher)->watcherLock);
 
-    celixThreadMutex_unlock(&(*watcher)->watcherLock);
 
 	return status;
 }

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_discovery/private/src/etcd_writer.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/private/src/etcd_writer.c b/pubsub/pubsub_discovery/private/src/etcd_writer.c
index 687d802..1c423f3 100644
--- a/pubsub/pubsub_discovery/private/src/etcd_writer.c
+++ b/pubsub/pubsub_discovery/private/src/etcd_writer.c
@@ -47,11 +47,11 @@
 #define DEFAULT_ETCD_TTL 30
 
 struct etcd_writer {
-    pubsub_discovery_pt pubsub_discovery;
-    celix_thread_mutex_t localPubsLock;
-    array_list_pt localPubs;
-    volatile bool running;
-    celix_thread_t writerThread;
+	pubsub_discovery_pt pubsub_discovery;
+	celix_thread_mutex_t localPubsLock;
+	array_list_pt localPubs;
+	volatile bool running;
+	celix_thread_t writerThread;
 };
 
 
@@ -60,38 +60,38 @@ static void* etcdWriter_run(void* data);
 
 
 etcd_writer_pt etcdWriter_create(pubsub_discovery_pt disc) {
-    etcd_writer_pt writer = calloc(1, sizeof(*writer));
-    if(writer) {
-        celixThreadMutex_create(&writer->localPubsLock, NULL);
-        arrayList_create(&writer->localPubs);
-        writer->pubsub_discovery = disc;
-        writer->running = true;
-        celixThread_create(&writer->writerThread, NULL, etcdWriter_run, writer);
-    }
-    return writer;
+	etcd_writer_pt writer = calloc(1, sizeof(*writer));
+	if(writer) {
+		celixThreadMutex_create(&writer->localPubsLock, NULL);
+		arrayList_create(&writer->localPubs);
+		writer->pubsub_discovery = disc;
+		writer->running = true;
+		celixThread_create(&writer->writerThread, NULL, etcdWriter_run, writer);
+	}
+	return writer;
 }
 
 void etcdWriter_destroy(etcd_writer_pt writer) {
-    char dir[MAX_ROOTNODE_LENGTH];
-    const char *rootPath = etcdWriter_getRootPath(writer->pubsub_discovery->context);
-
-    writer->running = false;
-    celixThread_join(writer->writerThread, NULL);
-
-    celixThreadMutex_lock(&writer->localPubsLock);
-    for(int i = 0; i < arrayList_size(writer->localPubs); i++) {
-        pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(writer->localPubs,i);
-        memset(dir,0,MAX_ROOTNODE_LENGTH);
-        snprintf(dir,MAX_ROOTNODE_LENGTH,"%s/%s/%s/%s",rootPath,pubEP->scope,pubEP->topic,pubEP->frameworkUUID);
-        etcd_del(dir);
-        pubsubEndpoint_destroy(pubEP);
-    }
-    arrayList_destroy(writer->localPubs);
-
-    celixThreadMutex_unlock(&writer->localPubsLock);
-    celixThreadMutex_destroy(&(writer->localPubsLock));
-
-    free(writer);
+	char dir[MAX_ROOTNODE_LENGTH];
+	const char *rootPath = etcdWriter_getRootPath(writer->pubsub_discovery->context);
+
+	writer->running = false;
+	celixThread_join(writer->writerThread, NULL);
+
+	celixThreadMutex_lock(&writer->localPubsLock);
+	for(int i = 0; i < arrayList_size(writer->localPubs); i++) {
+		pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(writer->localPubs,i);
+		memset(dir,0,MAX_ROOTNODE_LENGTH);
+		snprintf(dir,MAX_ROOTNODE_LENGTH,"%s/%s/%s/%s",rootPath,pubEP->scope,pubEP->topic,pubEP->frameworkUUID);
+		etcd_del(dir);
+		pubsubEndpoint_destroy(pubEP);
+	}
+	arrayList_destroy(writer->localPubs);
+
+	celixThreadMutex_unlock(&writer->localPubsLock);
+	celixThreadMutex_destroy(&(writer->localPubsLock));
+
+	free(writer);
 }
 
 celix_status_t etcdWriter_addPublisherEndpoint(etcd_writer_pt writer, pubsub_endpoint_pt pubEP, bool storeEP){
@@ -101,11 +101,11 @@ celix_status_t etcdWriter_addPublisherEndpoint(etcd_writer_pt writer, pubsub_end
 		const char *fwUUID = NULL;
 		bundleContext_getProperty(writer->pubsub_discovery->context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID);
 		if(fwUUID && strcmp(pubEP->frameworkUUID, fwUUID) == 0) {
-	            celixThreadMutex_lock(&writer->localPubsLock);
-		    pubsub_endpoint_pt p = NULL;
-		    pubsubEndpoint_create(pubEP->frameworkUUID,pubEP->scope,pubEP->topic,pubEP->serviceID,pubEP->endpoint,&p);
-		    arrayList_add(writer->localPubs,p);
-	            celixThreadMutex_unlock(&writer->localPubsLock);
+			celixThreadMutex_lock(&writer->localPubsLock);
+			pubsub_endpoint_pt p = NULL;
+			pubsubEndpoint_clone(pubEP, &p);
+			arrayList_add(writer->localPubs,p);
+			celixThreadMutex_unlock(&writer->localPubsLock);
 		}
 	}
 
@@ -138,52 +138,52 @@ celix_status_t etcdWriter_addPublisherEndpoint(etcd_writer_pt writer, pubsub_end
 }
 
 celix_status_t etcdWriter_deletePublisherEndpoint(etcd_writer_pt writer, pubsub_endpoint_pt pubEP) {
-    celix_status_t status = CELIX_SUCCESS;
-    char *key = NULL;
-
-    const char *rootPath = etcdWriter_getRootPath(writer->pubsub_discovery->context);
-
-    asprintf(&key, "%s/%s/%s/%s/%ld", rootPath, pubEP->scope, pubEP->topic, pubEP->frameworkUUID, pubEP->serviceID);
-
-    celixThreadMutex_lock(&writer->localPubsLock);
-    for (unsigned int i = 0; i < arrayList_size(writer->localPubs); i++) {
-        pubsub_endpoint_pt ep = arrayList_get(writer->localPubs, i);
-        if (pubsubEndpoint_equals(ep, pubEP)) {
-            arrayList_remove(writer->localPubs, i);
-            pubsubEndpoint_destroy(ep);
-            break;
-        }
-    }
-    celixThreadMutex_unlock(&writer->localPubsLock);
-
-    if (etcd_del(key)) {
-        printf("Failed to remove key %s from ETCD\n",key);
-        status = CELIX_ILLEGAL_ARGUMENT;
-    }
-    FREE_MEM(key);
-    return status;
+	celix_status_t status = CELIX_SUCCESS;
+	char *key = NULL;
+
+	const char *rootPath = etcdWriter_getRootPath(writer->pubsub_discovery->context);
+
+	asprintf(&key, "%s/%s/%s/%s/%ld", rootPath, pubEP->scope, pubEP->topic, pubEP->frameworkUUID, pubEP->serviceID);
+
+	celixThreadMutex_lock(&writer->localPubsLock);
+	for (unsigned int i = 0; i < arrayList_size(writer->localPubs); i++) {
+		pubsub_endpoint_pt ep = arrayList_get(writer->localPubs, i);
+		if (pubsubEndpoint_equals(ep, pubEP)) {
+			arrayList_remove(writer->localPubs, i);
+			pubsubEndpoint_destroy(ep);
+			break;
+		}
+	}
+	celixThreadMutex_unlock(&writer->localPubsLock);
+
+	if (etcd_del(key)) {
+		printf("Failed to remove key %s from ETCD\n",key);
+		status = CELIX_ILLEGAL_ARGUMENT;
+	}
+	FREE_MEM(key);
+	return status;
 }
 
 static void* etcdWriter_run(void* data) {
-    etcd_writer_pt writer = (etcd_writer_pt)data;
-    while(writer->running) {
-          celixThreadMutex_lock(&writer->localPubsLock);
-          for(int i=0; i < arrayList_size(writer->localPubs); i++) {
-              etcdWriter_addPublisherEndpoint(writer,(pubsub_endpoint_pt)arrayList_get(writer->localPubs,i),false);
-          }
-          celixThreadMutex_unlock(&writer->localPubsLock);
-          sleep(DEFAULT_ETCD_TTL / 2);
-    }
-
-    return NULL;
+	etcd_writer_pt writer = (etcd_writer_pt)data;
+	while(writer->running) {
+		celixThreadMutex_lock(&writer->localPubsLock);
+		for(int i=0; i < arrayList_size(writer->localPubs); i++) {
+			etcdWriter_addPublisherEndpoint(writer,(pubsub_endpoint_pt)arrayList_get(writer->localPubs,i),false);
+		}
+		celixThreadMutex_unlock(&writer->localPubsLock);
+		sleep(DEFAULT_ETCD_TTL / 2);
+	}
+
+	return NULL;
 }
 
 static const char* etcdWriter_getRootPath(bundle_context_pt context) {
-    const char* rootPath = NULL;
-    bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &rootPath);
-    if(rootPath == NULL) {
-        rootPath = DEFAULT_ETCD_ROOTPATH;
-    }
-    return rootPath;
+	const char* rootPath = NULL;
+	bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &rootPath);
+	if(rootPath == NULL) {
+		rootPath = DEFAULT_ETCD_ROOTPATH;
+	}
+	return rootPath;
 }
 

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_discovery/private/src/psd_activator.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/private/src/psd_activator.c b/pubsub/pubsub_discovery/private/src/psd_activator.c
index afbe282..89a517d 100644
--- a/pubsub/pubsub_discovery/private/src/psd_activator.c
+++ b/pubsub/pubsub_discovery/private/src/psd_activator.c
@@ -48,7 +48,7 @@ static celix_status_t createTMPublisherAnnounceTracker(struct activator *activat
 	service_tracker_customizer_pt customizer = NULL;
 
 	status = serviceTrackerCustomizer_create(activator->pubsub_discovery,
-			pubsub_discovery_tmPublisherAnnounceAdding,
+			NULL,
 			pubsub_discovery_tmPublisherAnnounceAdded,
 			pubsub_discovery_tmPublisherAnnounceModified,
 			pubsub_discovery_tmPublisherAnnounceRemoved,

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c b/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c
index 0c7d6c4..94a8e11 100644
--- a/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c
+++ b/pubsub/pubsub_discovery/private/src/pubsub_discovery_impl.c
@@ -167,7 +167,6 @@ celix_status_t pubsub_discovery_stop(pubsub_discovery_pt ps_discovery) {
     hashMapIterator_destroy(iter);
     hashMap_destroy(ps_discovery->watchers, true, true);
     celixThreadMutex_unlock(&ps_discovery->watchersMutex);
-
     return status;
 }
 
@@ -293,7 +292,7 @@ celix_status_t pubsub_discovery_announcePublisher(void *handle, pubsub_endpoint_
 	}
 	free(pub_key);
 	pubsub_endpoint_pt p = NULL;
-	pubsubEndpoint_create(pubEP->frameworkUUID,pubEP->scope,pubEP->topic,pubEP->serviceID,pubEP->endpoint,&p);
+	pubsubEndpoint_clone(pubEP, &p);
 
 	arrayList_add(pubEP_list,p);
 
@@ -397,16 +396,6 @@ celix_status_t pubsub_discovery_uninterestedInTopic(void *handle, const char* sc
 
 /* pubsub_topology_manager tracker callbacks */
 
-celix_status_t pubsub_discovery_tmPublisherAnnounceAdding(void * handle, service_reference_pt reference, void **service) {
-	celix_status_t status = CELIX_SUCCESS;
-
-	pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt)handle;
-
-	status = bundleContext_getService(pubsub_discovery->context, reference, service);
-
-	return status;
-}
-
 celix_status_t pubsub_discovery_tmPublisherAnnounceAdded(void * handle, service_reference_pt reference, void * service) {
 	celix_status_t status = CELIX_SUCCESS;
 

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_serializer_json/private/include/pubsub_serializer_impl.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_serializer_json/private/include/pubsub_serializer_impl.h b/pubsub/pubsub_serializer_json/private/include/pubsub_serializer_impl.h
index 5299808..c36f20e 100644
--- a/pubsub/pubsub_serializer_json/private/include/pubsub_serializer_impl.h
+++ b/pubsub/pubsub_serializer_json/private/include/pubsub_serializer_impl.h
@@ -24,8 +24,8 @@
  *  \copyright	Apache License, Version 2.0
  */
 
-#ifndef PUBSUB_SERIALIZER_IMPL_H_
-#define PUBSUB_SERIALIZER_IMPL_H_
+#ifndef PUBSUB_SERIALIZER_JSON_H_
+#define PUBSUB_SERIALIZER_JSON_H_
 
 #include "dyn_common.h"
 #include "dyn_type.h"
@@ -34,25 +34,22 @@
 
 #include "pubsub_serializer.h"
 
+#define PUBSUB_SERIALIZER_TYPE	"json"
+
 typedef struct pubsub_serializer {
 	bundle_context_pt bundle_context;
 	log_helper_pt loghelper;
 } pubsub_serializer_t;
 
-typedef struct pubsub_msg_serializer_impl {
-    pubsub_msg_serializer_t msgSerializer;
-    dyn_message_type* dynMsg;
-} pubsub_msg_serializer_impl_t;
-
 celix_status_t pubsubSerializer_create(bundle_context_pt context, pubsub_serializer_t* *serializer);
 celix_status_t pubsubSerializer_destroy(pubsub_serializer_t* serializer);
 
-celix_status_t pubsubSerializer_createSerializerMap(pubsub_serializer_t* serializer, bundle_pt bundle, pubsub_msg_serializer_map_t** out);
-celix_status_t pubsubSerializer_destroySerializerMap(pubsub_serializer_t*, pubsub_msg_serializer_map_t* map);
+celix_status_t pubsubSerializer_createSerializerMap(pubsub_serializer_t* serializer, bundle_pt bundle, hash_map_pt* serializerMap);
+celix_status_t pubsubSerializer_destroySerializerMap(pubsub_serializer_t*, hash_map_pt serializerMap);
 
 /* Start of serializer specific functions */
-celix_status_t pubsubMsgSerializer_serialize(pubsub_msg_serializer_impl_t* impl, const void* msg, char** out, size_t *outLen);
-celix_status_t pubsubMsgSerializer_deserialize(pubsub_msg_serializer_impl_t* impl, const char* input, size_t inputLen, void **out);
-void pubsubMsgSerializer_freeMsg(pubsub_msg_serializer_impl_t* impl, void *msg);
+celix_status_t pubsubMsgSerializer_serialize(pubsub_msg_serializer_t* msgSerializer, const void* msg, void** out, size_t *outLen);
+celix_status_t pubsubMsgSerializer_deserialize(pubsub_msg_serializer_t* msgSerializer, const void* input, size_t inputLen, void **out);
+void pubsubMsgSerializer_freeMsg(pubsub_msg_serializer_t* msgSerializer, void *msg);
 
-#endif /* PUBSUB_SERIALIZER_IMPL_H_ */
+#endif /* PUBSUB_SERIALIZER_JSON_H_ */


Mime
View raw message