celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rlenfer...@apache.org
Subject [3/4] celix git commit: Merge branch 'endpoint-format' into develop
Date Fri, 02 Feb 2018 13:38:10 GMT
http://git-wip-us.apache.org/repos/asf/celix/blob/0a5ef69a/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --cc pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c
index 2dcec25,0000000..9929437
mode 100644,000000..100644
--- a/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c
+++ b/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c
@@@ -1,1040 -1,0 +1,1059 @@@
 +/**
 + *Licensed to the Apache Software Foundation (ASF) under one
 + *or more contributor license agreements.  See the NOTICE file
 + *distributed with this work for additional information
 + *regarding copyright ownership.  The ASF licenses this file
 + *to you under the Apache License, Version 2.0 (the
 + *"License"); you may not use this file except in compliance
 + *with the License.  You may obtain a copy of the License at
 + *
 + *  http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *Unless required by applicable law or agreed to in writing,
 + *software distributed under the License is distributed on an
 + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + *specific language governing permissions and limitations
 + *under the License.
 + */
 +/*
 + * pubsub_admin_impl.c
 + *
 + *  \date       Sep 30, 2011
 + *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
 + *  \copyright	Apache License, Version 2.0
 + */
 +
 +#include "pubsub_admin_impl.h"
 +#include <zmq.h>
 +
 +#include <stdio.h>
 +#include <stdlib.h>
 +
 +#include <arpa/inet.h>
 +#include <sys/socket.h>
 +#include <netdb.h>
 +
 +#ifndef ANDROID
 +#include <ifaddrs.h>
 +#endif
 +
 +#include <stdio.h>
 +#include <stdlib.h>
 +#include <unistd.h>
 +#include <string.h>
 +
 +#include "constants.h"
 +#include "utils.h"
 +#include "hash_map.h"
 +#include "array_list.h"
 +#include "bundle_context.h"
 +#include "bundle.h"
 +#include "service_reference.h"
 +#include "service_registration.h"
 +#include "log_helper.h"
 +#include "log_service.h"
 +#include "celix_threads.h"
 +#include "service_factory.h"
 +
 +#include "topic_subscription.h"
 +#include "topic_publication.h"
 +#include "pubsub_endpoint.h"
 +#include "pubsub_utils.h"
 +#include "pubsub/subscriber.h"
 +
 +#define MAX_KEY_FOLDER_PATH_LENGTH 512
 +
 +static const char *DEFAULT_IP = "127.0.0.1";
 +
 +static celix_status_t pubsubAdmin_getIpAdress(const char* interface, char** ip);
 +static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
 +static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
 +
 +static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin,pubsub_endpoint_pt ep, pubsub_serializer_service_t **serSvc);
 +static void connectTopicPubSubToSerializer(pubsub_admin_pt admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool isPublication);
 +static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void *topicPubSub,bool isPublication);
 +
 +celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *admin) {
 +	celix_status_t status = CELIX_SUCCESS;
 +
 +#ifdef BUILD_WITH_ZMQ_SECURITY
 +	if (!zsys_has_curve()){
 +		printf("PSA_ZMQ: zeromq curve unsupported\n");
 +		return CELIX_SERVICE_EXCEPTION;
 +	}
 +#endif
 +
 +	*admin = calloc(1, sizeof(**admin));
 +
 +	if (!*admin) {
 +		status = CELIX_ENOMEM;
 +	}
 +	else{
 +
 +		const char *ip = NULL;
 +		char *detectedIp = NULL;
 +		(*admin)->bundle_context= context;
 +		(*admin)->localPublications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 +		(*admin)->subscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 +		(*admin)->pendingSubscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 +		(*admin)->externalPublications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 +		(*admin)->topicSubscriptionsPerSerializer = hashMap_create(NULL, NULL, NULL, NULL);
 +		(*admin)->topicPublicationsPerSerializer  = hashMap_create(NULL, NULL, NULL, NULL);
 +		arrayList_create(&((*admin)->noSerializerSubscriptions));
 +		arrayList_create(&((*admin)->noSerializerPublications));
 +		arrayList_create(&((*admin)->serializerList));
 +
 +		celixThreadMutex_create(&(*admin)->localPublicationsLock, NULL);
 +		celixThreadMutex_create(&(*admin)->subscriptionsLock, NULL);
 +		celixThreadMutex_create(&(*admin)->externalPublicationsLock, NULL);
 +		celixThreadMutex_create(&(*admin)->serializerListLock, NULL);
 +		celixThreadMutex_create(&(*admin)->usedSerializersLock, NULL);
 +
 +		celixThreadMutexAttr_create(&(*admin)->noSerializerPendingsAttr);
 +		celixThreadMutexAttr_settype(&(*admin)->noSerializerPendingsAttr, CELIX_THREAD_MUTEX_RECURSIVE);
 +		celixThreadMutex_create(&(*admin)->noSerializerPendingsLock, &(*admin)->noSerializerPendingsAttr);
 +
 +		celixThreadMutexAttr_create(&(*admin)->pendingSubscriptionsAttr);
 +		celixThreadMutexAttr_settype(&(*admin)->pendingSubscriptionsAttr, CELIX_THREAD_MUTEX_RECURSIVE);
 +		celixThreadMutex_create(&(*admin)->pendingSubscriptionsLock, &(*admin)->pendingSubscriptionsAttr);
 +
 +		if (logHelper_create(context, &(*admin)->loghelper) == CELIX_SUCCESS) {
 +			logHelper_start((*admin)->loghelper);
 +		}
 +
 +		bundleContext_getProperty(context,PSA_IP , &ip);
 +
 +#ifndef ANDROID
 +		if (ip == NULL) {
 +			const char *interface = NULL;
 +
 +			bundleContext_getProperty(context, PSA_ITF, &interface);
 +			if (pubsubAdmin_getIpAdress(interface, &detectedIp) != CELIX_SUCCESS) {
 +				logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA_ZMQ: Could not retrieve IP adress for interface %s", interface);
 +			}
 +
 +			ip = detectedIp;
 +		}
 +#endif
 +
 +		if (ip != NULL) {
 +			logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA_ZMQ: Using %s for service annunciation", ip);
 +			(*admin)->ipAddress = strdup(ip);
 +		}
 +		else {
 +			logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA_ZMQ: No IP address for service annunciation set. Using %s", DEFAULT_IP);
 +			(*admin)->ipAddress = strdup(DEFAULT_IP);
 +		}
 +
 +		if (detectedIp != NULL) {
 +			free(detectedIp);
 +		}
 +
 +		const char* basePortStr = NULL;
 +		const char* maxPortStr = NULL;
 +		char* endptrBase = NULL;
 +		char* endptrMax = NULL;
 +		bundleContext_getPropertyWithDefault(context, PSA_ZMQ_BASE_PORT, "PSA_ZMQ_DEFAULT_BASE_PORT", &basePortStr);
 +		bundleContext_getPropertyWithDefault(context, PSA_ZMQ_MAX_PORT, "PSA_ZMQ_DEFAULT_MAX_PORT", &maxPortStr);
 +		(*admin)->basePort = strtol(basePortStr, &endptrBase, 10);
 +		(*admin)->maxPort = strtol(maxPortStr, &endptrMax, 10);
 +		if (*endptrBase != '\0') {
 +			(*admin)->basePort = PSA_ZMQ_DEFAULT_BASE_PORT;
 +		}
 +		if (*endptrMax != '\0') {
 +			(*admin)->maxPort = PSA_ZMQ_DEFAULT_MAX_PORT;
 +		}
 +
 +		printf("PSA Using base port %u to max port %u\n", (*admin)->basePort, (*admin)->maxPort);
 +
 +		// Disable Signal Handling by CZMQ
 +		setenv("ZSYS_SIGHANDLER", "false", true);
 +
 +		const char *nrZmqThreads = NULL;
 +		bundleContext_getProperty(context, "PSA_NR_ZMQ_THREADS", &nrZmqThreads);
 +
 +		if(nrZmqThreads != NULL) {
 +			char *endPtr = NULL;
 +			unsigned int nrThreads = strtoul(nrZmqThreads, &endPtr, 10);
 +			if(endPtr != nrZmqThreads && nrThreads > 0 && nrThreads < 50) {
 +				zsys_set_io_threads(nrThreads);
 +				logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA_ZMQ: Using %d threads for ZMQ", nrThreads);
 +				printf("PSA_ZMQ: Using %d threads for ZMQ\n", nrThreads);
 +			}
 +		}
 +
 +#ifdef BUILD_WITH_ZMQ_SECURITY
 +		// Setup authenticator
 +		zactor_t* auth = zactor_new (zauth, NULL);
 +		zstr_sendx(auth, "VERBOSE", NULL);
 +
 +		// Load all public keys of subscribers into the application
 +		// This step is done for authenticating subscribers
 +		char curve_folder_path[MAX_KEY_FOLDER_PATH_LENGTH];
 +		char* keys_bundle_dir = pubsub_getKeysBundleDir(context);
 +		snprintf(curve_folder_path, MAX_KEY_FOLDER_PATH_LENGTH, "%s/META-INF/keys/subscriber/public", keys_bundle_dir);
 +		zstr_sendx (auth, "CURVE", curve_folder_path, NULL);
 +		free(keys_bundle_dir);
 +
 +		(*admin)->zmq_auth = auth;
 +#endif
 +
 +	}
 +
 +	return status;
 +}
 +
 +
 +celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin)
 +{
 +	celix_status_t status = CELIX_SUCCESS;
 +
 +	free(admin->ipAddress);
 +
 +	celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
 +	hash_map_iterator_pt iter = hashMapIterator_create(admin->pendingSubscriptions);
 +	while(hashMapIterator_hasNext(iter)){
 +		hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
 +		free((char*)hashMapEntry_getKey(entry));
 +		arrayList_destroy((array_list_pt)hashMapEntry_getValue(entry));
 +	}
 +	hashMapIterator_destroy(iter);
 +	hashMap_destroy(admin->pendingSubscriptions,false,false);
 +	celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
 +
 +	celixThreadMutex_lock(&admin->subscriptionsLock);
 +	hashMap_destroy(admin->subscriptions,false,false);
 +	celixThreadMutex_unlock(&admin->subscriptionsLock);
 +
 +	celixThreadMutex_lock(&admin->localPublicationsLock);
 +	hashMap_destroy(admin->localPublications,true,false);
 +	celixThreadMutex_unlock(&admin->localPublicationsLock);
 +
 +	celixThreadMutex_lock(&admin->externalPublicationsLock);
 +	iter = hashMapIterator_create(admin->externalPublications);
 +	while(hashMapIterator_hasNext(iter)){
 +		hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
 +		free((char*)hashMapEntry_getKey(entry));
 +		arrayList_destroy((array_list_pt)hashMapEntry_getValue(entry));
 +	}
 +	hashMapIterator_destroy(iter);
 +	hashMap_destroy(admin->externalPublications,false,false);
 +	celixThreadMutex_unlock(&admin->externalPublicationsLock);
 +
 +	celixThreadMutex_lock(&admin->serializerListLock);
 +	arrayList_destroy(admin->serializerList);
 +	celixThreadMutex_unlock(&admin->serializerListLock);
 +
 +	celixThreadMutex_lock(&admin->noSerializerPendingsLock);
 +	arrayList_destroy(admin->noSerializerSubscriptions);
 +	arrayList_destroy(admin->noSerializerPublications);
 +	celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
 +
 +
 +	celixThreadMutex_lock(&admin->usedSerializersLock);
 +
 +	iter = hashMapIterator_create(admin->topicSubscriptionsPerSerializer);
 +	while(hashMapIterator_hasNext(iter)){
 +		arrayList_destroy((array_list_pt)hashMapIterator_nextValue(iter));
 +	}
 +	hashMapIterator_destroy(iter);
 +	hashMap_destroy(admin->topicSubscriptionsPerSerializer,false,false);
 +
 +	iter = hashMapIterator_create(admin->topicPublicationsPerSerializer);
 +	while(hashMapIterator_hasNext(iter)){
 +		arrayList_destroy((array_list_pt)hashMapIterator_nextValue(iter));
 +	}
 +	hashMapIterator_destroy(iter);
 +	hashMap_destroy(admin->topicPublicationsPerSerializer,false,false);
 +
 +	celixThreadMutex_unlock(&admin->usedSerializersLock);
 +
 +	celixThreadMutex_destroy(&admin->usedSerializersLock);
 +	celixThreadMutex_destroy(&admin->serializerListLock);
 +	celixThreadMutex_destroy(&admin->pendingSubscriptionsLock);
 +
 +	celixThreadMutexAttr_destroy(&admin->noSerializerPendingsAttr);
 +	celixThreadMutex_destroy(&admin->noSerializerPendingsLock);
 +
 +	celixThreadMutexAttr_destroy(&admin->pendingSubscriptionsAttr);
 +	celixThreadMutex_destroy(&admin->subscriptionsLock);
 +
 +	celixThreadMutex_destroy(&admin->localPublicationsLock);
 +	celixThreadMutex_destroy(&admin->externalPublicationsLock);
 +
 +	logHelper_stop(admin->loghelper);
 +
 +	logHelper_destroy(&admin->loghelper);
 +
 +#ifdef BUILD_WITH_ZMQ_SECURITY
 +	if (admin->zmq_auth != NULL){
 +		zactor_destroy(&(admin->zmq_auth));
 +	}
 +#endif
 +
 +	free(admin);
 +
 +	return status;
 +}
 +
 +static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
 +	celix_status_t status = CELIX_SUCCESS;
 +
 +	celixThreadMutex_lock(&admin->subscriptionsLock);
 +
 +	topic_subscription_pt any_sub = hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC);
 +
 +	if(any_sub==NULL){
 +
 +		int i;
 +		pubsub_serializer_service_t *best_serializer = NULL;
 +		if( (status=pubsubAdmin_getBestSerializer(admin, subEP, &best_serializer)) == CELIX_SUCCESS){
 +			status = pubsub_topicSubscriptionCreate(admin->bundle_context, PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, PUBSUB_ANY_SUB_TOPIC, best_serializer, &any_sub);
 +		}
 +		else{
- 			printf("PSA_ZMQ: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n",subEP->topic);
++			printf("PSA_ZMQ: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n",
++				   properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +			celixThreadMutex_lock(&admin->noSerializerPendingsLock);
 +			arrayList_add(admin->noSerializerSubscriptions,subEP);
 +			celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
 +		}
 +
 +		if (status == CELIX_SUCCESS){
 +
 +			/* Connect all internal publishers */
 +			celixThreadMutex_lock(&admin->localPublicationsLock);
 +			hash_map_iterator_pt lp_iter =hashMapIterator_create(admin->localPublications);
 +			while(hashMapIterator_hasNext(lp_iter)){
 +				service_factory_pt factory = (service_factory_pt)hashMapIterator_nextValue(lp_iter);
 +				topic_publication_pt topic_pubs = (topic_publication_pt)factory->handle;
 +				array_list_pt topic_publishers = pubsub_topicPublicationGetPublisherList(topic_pubs);
 +
 +				if(topic_publishers!=NULL){
 +					for(i=0;i<arrayList_size(topic_publishers);i++){
 +						pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(topic_publishers,i);
- 						if(pubEP->endpoint !=NULL){
- 							status += pubsub_topicSubscriptionConnectPublisher(any_sub,pubEP->endpoint);
++						if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) !=NULL){
++							status += pubsub_topicSubscriptionConnectPublisher(any_sub, (char*) properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
 +						}
 +					}
 +					arrayList_destroy(topic_publishers);
 +				}
 +			}
 +			hashMapIterator_destroy(lp_iter);
 +			celixThreadMutex_unlock(&admin->localPublicationsLock);
 +
 +			/* Connect also all external publishers */
 +			celixThreadMutex_lock(&admin->externalPublicationsLock);
 +			hash_map_iterator_pt extp_iter =hashMapIterator_create(admin->externalPublications);
 +			while(hashMapIterator_hasNext(extp_iter)){
 +				array_list_pt ext_pub_list = (array_list_pt)hashMapIterator_nextValue(extp_iter);
 +				if(ext_pub_list!=NULL){
 +					for(i=0;i<arrayList_size(ext_pub_list);i++){
 +						pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
- 						if(pubEP->endpoint !=NULL){
- 							status += pubsub_topicSubscriptionConnectPublisher(any_sub,pubEP->endpoint);
++						if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) !=NULL){
++							status += pubsub_topicSubscriptionConnectPublisher(any_sub, (char*) properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
 +						}
 +					}
 +				}
 +			}
 +			hashMapIterator_destroy(extp_iter);
 +			celixThreadMutex_unlock(&admin->externalPublicationsLock);
 +
 +
 +			pubsub_topicSubscriptionAddSubscriber(any_sub,subEP);
 +
 +			status += pubsub_topicSubscriptionStart(any_sub);
 +
 +		}
 +
 +		if (status == CELIX_SUCCESS){
 +			hashMap_put(admin->subscriptions,strdup(PUBSUB_ANY_SUB_TOPIC),any_sub);
 +			connectTopicPubSubToSerializer(admin, best_serializer, any_sub, false);
 +		}
 +
 +	}
 +
 +	celixThreadMutex_unlock(&admin->subscriptionsLock);
 +
 +	return status;
 +}
 +
 +celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
 +	celix_status_t status = CELIX_SUCCESS;
 +
- 	printf("PSA_ZMQ: Received subscription [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n",subEP->frameworkUUID,subEP->serviceID,subEP->scope, subEP->topic);
++	printf("PSA_ZMQ: Received subscription [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n",
++		   properties_get(subEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),
++		   subEP->serviceID,
++		   properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),
++		   properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +
- 	if(strcmp(subEP->topic,PUBSUB_ANY_SUB_TOPIC)==0){
++	if(strcmp(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),PUBSUB_ANY_SUB_TOPIC)==0){
 +		return pubsubAdmin_addAnySubscription(admin,subEP);
 +	}
 +
 +	/* Check if we already know some publisher about this topic, otherwise let's put the subscription in the pending hashmap */
 +	celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
 +	celixThreadMutex_lock(&admin->subscriptionsLock);
 +	celixThreadMutex_lock(&admin->localPublicationsLock);
 +	celixThreadMutex_lock(&admin->externalPublicationsLock);
 +
- 	char* scope_topic = createScopeTopicKey(subEP->scope, subEP->topic);
++	char* scope_topic = createScopeTopicKey(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +
 +	service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
 +	array_list_pt ext_pub_list = (array_list_pt)hashMap_get(admin->externalPublications,scope_topic);
 +
 +	if(factory==NULL && ext_pub_list==NULL){ //No (local or external) publishers yet for this topic
 +		pubsubAdmin_addSubscriptionToPendingList(admin,subEP);
 +	}
 +	else{
 +		int i;
 +		topic_subscription_pt subscription = hashMap_get(admin->subscriptions, scope_topic);
 +
 +		if(subscription == NULL) {
 +			pubsub_serializer_service_t *best_serializer = NULL;
 +			if( (status=pubsubAdmin_getBestSerializer(admin, subEP, &best_serializer)) == CELIX_SUCCESS){
- 				status += pubsub_topicSubscriptionCreate(admin->bundle_context,subEP->scope, subEP->topic, best_serializer, &subscription);
++				status += pubsub_topicSubscriptionCreate(admin->bundle_context, (char*) properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), (char*) properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC), best_serializer, &subscription);
 +			}
 +			else{
- 				printf("PSA_ZMQ: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n",subEP->topic);
++				printf("PSA_ZMQ: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n",
++					   properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +				celixThreadMutex_lock(&admin->noSerializerPendingsLock);
 +				arrayList_add(admin->noSerializerSubscriptions,subEP);
 +				celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
 +			}
 +
 +			if (status==CELIX_SUCCESS){
 +
 +				/* Try to connect internal publishers */
 +				if(factory!=NULL){
 +					topic_publication_pt topic_pubs = (topic_publication_pt)factory->handle;
 +					array_list_pt topic_publishers = pubsub_topicPublicationGetPublisherList(topic_pubs);
 +
 +					if(topic_publishers!=NULL){
 +						for(i=0;i<arrayList_size(topic_publishers);i++){
 +							pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(topic_publishers,i);
- 							if(pubEP->endpoint !=NULL){
- 								status += pubsub_topicSubscriptionConnectPublisher(subscription,pubEP->endpoint);
++							if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) !=NULL){
++								status += pubsub_topicSubscriptionConnectPublisher(subscription,(char*)properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
 +							}
 +						}
 +						arrayList_destroy(topic_publishers);
 +					}
 +
 +				}
 +
 +				/* Look also for external publishers */
 +				if(ext_pub_list!=NULL){
 +					for(i=0;i<arrayList_size(ext_pub_list);i++){
 +						pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
- 						if(pubEP->endpoint !=NULL){
- 							status += pubsub_topicSubscriptionConnectPublisher(subscription,pubEP->endpoint);
++						if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) !=NULL){
++							status += pubsub_topicSubscriptionConnectPublisher(subscription,(char*) properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
 +						}
 +					}
 +				}
 +
 +				pubsub_topicSubscriptionAddSubscriber(subscription,subEP);
 +
 +				status += pubsub_topicSubscriptionStart(subscription);
 +
 +			}
 +
 +			if(status==CELIX_SUCCESS){
 +
 +				hashMap_put(admin->subscriptions,strdup(scope_topic),subscription);
 +
 +				connectTopicPubSubToSerializer(admin, best_serializer, subscription, false);
 +			}
 +		}
 +
 +		if (status == CELIX_SUCCESS){
 +			pubsub_topicIncreaseNrSubscribers(subscription);
 +		}
 +	}
 +
 +	free(scope_topic);
 +	celixThreadMutex_unlock(&admin->externalPublicationsLock);
 +	celixThreadMutex_unlock(&admin->localPublicationsLock);
 +	celixThreadMutex_unlock(&admin->subscriptionsLock);
 +	celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
 +
 +	return status;
 +
 +}
 +
 +celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
 +	celix_status_t status = CELIX_SUCCESS;
 +
- 	printf("PSA_ZMQ: Removing subscription [FWUUID=%s bundleID=%ld topic=%s]\n",subEP->frameworkUUID,subEP->serviceID,subEP->topic);
++	printf("PSA_ZMQ: Removing subscription [FWUUID=%s bundleID=%ld topic=%s]\n",
++		   properties_get(subEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),
++		   subEP->serviceID,
++		   properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +
- 	char* scope_topic = createScopeTopicKey(subEP->scope, subEP->topic);
++	char* scope_topic = createScopeTopicKey(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +
 +	celixThreadMutex_lock(&admin->subscriptionsLock);
 +	topic_subscription_pt sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic);
 +	if(sub!=NULL){
 +		pubsub_topicDecreaseNrSubscribers(sub);
 +		if(pubsub_topicGetNrSubscribers(sub) == 0) {
 +			status = pubsub_topicSubscriptionRemoveSubscriber(sub,subEP);
 +		}
 +	}
 +	celixThreadMutex_unlock(&admin->subscriptionsLock);
 +
 +	if(sub==NULL){
 +		/* Maybe the endpoint was pending */
 +		celixThreadMutex_lock(&admin->noSerializerPendingsLock);
 +		if(!arrayList_removeElement(admin->noSerializerSubscriptions, subEP)){
 +			status = CELIX_ILLEGAL_STATE;
 +		}
 +		celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
 +	}
 +
 +	free(scope_topic);
 +
 +
 +
 +	return status;
 +
 +}
 +
 +celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP) {
 +	celix_status_t status = CELIX_SUCCESS;
 +
- 	printf("PSA_ZMQ: Received publication [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n", pubEP->frameworkUUID, pubEP->serviceID, pubEP->scope, pubEP->topic);
++	printf("PSA_ZMQ: Received publication [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n",
++		   properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),
++		   pubEP->serviceID,
++		   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),
++		   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +
 +	const char* fwUUID = NULL;
 +
 +	bundleContext_getProperty(admin->bundle_context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID);
 +	if (fwUUID == NULL) {
 +		printf("PSA_ZMQ: Cannot retrieve fwUUID.\n");
 +		return CELIX_INVALID_BUNDLE_CONTEXT;
 +	}
 +
- 	char *scope_topic = createScopeTopicKey(pubEP->scope, pubEP->topic);
++	char *scope_topic = createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +
- 	if ((strcmp(pubEP->frameworkUUID, fwUUID) == 0) && (pubEP->endpoint == NULL)) {
++	if ((strcmp(properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID), fwUUID) == 0) &&
++			(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) == NULL)) {
 +
 +		celixThreadMutex_lock(&admin->localPublicationsLock);
 +
 +		service_factory_pt factory = (service_factory_pt) hashMap_get(admin->localPublications, scope_topic);
 +
 +		if (factory == NULL) {
 +			topic_publication_pt pub = NULL;
 +			pubsub_serializer_service_t *best_serializer = NULL;
 +			if( (status=pubsubAdmin_getBestSerializer(admin, pubEP, &best_serializer)) == CELIX_SUCCESS){
 +				status = pubsub_topicPublicationCreate(admin->bundle_context, pubEP, best_serializer, admin->ipAddress, admin->basePort, admin->maxPort, &pub);
 +			}
 +			else{
- 				printf("PSA_ZMQ: Cannot find a serializer for publishing topic %s. Adding it to pending list.\n", pubEP->topic);
++				printf("PSA_ZMQ: Cannot find a serializer for publishing topic %s. Adding it to pending list.\n",
++					   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +				celixThreadMutex_lock(&admin->noSerializerPendingsLock);
 +				arrayList_add(admin->noSerializerPublications,pubEP);
 +				celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
 +			}
 +
 +			if (status == CELIX_SUCCESS) {
 +				status = pubsub_topicPublicationStart(admin->bundle_context, pub, &factory);
 +				if (status == CELIX_SUCCESS && factory != NULL) {
 +					hashMap_put(admin->localPublications, strdup(scope_topic), factory);
 +					connectTopicPubSubToSerializer(admin, best_serializer, pub, true);
 +				}
 +			} else {
- 				printf("PSA_ZMQ: Cannot create a topicPublication for scope=%s, topic=%s (bundle %ld).\n", pubEP->scope, pubEP->topic, pubEP->serviceID);
++				printf("PSA_ZMQ: Cannot create a topicPublication for scope=%s, topic=%s (bundle %ld).\n",
++					   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),
++					   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),
++					   pubEP->serviceID);
 +			}
 +		} else {
 +			//just add the new EP to the list
 +			topic_publication_pt pub = (topic_publication_pt) factory->handle;
 +			pubsub_topicPublicationAddPublisherEP(pub, pubEP);
 +		}
 +
 +		celixThreadMutex_unlock(&admin->localPublicationsLock);
 +	}
 +	else{
 +
 +		celixThreadMutex_lock(&admin->externalPublicationsLock);
 +		array_list_pt ext_pub_list = (array_list_pt) hashMap_get(admin->externalPublications, scope_topic);
 +		if (ext_pub_list == NULL) {
 +			arrayList_create(&ext_pub_list);
 +			hashMap_put(admin->externalPublications, strdup(scope_topic), ext_pub_list);
 +		}
 +
 +		arrayList_add(ext_pub_list, pubEP);
 +
 +		celixThreadMutex_unlock(&admin->externalPublicationsLock);
 +	}
 +
 +	/* Re-evaluate the pending subscriptions */
 +	celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
 +
 +	hash_map_entry_pt pendingSub = hashMap_getEntry(admin->pendingSubscriptions, scope_topic);
 +	if (pendingSub != NULL) { //There were pending subscription for the just published topic. Let's connect them.
 +		char* topic = (char*) hashMapEntry_getKey(pendingSub);
 +		array_list_pt pendingSubList = (array_list_pt) hashMapEntry_getValue(pendingSub);
 +		int i;
 +		for (i = 0; i < arrayList_size(pendingSubList); i++) {
 +			pubsub_endpoint_pt subEP = (pubsub_endpoint_pt) arrayList_get(pendingSubList, i);
 +			pubsubAdmin_addSubscription(admin, subEP);
 +		}
 +		hashMap_remove(admin->pendingSubscriptions, scope_topic);
 +		arrayList_clear(pendingSubList);
 +		arrayList_destroy(pendingSubList);
 +		free(topic);
 +	}
 +
 +	celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
 +
 +	/* Connect the new publisher to the subscription for his topic, if there is any */
 +	celixThreadMutex_lock(&admin->subscriptionsLock);
 +
 +	topic_subscription_pt sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, scope_topic);
- 	if (sub != NULL && pubEP->endpoint != NULL) {
- 		pubsub_topicSubscriptionAddConnectPublisherToPendingList(sub, pubEP->endpoint);
++	if (sub != NULL && properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) != NULL) {
++		pubsub_topicSubscriptionAddConnectPublisherToPendingList(sub, (char*)properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
 +	}
 +
 +	/* And check also for ANY subscription */
 +	topic_subscription_pt any_sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, PUBSUB_ANY_SUB_TOPIC);
- 	if (any_sub != NULL && pubEP->endpoint != NULL) {
- 		pubsub_topicSubscriptionAddConnectPublisherToPendingList(any_sub, pubEP->endpoint);
++	if (any_sub != NULL && properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) != NULL) {
++		pubsub_topicSubscriptionAddConnectPublisherToPendingList(any_sub, (char*)properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
 +	}
 +
 +	free(scope_topic);
 +
 +	celixThreadMutex_unlock(&admin->subscriptionsLock);
 +
 +	return status;
 +
 +}
 +
 +celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP){
 +	celix_status_t status = CELIX_SUCCESS;
 +	int count = 0;
 +
- 	printf("PSA_ZMQ: Removing publication [FWUUID=%s bundleID=%ld topic=%s]\n",pubEP->frameworkUUID,pubEP->serviceID,pubEP->topic);
++	printf("PSA_ZMQ: Removing publication [FWUUID=%s bundleID=%ld topic=%s]\n",
++		   properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),
++		   pubEP->serviceID,
++		   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +
 +	const char* fwUUID = NULL;
 +
 +	bundleContext_getProperty(admin->bundle_context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
 +	if(fwUUID==NULL){
 +		printf("PSA_ZMQ: Cannot retrieve fwUUID.\n");
 +		return CELIX_INVALID_BUNDLE_CONTEXT;
 +	}
- 	char *scope_topic = createScopeTopicKey(pubEP->scope, pubEP->topic);
++	char *scope_topic = createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +
- 	if(strcmp(pubEP->frameworkUUID,fwUUID)==0){
++	if(strcmp(properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),fwUUID)==0){
 +
 +		celixThreadMutex_lock(&admin->localPublicationsLock);
 +		service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
 +		if(factory!=NULL){
 +			topic_publication_pt pub = (topic_publication_pt)factory->handle;
 +			pubsub_topicPublicationRemovePublisherEP(pub,pubEP);
 +		}
 +		celixThreadMutex_unlock(&admin->localPublicationsLock);
 +
 +		if(factory==NULL){
 +			/* Maybe the endpoint was pending */
 +			celixThreadMutex_lock(&admin->noSerializerPendingsLock);
 +			if(!arrayList_removeElement(admin->noSerializerPublications, pubEP)){
 +				status = CELIX_ILLEGAL_STATE;
 +			}
 +			celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
 +		}
 +	}
 +	else{
 +
 +		celixThreadMutex_lock(&admin->externalPublicationsLock);
 +		array_list_pt ext_pub_list = (array_list_pt)hashMap_get(admin->externalPublications,scope_topic);
 +		if(ext_pub_list!=NULL){
 +			int i;
 +			bool found = false;
 +			for(i=0;!found && i<arrayList_size(ext_pub_list);i++){
 +				pubsub_endpoint_pt p  = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
 +				found = pubsubEndpoint_equals(pubEP,p);
 +				if (found){
 +					arrayList_remove(ext_pub_list,i);
 +				}
 +			}
 +			// Check if there are more publishers on the same endpoint (happens when 1 celix-instance with multiple bundles publish in same topic)
 +			for(i=0; i<arrayList_size(ext_pub_list);i++) {
 +				pubsub_endpoint_pt p  = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
- 				if (strcmp(pubEP->endpoint,p->endpoint) == 0) {
++				if (strcmp(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL),properties_get(p->endpoint_props, PUBSUB_ENDPOINT_URL)) == 0) {
 +					count++;
 +				}
 +			}
 +
 +			if(arrayList_size(ext_pub_list)==0){
 +				hash_map_entry_pt entry = hashMap_getEntry(admin->externalPublications,scope_topic);
 +				char* topic = (char*)hashMapEntry_getKey(entry);
 +				array_list_pt list = (array_list_pt)hashMapEntry_getValue(entry);
 +				hashMap_remove(admin->externalPublications,topic);
 +				arrayList_destroy(list);
 +				free(topic);
 +			}
 +		}
 +
 +		celixThreadMutex_unlock(&admin->externalPublicationsLock);
 +	}
 +
 +	/* Check if this publisher was connected to one of our subscribers*/
 +	celixThreadMutex_lock(&admin->subscriptionsLock);
 +
 +	topic_subscription_pt sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic);
- 	if(sub!=NULL && pubEP->endpoint!=NULL && count == 0){
- 		pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub,pubEP->endpoint);
++	if(sub!=NULL && properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)!=NULL && count == 0){
++		pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub,(char*)properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
 +	}
 +
 +	/* And check also for ANY subscription */
 +	topic_subscription_pt any_sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC);
- 	if(any_sub!=NULL && pubEP->endpoint!=NULL && count == 0){
- 		pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(any_sub,pubEP->endpoint);
++	if(any_sub!=NULL && properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)!=NULL && count == 0){
++		pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(any_sub,(char*)properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
 +	}
 +
 +	free(scope_topic);
 +	celixThreadMutex_unlock(&admin->subscriptionsLock);
 +
 +	return status;
 +
 +}
 +
 +celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin, char *scope, char* topic){
 +	celix_status_t status = CELIX_SUCCESS;
 +
 +	printf("PSA_ZMQ: Closing all publications\n");
 +
 +	celixThreadMutex_lock(&admin->localPublicationsLock);
 +	char *scope_topic = createScopeTopicKey(scope, topic);
 +	hash_map_entry_pt pubsvc_entry = (hash_map_entry_pt)hashMap_getEntry(admin->localPublications,scope_topic);
 +	if(pubsvc_entry!=NULL){
 +		char* key = (char*)hashMapEntry_getKey(pubsvc_entry);
 +		service_factory_pt factory= (service_factory_pt)hashMapEntry_getValue(pubsvc_entry);
 +		topic_publication_pt pub = (topic_publication_pt)factory->handle;
 +		status += pubsub_topicPublicationStop(pub);
 +		disconnectTopicPubSubFromSerializer(admin, pub, true);
 +		status += pubsub_topicPublicationDestroy(pub);
 +		hashMap_remove(admin->localPublications,scope_topic);
 +		free(key);
 +		free(factory);
 +	}
 +	free(scope_topic);
 +	celixThreadMutex_unlock(&admin->localPublicationsLock);
 +
 +	return status;
 +
 +}
 +
 +celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* scope,char* topic){
 +	celix_status_t status = CELIX_SUCCESS;
 +
 +	printf("PSA_ZMQ: Closing all subscriptions\n");
 +
 +	celixThreadMutex_lock(&admin->subscriptionsLock);
 +	char *scope_topic = createScopeTopicKey(scope, topic);
 +	hash_map_entry_pt sub_entry = (hash_map_entry_pt)hashMap_getEntry(admin->subscriptions,scope_topic);
 +	if(sub_entry!=NULL){
 +		char* topic = (char*)hashMapEntry_getKey(sub_entry);
 +
 +		topic_subscription_pt ts = (topic_subscription_pt)hashMapEntry_getValue(sub_entry);
 +		status += pubsub_topicSubscriptionStop(ts);
 +		disconnectTopicPubSubFromSerializer(admin, ts, false);
 +		status += pubsub_topicSubscriptionDestroy(ts);
 +		hashMap_remove(admin->subscriptions,scope_topic);
 +		free(topic);
 +
 +	}
 +	free(scope_topic);
 +	celixThreadMutex_unlock(&admin->subscriptionsLock);
 +
 +	return status;
 +
 +}
 +
 +
 +#ifndef ANDROID
 +static celix_status_t pubsubAdmin_getIpAdress(const char* interface, char** ip) {
 +	celix_status_t status = CELIX_BUNDLE_EXCEPTION;
 +
 +	struct ifaddrs *ifaddr, *ifa;
 +	char host[NI_MAXHOST];
 +
 +	if (getifaddrs(&ifaddr) != -1)
 +	{
 +		for (ifa = ifaddr; ifa != NULL && status != CELIX_SUCCESS; ifa = ifa->ifa_next)
 +		{
 +			if (ifa->ifa_addr == NULL)
 +				continue;
 +
 +			if ((getnameinfo(ifa->ifa_addr,sizeof(struct sockaddr_in), host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST) == 0) && (ifa->ifa_addr->sa_family == AF_INET)) {
 +				if (interface == NULL) {
 +					*ip = strdup(host);
 +					status = CELIX_SUCCESS;
 +				}
 +				else if (strcmp(ifa->ifa_name, interface) == 0) {
 +					*ip = strdup(host);
 +					status = CELIX_SUCCESS;
 +				}
 +			}
 +		}
 +
 +		freeifaddrs(ifaddr);
 +	}
 +
 +	return status;
 +}
 +#endif
 +
 +static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
 +	celix_status_t status = CELIX_SUCCESS;
- 	char* scope_topic = createScopeTopicKey(subEP->scope, subEP->topic);
++	char* scope_topic = createScopeTopicKey(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +	array_list_pt pendingListPerTopic = hashMap_get(admin->pendingSubscriptions,scope_topic);
 +	if(pendingListPerTopic==NULL){
 +		arrayList_create(&pendingListPerTopic);
 +		hashMap_put(admin->pendingSubscriptions,strdup(scope_topic),pendingListPerTopic);
 +	}
 +	arrayList_add(pendingListPerTopic,subEP);
 +	free(scope_topic);
 +	return status;
 +}
 +
 +celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt reference, void * service){
 +	/* Assumption: serializers are all available at startup.
 +	 * If a new (possibly better) serializer is installed and started, already created topic_publications/subscriptions will not be destroyed and recreated */
 +
 +	celix_status_t status = CELIX_SUCCESS;
 +	int i=0;
 +
 +	const char *serType = NULL;
 +	serviceReference_getProperty(reference, PUBSUB_SERIALIZER_TYPE_KEY,&serType);
 +	if(serType == NULL){
 +		printf("Serializer serviceReference %p has no pubsub_serializer.type property specified\n",reference);
 +		return CELIX_SERVICE_EXCEPTION;
 +	}
 +
 +	pubsub_admin_pt admin = (pubsub_admin_pt)handle;
 +	celixThreadMutex_lock(&admin->serializerListLock);
 +	arrayList_add(admin->serializerList, reference);
 +	celixThreadMutex_unlock(&admin->serializerListLock);
 +
 +	/* Now let's re-evaluate the pending */
 +	celixThreadMutex_lock(&admin->noSerializerPendingsLock);
 +
 +	for(i=0;i<arrayList_size(admin->noSerializerSubscriptions);i++){
 +		pubsub_endpoint_pt ep = (pubsub_endpoint_pt)arrayList_get(admin->noSerializerSubscriptions,i);
 +		pubsub_serializer_service_t *best_serializer = NULL;
 +		pubsubAdmin_getBestSerializer(admin, ep, &best_serializer);
 +		if(best_serializer != NULL){ /* Finally we have a valid serializer! */
 +			pubsubAdmin_addSubscription(admin, ep);
 +		}
 +	}
 +
 +	for(i=0;i<arrayList_size(admin->noSerializerPublications);i++){
 +		pubsub_endpoint_pt ep = (pubsub_endpoint_pt)arrayList_get(admin->noSerializerPublications,i);
 +		pubsub_serializer_service_t *best_serializer = NULL;
 +		pubsubAdmin_getBestSerializer(admin, ep, &best_serializer);
 +		if(best_serializer != NULL){ /* Finally we have a valid serializer! */
 +			pubsubAdmin_addPublication(admin, ep);
 +		}
 +	}
 +
 +	celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
 +
 +	printf("PSA_ZMQ: %s serializer added\n",serType);
 +
 +	return status;
 +}
 +
 +celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt reference, void * service){
 +
 +	pubsub_admin_pt admin = (pubsub_admin_pt)handle;
 +	int i=0, j=0;
 +	const char *serType = NULL;
 +
 +	serviceReference_getProperty(reference, PUBSUB_SERIALIZER_TYPE_KEY,&serType);
 +	if(serType == NULL){
 +		printf("Serializer serviceReference %p has no pubsub_serializer.type property specified\n",reference);
 +		return CELIX_SERVICE_EXCEPTION;
 +	}
 +
 +	celixThreadMutex_lock(&admin->serializerListLock);
 +	/* Remove the serializer from the list */
 +	arrayList_removeElement(admin->serializerList, reference);
 +	celixThreadMutex_unlock(&admin->serializerListLock);
 +
 +
 +	celixThreadMutex_lock(&admin->usedSerializersLock);
 +	array_list_pt topicPubList = (array_list_pt)hashMap_remove(admin->topicPublicationsPerSerializer, service);
 +	array_list_pt topicSubList = (array_list_pt)hashMap_remove(admin->topicSubscriptionsPerSerializer, service);
 +	celixThreadMutex_unlock(&admin->usedSerializersLock);
 +
 +	/* Now destroy the topicPublications, but first put back the pubsub_endpoints back to the noSerializer pending list */
 +	if(topicPubList!=NULL){
 +		for(i=0;i<arrayList_size(topicPubList);i++){
 +			topic_publication_pt topicPub = (topic_publication_pt)arrayList_get(topicPubList,i);
 +			/* Stop the topic publication */
 +			pubsub_topicPublicationStop(topicPub);
 +			/* Get the endpoints that are going to be orphan */
 +			array_list_pt pubList = pubsub_topicPublicationGetPublisherList(topicPub);
 +			for(j=0;j<arrayList_size(pubList);j++){
 +				pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubList,j);
 +				/* Remove the publication */
 +				pubsubAdmin_removePublication(admin, pubEP);
 +				/* Reset the endpoint field, so that will be recreated from scratch when a new serializer will be found */
- 				if(pubEP->endpoint!=NULL){
- 					free(pubEP->endpoint);
- 					pubEP->endpoint = NULL;
++				if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)!=NULL){
++					properties_unset(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL);
 +				}
 +				/* Add the orphan endpoint to the noSerializer pending list */
 +				celixThreadMutex_lock(&admin->noSerializerPendingsLock);
 +				arrayList_add(admin->noSerializerPublications,pubEP);
 +				celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
 +			}
 +			arrayList_destroy(pubList);
 +
 +			/* Cleanup also the localPublications hashmap*/
 +			celixThreadMutex_lock(&admin->localPublicationsLock);
 +			hash_map_iterator_pt iter = hashMapIterator_create(admin->localPublications);
 +			char *key = NULL;
 +			service_factory_pt factory = NULL;
 +			while(hashMapIterator_hasNext(iter)){
 +				hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
 +				factory = (service_factory_pt)hashMapEntry_getValue(entry);
 +				topic_publication_pt pub = (topic_publication_pt)factory->handle;
 +				if(pub==topicPub){
 +					key = (char*)hashMapEntry_getKey(entry);
 +					break;
 +				}
 +			}
 +			hashMapIterator_destroy(iter);
 +			if(key!=NULL){
 +				hashMap_remove(admin->localPublications, key);
 +				free(factory);
 +				free(key);
 +			}
 +			celixThreadMutex_unlock(&admin->localPublicationsLock);
 +
 +			/* Finally destroy the topicPublication */
 +			pubsub_topicPublicationDestroy(topicPub);
 +		}
 +		arrayList_destroy(topicPubList);
 +	}
 +
 +	/* Now destroy the topicSubscriptions, but first put back the pubsub_endpoints back to the noSerializer pending list */
 +	if(topicSubList!=NULL){
 +		for(i=0;i<arrayList_size(topicSubList);i++){
 +			topic_subscription_pt topicSub = (topic_subscription_pt)arrayList_get(topicSubList,i);
 +			/* Stop the topic subscription */
 +			pubsub_topicSubscriptionStop(topicSub);
 +			/* Get the endpoints that are going to be orphan */
 +			array_list_pt subList = pubsub_topicSubscriptionGetSubscribersList(topicSub);
 +			for(j=0;j<arrayList_size(subList);j++){
 +				pubsub_endpoint_pt subEP = (pubsub_endpoint_pt)arrayList_get(subList,j);
 +				/* Remove the subscription */
 +				pubsubAdmin_removeSubscription(admin, subEP);
 +				/* Reset the endpoint field, so that will be recreated from scratch when a new serializer will be found */
- 				if(subEP->endpoint!=NULL){
- 					free(subEP->endpoint);
- 					subEP->endpoint = NULL;
++				if(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_URL)!=NULL){
++					properties_unset(subEP->endpoint_props, PUBSUB_ENDPOINT_URL);
 +				}
 +				/* Add the orphan endpoint to the noSerializer pending list */
 +				celixThreadMutex_lock(&admin->noSerializerPendingsLock);
 +				arrayList_add(admin->noSerializerSubscriptions,subEP);
 +				celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
 +			}
 +
 +			/* Cleanup also the subscriptions hashmap*/
 +			celixThreadMutex_lock(&admin->subscriptionsLock);
 +			hash_map_iterator_pt iter = hashMapIterator_create(admin->subscriptions);
 +			char *key = NULL;
 +			while(hashMapIterator_hasNext(iter)){
 +				hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
 +				topic_subscription_pt sub = (topic_subscription_pt)hashMapEntry_getValue(entry);
 +				if(sub==topicSub){
 +					key = (char*)hashMapEntry_getKey(entry);
 +					break;
 +				}
 +			}
 +			hashMapIterator_destroy(iter);
 +			if(key!=NULL){
 +				hashMap_remove(admin->subscriptions, key);
 +				free(key);
 +			}
 +			celixThreadMutex_unlock(&admin->subscriptionsLock);
 +
 +			/* Finally destroy the topicSubscription */
 +			pubsub_topicSubscriptionDestroy(topicSub);
 +		}
 +		arrayList_destroy(topicSubList);
 +	}
 +
 +
 +
 +	printf("PSA_ZMQ: %s serializer removed\n",serType);
 +
 +
 +	return CELIX_SUCCESS;
 +}
 +
 +celix_status_t pubsubAdmin_matchEndpoint(pubsub_admin_pt admin, pubsub_endpoint_pt endpoint, double* score){
 +	celix_status_t status = CELIX_SUCCESS;
 +
 +	celixThreadMutex_lock(&admin->serializerListLock);
 +	status = pubsub_admin_match(endpoint->topic_props,PUBSUB_ADMIN_TYPE,admin->serializerList,score);
 +	celixThreadMutex_unlock(&admin->serializerListLock);
 +
 +	return status;
 +}
 +
 +/* This one recall the same logic as in the match function */
 +static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin,pubsub_endpoint_pt ep, pubsub_serializer_service_t **serSvc){
 +
 +	celix_status_t status = CELIX_SUCCESS;
 +
 +	celixThreadMutex_lock(&admin->serializerListLock);
 +	status = pubsub_admin_get_best_serializer(ep->topic_props, admin->serializerList, serSvc);
 +	celixThreadMutex_unlock(&admin->serializerListLock);
 +
 +	return status;
 +
 +}
 +
 +static void connectTopicPubSubToSerializer(pubsub_admin_pt admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool isPublication){
 +
 +	celixThreadMutex_lock(&admin->usedSerializersLock);
 +
 +	hash_map_pt map = isPublication?admin->topicPublicationsPerSerializer:admin->topicSubscriptionsPerSerializer;
 +	array_list_pt list = (array_list_pt)hashMap_get(map,serializer);
 +	if(list==NULL){
 +		arrayList_create(&list);
 +		hashMap_put(map,serializer,list);
 +	}
 +	arrayList_add(list,topicPubSub);
 +
 +	celixThreadMutex_unlock(&admin->usedSerializersLock);
 +
 +}
 +
 +static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void *topicPubSub,bool isPublication){
 +
 +	celixThreadMutex_lock(&admin->usedSerializersLock);
 +
 +	hash_map_pt map = isPublication?admin->topicPublicationsPerSerializer:admin->topicSubscriptionsPerSerializer;
 +	hash_map_iterator_pt iter = hashMapIterator_create(map);
 +	while(hashMapIterator_hasNext(iter)){
 +		array_list_pt list = (array_list_pt)hashMapIterator_nextValue(iter);
 +		if(arrayList_removeElement(list, topicPubSub)){ //Found it!
 +			break;
 +		}
 +	}
 +	hashMapIterator_destroy(iter);
 +
 +	celixThreadMutex_unlock(&admin->usedSerializersLock);
 +
 +}

http://git-wip-us.apache.org/repos/asf/celix/blob/0a5ef69a/pubsub/pubsub_admin_zmq/src/topic_publication.c
----------------------------------------------------------------------
diff --cc pubsub/pubsub_admin_zmq/src/topic_publication.c
index b612605,0000000..873cec2
mode 100644,000000..100644
--- a/pubsub/pubsub_admin_zmq/src/topic_publication.c
+++ b/pubsub/pubsub_admin_zmq/src/topic_publication.c
@@@ -1,630 -1,0 +1,631 @@@
 +/**
 + *Licensed to the Apache Software Foundation (ASF) under one
 + *or more contributor license agreements.  See the NOTICE file
 + *distributed with this work for additional information
 + *regarding copyright ownership.  The ASF licenses this file
 + *to you under the Apache License, Version 2.0 (the
 + *"License"); you may not use this file except in compliance
 + *with the License.  You may obtain a copy of the License at
 + *
 + *  http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *Unless required by applicable law or agreed to in writing,
 + *software distributed under the License is distributed on an
 + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + *specific language governing permissions and limitations
 + *under the License.
 + */
 +
 +#include <czmq.h>
 +/* The following undefs prevent the collision between:
 + * - sys/syslog.h (which is included within czmq)
 + * - celix/dfi/dfi_log_util.h
 + */
 +#undef LOG_DEBUG
 +#undef LOG_WARNING
 +#undef LOG_INFO
 +#undef LOG_WARNING
 +
 +#include <stdlib.h>
 +#include <string.h>
 +#include <unistd.h>
 +
 +#include "array_list.h"
 +#include "celixbool.h"
 +#include "service_registration.h"
 +#include "utils.h"
 +#include "service_factory.h"
 +#include "version.h"
 +
 +#include "pubsub_common.h"
 +#include "pubsub_utils.h"
 +#include "pubsub/publisher.h"
 +
 +#include "topic_publication.h"
 +
 +#include "pubsub_serializer.h"
 +
 +#ifdef BUILD_WITH_ZMQ_SECURITY
 +	#include "zmq_crypto.h"
 +
 +	#define MAX_CERT_PATH_LENGTH 512
 +#endif
 +
 +#define EP_ADDRESS_LEN		32
 +#define ZMQ_BIND_MAX_RETRY	5
 +
 +#define FIRST_SEND_DELAY	2
 +
 +struct topic_publication {
 +	zsock_t* zmq_socket;
 +	celix_thread_mutex_t socket_lock; //Protects zmq_socket access
 +	zcert_t * zmq_cert;
 +	char* endpoint;
 +	service_registration_pt svcFactoryReg;
 +	array_list_pt pub_ep_list; //List<pubsub_endpoint>
 +	hash_map_pt boundServices; //<bundle_pt,bound_service>
 +	pubsub_serializer_service_t *serializer;
 +	celix_thread_mutex_t tp_lock;
 +};
 +
 +typedef struct publish_bundle_bound_service {
 +	topic_publication_pt parent;
 +	pubsub_publisher_t service;
 +	bundle_pt bundle;
 +	char *topic;
 +	hash_map_pt msgTypes;
 +	unsigned short getCount;
 +	celix_thread_mutex_t mp_lock; //Protects publish_bundle_bound_service data structure
 +	bool mp_send_in_progress;
 +	array_list_pt mp_parts;
 +}* publish_bundle_bound_service_pt;
 +
 +/* Note: correct locking order is
 + * 1. tp_lock
 + * 2. mp_lock
 + * 3. socket_lock
 + *
 + * tp_lock and socket_lock are independent.
 + */
 +
 +typedef struct pubsub_msg{
 +	pubsub_msg_header_pt header;
 +	char* payload;
 +	int payloadSize;
 +}* pubsub_msg_pt;
 +
 +static unsigned int rand_range(unsigned int min, unsigned int max);
 +
 +static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service);
 +static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service);
 +
 +static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle);
 +static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc);
 +
 +static int pubsub_topicPublicationSend(void* handle,unsigned int msgTypeId, const void *msg);
 +static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTypeId, const void *inMsg, int flags);
 +static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* msgTypeId);
 +
 +static void delay_first_send_for_late_joiners(void);
 +
 +celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out){
 +	celix_status_t status = CELIX_SUCCESS;
 +
 +#ifdef BUILD_WITH_ZMQ_SECURITY
 +	char* secure_topics = NULL;
 +	bundleContext_getProperty(bundle_context, "SECURE_TOPICS", (const char **) &secure_topics);
 +
 +	if (secure_topics){
 +		array_list_pt secure_topics_list = pubsub_getTopicsFromString(secure_topics);
 +
 +		int i;
 +		int secure_topics_size = arrayList_size(secure_topics_list);
 +		for (i = 0; i < secure_topics_size; i++){
 +			char* top = arrayList_get(secure_topics_list, i);
 +			if (strcmp(pubEP->topic, top) == 0){
 +				printf("PSA_ZMQ_TP: Secure topic: '%s'\n", top);
 +				pubEP->is_secure = true;
 +			}
 +			free(top);
 +			top = NULL;
 +		}
 +
 +		arrayList_destroy(secure_topics_list);
 +	}
 +
 +	zcert_t* pub_cert = NULL;
 +	if (pubEP->is_secure){
 +		char* keys_bundle_dir = pubsub_getKeysBundleDir(bundle_context);
 +		if (keys_bundle_dir == NULL){
 +			return CELIX_SERVICE_EXCEPTION;
 +		}
 +
 +		const char* keys_file_path = NULL;
 +		const char* keys_file_name = NULL;
 +		bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_PATH, &keys_file_path);
 +		bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_NAME, &keys_file_name);
 +
 +		char cert_path[MAX_CERT_PATH_LENGTH];
 +
 +		//certificate path ".cache/bundle{id}/version0.0/./META-INF/keys/publisher/private/pub_{topic}.key"
 +		snprintf(cert_path, MAX_CERT_PATH_LENGTH, "%s/META-INF/keys/publisher/private/pub_%s.key.enc", keys_bundle_dir, pubEP->topic);
 +		free(keys_bundle_dir);
 +		printf("PSA_ZMQ_TP: Loading key '%s'\n", cert_path);
 +
 +		pub_cert = get_zcert_from_encoded_file((char *) keys_file_path, (char *) keys_file_name, cert_path);
 +		if (pub_cert == NULL){
 +			printf("PSA_ZMQ_TP: Cannot load key '%s'\n", cert_path);
 +			printf("PSA_ZMQ_TP: Topic '%s' NOT SECURED !\n", pubEP->topic);
 +			pubEP->is_secure = false;
 +		}
 +	}
 +#endif
 +
 +	zsock_t* socket = zsock_new (ZMQ_PUB);
 +	if(socket==NULL){
 +		#ifdef BUILD_WITH_ZMQ_SECURITY
 +			if (pubEP->is_secure){
 +				zcert_destroy(&pub_cert);
 +			}
 +		#endif
 +
 +        perror("Error for zmq_socket");
 +		return CELIX_SERVICE_EXCEPTION;
 +	}
 +#ifdef BUILD_WITH_ZMQ_SECURITY
 +	if (pubEP->is_secure){
 +		zcert_apply (pub_cert, socket); // apply certificate to socket
 +		zsock_set_curve_server (socket, true); // setup the publisher's socket to use the curve functions
 +	}
 +#endif
 +
 +	int rv = -1, retry=0;
 +	char* ep = malloc(EP_ADDRESS_LEN);
 +    char bindAddress[EP_ADDRESS_LEN];
 +
 +	while(rv==-1 && retry<ZMQ_BIND_MAX_RETRY){
 +		/* Randomized part due to same bundle publishing on different topics */
 +		unsigned int port = rand_range(basePort,maxPort);
 +		memset(ep,0,EP_ADDRESS_LEN);
 +        memset(bindAddress, 0, EP_ADDRESS_LEN);
 +
 +		snprintf(ep,EP_ADDRESS_LEN,"tcp://%s:%u",bindIP,port);
-         snprintf(bindAddress, EP_ADDRESS_LEN, "tcp://0.0.0.0:%u", port); //NOTE using a different bind addres than endpoint address
++        snprintf(bindAddress, EP_ADDRESS_LEN, "tcp://0.0.0.0:%u", port); //NOTE using a different bind address than endpoint address
 +		rv = zsock_bind (socket, "%s", bindAddress);
 +        if (rv == -1) {
 +            perror("Error for zmq_bind");
 +        }
 +		retry++;
 +	}
 +
 +	if(rv == -1){
 +		free(ep);
 +		return CELIX_SERVICE_EXCEPTION;
 +	}
 +
 +	/* ZMQ stuffs are all fine at this point. Let's create and initialize the structure */
 +
 +	topic_publication_pt pub = calloc(1,sizeof(*pub));
 +
 +	arrayList_create(&(pub->pub_ep_list));
 +	pub->boundServices = hashMap_create(NULL,NULL,NULL,NULL);
 +	celixThreadMutex_create(&(pub->tp_lock),NULL);
 +
 +	pub->endpoint = ep;
 +	pub->zmq_socket = socket;
 +	pub->serializer = best_serializer;
 +
 +	celixThreadMutex_create(&(pub->socket_lock),NULL);
 +
 +#ifdef BUILD_WITH_ZMQ_SECURITY
 +	if (pubEP->is_secure){
 +		pub->zmq_cert = pub_cert;
 +	}
 +#endif
 +
 +	pubsub_topicPublicationAddPublisherEP(pub,pubEP);
 +
 +	*out = pub;
 +
 +	return status;
 +}
 +
 +celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub){
 +	celix_status_t status = CELIX_SUCCESS;
 +
 +	celixThreadMutex_lock(&(pub->tp_lock));
 +
 +	free(pub->endpoint);
 +	arrayList_destroy(pub->pub_ep_list);
 +
 +	hash_map_iterator_pt iter = hashMapIterator_create(pub->boundServices);
 +	while(hashMapIterator_hasNext(iter)){
 +		publish_bundle_bound_service_pt bound = hashMapIterator_nextValue(iter);
 +		pubsub_destroyPublishBundleBoundService(bound);
 +	}
 +	hashMapIterator_destroy(iter);
 +	hashMap_destroy(pub->boundServices,false,false);
 +
 +	pub->svcFactoryReg = NULL;
 +	pub->serializer = NULL;
 +#ifdef BUILD_WITH_ZMQ_SECURITY
 +	zcert_destroy(&(pub->zmq_cert));
 +#endif
 +
 +	celixThreadMutex_unlock(&(pub->tp_lock));
 +
 +	celixThreadMutex_destroy(&(pub->tp_lock));
 +
 +	celixThreadMutex_lock(&(pub->socket_lock));
 +	zsock_destroy(&(pub->zmq_socket));
 +	celixThreadMutex_unlock(&(pub->socket_lock));
 +
 +	celixThreadMutex_destroy(&(pub->socket_lock));
 +
 +	free(pub);
 +
 +	return status;
 +}
 +
 +celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory){
 +	celix_status_t status = CELIX_SUCCESS;
 +
 +	/* Let's register the new service */
 +
 +	pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pub->pub_ep_list,0);
 +
 +	if(pubEP!=NULL){
 +		service_factory_pt factory = calloc(1, sizeof(*factory));
 +		factory->handle = pub;
 +		factory->getService = pubsub_topicPublicationGetService;
 +		factory->ungetService = pubsub_topicPublicationUngetService;
 +
 +		properties_pt props = properties_create();
- 		properties_set(props,PUBSUB_PUBLISHER_TOPIC,pubEP->topic);
- 		properties_set(props,PUBSUB_PUBLISHER_SCOPE,pubEP->scope);
++		properties_set(props,PUBSUB_PUBLISHER_TOPIC,properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
++		properties_set(props,PUBSUB_PUBLISHER_SCOPE,properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE));
 +		properties_set(props,"service.version", PUBSUB_PUBLISHER_SERVICE_VERSION);
 +
 +		status = bundleContext_registerServiceFactory(bundle_context,PUBSUB_PUBLISHER_SERVICE_NAME,factory,props,&(pub->svcFactoryReg));
 +
 +		if(status != CELIX_SUCCESS){
 +			properties_destroy(props);
- 			printf("PSA_ZMQ_PSA_ZMQ_TP: Cannot register ServiceFactory for topic %s (bundle %ld).\n",pubEP->topic,pubEP->serviceID);
++			printf("PSA_ZMQ_PSA_ZMQ_TP: Cannot register ServiceFactory for topic %s (bundle %ld).\n",
++				   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),pubEP->serviceID);
 +		}
 +		else{
 +			*svcFactory = factory;
 +		}
 +	}
 +	else{
 +		printf("PSA_ZMQ_PSA_ZMQ_TP: Cannot find pubsub_endpoint after adding it...Should never happen!\n");
 +		status = CELIX_SERVICE_EXCEPTION;
 +	}
 +
 +	return status;
 +}
 +
 +celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub){
 +	return serviceRegistration_unregister(pub->svcFactoryReg);
 +}
 +
 +celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep){
 +
 +	celixThreadMutex_lock(&(pub->tp_lock));
- 	ep->endpoint = strdup(pub->endpoint);
++    pubsubEndpoint_setField(ep, PUBSUB_ENDPOINT_URL, pub->endpoint);
 +	arrayList_add(pub->pub_ep_list,ep);
 +	celixThreadMutex_unlock(&(pub->tp_lock));
 +
 +	return CELIX_SUCCESS;
 +}
 +
 +celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep){
 +
 +	celixThreadMutex_lock(&(pub->tp_lock));
 +	for (int i = 0; i < arrayList_size(pub->pub_ep_list); i++) {
 +	        pubsub_endpoint_pt e = arrayList_get(pub->pub_ep_list, i);
 +	        if(pubsubEndpoint_equals(ep, e)) {
 +	            arrayList_removeElement(pub->pub_ep_list,ep);
 +	            break;
 +	        }
 +	}
 +	celixThreadMutex_unlock(&(pub->tp_lock));
 +
 +	return CELIX_SUCCESS;
 +}
 +
 +array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt pub){
 +	array_list_pt list = NULL;
 +	celixThreadMutex_lock(&(pub->tp_lock));
 +	list = arrayList_clone(pub->pub_ep_list);
 +	celixThreadMutex_unlock(&(pub->tp_lock));
 +	return list;
 +}
 +
 +
 +static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service) {
 +	celix_status_t  status = CELIX_SUCCESS;
 +
 +	topic_publication_pt publish = (topic_publication_pt)handle;
 +
 +	celixThreadMutex_lock(&(publish->tp_lock));
 +
 +	publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle);
 +	if(bound==NULL){
 +		bound = pubsub_createPublishBundleBoundService(publish,bundle);
 +		if(bound!=NULL){
 +			hashMap_put(publish->boundServices,bundle,bound);
 +		}
 +	}
 +	else{
 +		bound->getCount++;
 +	}
 +
 +	*service = &bound->service;
 +
 +	celixThreadMutex_unlock(&(publish->tp_lock));
 +
 +	return status;
 +}
 +
 +static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service)  {
 +
 +	topic_publication_pt publish = (topic_publication_pt)handle;
 +
 +	celixThreadMutex_lock(&(publish->tp_lock));
 +
 +	publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle);
 +	if(bound!=NULL){
 +
 +		bound->getCount--;
 +		if(bound->getCount==0){
 +			pubsub_destroyPublishBundleBoundService(bound);
 +			hashMap_remove(publish->boundServices,bundle);
 +		}
 +
 +	}
 +	else{
 +		long bundleId = -1;
 +		bundle_getBundleId(bundle,&bundleId);
 +		printf("PSA_ZMQ_TP: Unexpected ungetService call for bundle %ld.\n", bundleId);
 +	}
 +
 +	/* service should be never used for unget, so let's set the pointer to NULL */
 +	*service = NULL;
 +
 +	celixThreadMutex_unlock(&(publish->tp_lock));
 +
 +	return CELIX_SUCCESS;
 +}
 +
 +static bool send_pubsub_msg(zsock_t* zmq_socket, pubsub_msg_pt msg, bool last){
 +
 +	bool ret = true;
 +
 +	zframe_t* headerMsg = zframe_new(msg->header, sizeof(struct pubsub_msg_header));
 +	if (headerMsg == NULL) ret=false;
 +	zframe_t* payloadMsg = zframe_new(msg->payload, msg->payloadSize);
 +	if (payloadMsg == NULL) ret=false;
 +
 +	delay_first_send_for_late_joiners();
 +
 +	if( zframe_send(&headerMsg,zmq_socket, ZFRAME_MORE) == -1) ret=false;
 +
 +	if(!last){
 +		if( zframe_send(&payloadMsg,zmq_socket, ZFRAME_MORE) == -1) ret=false;
 +	}
 +	else{
 +		if( zframe_send(&payloadMsg,zmq_socket, 0) == -1) ret=false;
 +	}
 +
 +	if (!ret){
 +		zframe_destroy(&headerMsg);
 +		zframe_destroy(&payloadMsg);
 +	}
 +
 +	free(msg->header);
 +	free(msg->payload);
 +	free(msg);
 +
 +	return ret;
 +
 +}
 +
 +static bool send_pubsub_mp_msg(zsock_t* zmq_socket, array_list_pt mp_msg_parts){
 +
 +	bool ret = true;
 +
 +	unsigned int i = 0;
 +	unsigned int mp_num = arrayList_size(mp_msg_parts);
 +	for(;i<mp_num;i++){
 +		ret = ret && send_pubsub_msg(zmq_socket, (pubsub_msg_pt)arrayList_get(mp_msg_parts,i), (i==mp_num-1));
 +	}
 +	arrayList_clear(mp_msg_parts);
 +
 +	return ret;
 +
 +}
 +
 +static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *msg) {
 +
 +	return pubsub_topicPublicationSendMultipart(handle,msgTypeId,msg, PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG);
 +
 +}
 +
 +static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTypeId, const void *inMsg, int flags){
 +
 +	int status = 0;
 +
 +	publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt) handle;
 +
 +	celixThreadMutex_lock(&(bound->parent->tp_lock));
 +	celixThreadMutex_lock(&(bound->mp_lock));
 +	if( (flags & PUBSUB_PUBLISHER_FIRST_MSG) && !(flags & PUBSUB_PUBLISHER_LAST_MSG) && bound->mp_send_in_progress){ //means a real mp_msg
 +		printf("PSA_ZMQ_TP: Multipart send already in progress. Cannot process a new one.\n");
 +		celixThreadMutex_unlock(&(bound->mp_lock));
 +		celixThreadMutex_unlock(&(bound->parent->tp_lock));
 +		return -3;
 +	}
 +
 +	pubsub_msg_serializer_t* msgSer = (pubsub_msg_serializer_t*)hashMap_get(bound->msgTypes, (void*)(uintptr_t)msgTypeId);
 +
 +	if (msgSer!= NULL) {
 +		int major=0, minor=0;
 +
 +		pubsub_msg_header_pt msg_hdr = calloc(1,sizeof(struct pubsub_msg_header));
 +		strncpy(msg_hdr->topic,bound->topic,MAX_TOPIC_LEN-1);
 +		msg_hdr->type = msgTypeId;
 +
 +		if (msgSer->msgVersion != NULL){
 +			version_getMajor(msgSer->msgVersion, &major);
 +			version_getMinor(msgSer->msgVersion, &minor);
 +			msg_hdr->major = major;
 +			msg_hdr->minor = minor;
 +		}
 +
 +		void *serializedOutput = NULL;
 +		size_t serializedOutputLen = 0;
 +		msgSer->serialize(msgSer,inMsg,&serializedOutput, &serializedOutputLen);
 +
 +		pubsub_msg_pt msg = calloc(1,sizeof(struct pubsub_msg));
 +		msg->header = msg_hdr;
 +		msg->payload = (char*)serializedOutput;
 +		msg->payloadSize = serializedOutputLen;
 +		bool snd = true;
 +
 +		switch(flags){
 +		case PUBSUB_PUBLISHER_FIRST_MSG:
 +			bound->mp_send_in_progress = true;
 +			arrayList_add(bound->mp_parts,msg);
 +			break;
 +		case PUBSUB_PUBLISHER_PART_MSG:
 +			if(!bound->mp_send_in_progress){
 +				printf("PSA_ZMQ_TP: ERROR: received msg part without the first part.\n");
 +				status = -4;
 +			}
 +			else{
 +				arrayList_add(bound->mp_parts,msg);
 +			}
 +			break;
 +		case PUBSUB_PUBLISHER_LAST_MSG:
 +			if(!bound->mp_send_in_progress){
 +				printf("PSA_ZMQ_TP: ERROR: received end msg without the first part.\n");
 +				status = -4;
 +			}
 +			else{
 +				arrayList_add(bound->mp_parts,msg);
 +				snd = send_pubsub_mp_msg(bound->parent->zmq_socket,bound->mp_parts);
 +				bound->mp_send_in_progress = false;
 +			}
 +			break;
 +		case PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG:	//Normal send case
 +			snd = send_pubsub_msg(bound->parent->zmq_socket,msg,true);
 +			break;
 +		default:
 +			printf("PSA_ZMQ_TP: ERROR: Invalid MP flags combination\n");
 +			status = -4;
 +			break;
 +		}
 +
 +		if(status==-4){
 +			free(msg);
 +		}
 +
 +		if(!snd){
 +			printf("PSA_ZMQ_TP: Failed to send %s message %u.\n",flags == (PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG) ? "single" : "multipart", msgTypeId);
 +		}
 +
 +	} else {
 +        printf("PSA_ZMQ_TP: No msg serializer available for msg type id %d\n", msgTypeId);
 +		status=-1;
 +	}
 +
 +	celixThreadMutex_unlock(&(bound->mp_lock));
 +	celixThreadMutex_unlock(&(bound->parent->tp_lock));
 +
 +	return status;
 +
 +}
 +
 +static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* msgTypeId){
 +	*msgTypeId = utils_stringHash(msgType);
 +	return 0;
 +}
 +
 +
 +static unsigned int rand_range(unsigned int min, unsigned int max){
 +
 +	double scaled = (double)(((double)random())/((double)RAND_MAX));
 +	return (max-min+1)*scaled + min;
 +
 +}
 +
 +static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle){
 +
 +	//PRECOND lock on tp->lock
 +
 +	publish_bundle_bound_service_pt bound = calloc(1, sizeof(*bound));
 +
 +	if (bound != NULL) {
 +
 +		bound->parent = tp;
 +		bound->bundle = bundle;
 +		bound->getCount = 1;
 +		bound->mp_send_in_progress = false;
 +		celixThreadMutex_create(&bound->mp_lock,NULL);
 +
 +		if(tp->serializer != NULL){
 +			tp->serializer->createSerializerMap(tp->serializer->handle,bundle,&bound->msgTypes);
 +		}
 +
 +		arrayList_create(&bound->mp_parts);
 +
 +		pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(bound->parent->pub_ep_list,0);
- 		bound->topic=strdup(pubEP->topic);
++		bound->topic=strdup(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +
 +		bound->service.handle = bound;
 +		bound->service.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForUUID;
 +		bound->service.send = pubsub_topicPublicationSend;
 +		bound->service.sendMultipart = pubsub_topicPublicationSendMultipart;
 +
 +	}
 +
 +	return bound;
 +}
 +
 +static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc){
 +
 +	//PRECOND lock on tp->lock
 +
 +	celixThreadMutex_lock(&boundSvc->mp_lock);
 +
 +
 +	if(boundSvc->parent->serializer != NULL && boundSvc->msgTypes != NULL){
 +		boundSvc->parent->serializer->destroySerializerMap(boundSvc->parent->serializer->handle, boundSvc->msgTypes);
 +	}
 +
 +	if(boundSvc->mp_parts!=NULL){
 +		arrayList_destroy(boundSvc->mp_parts);
 +	}
 +
 +	if(boundSvc->topic!=NULL){
 +		free(boundSvc->topic);
 +	}
 +
 +	celixThreadMutex_unlock(&boundSvc->mp_lock);
 +	celixThreadMutex_destroy(&boundSvc->mp_lock);
 +
 +	free(boundSvc);
 +
 +}
 +
 +static void delay_first_send_for_late_joiners(){
 +
 +	static bool firstSend = true;
 +
 +	if(firstSend){
 +		printf("PSA_ZMQ_TP: Delaying first send for late joiners...\n");
 +		sleep(FIRST_SEND_DELAY);
 +		firstSend = false;
 +	}
 +}

http://git-wip-us.apache.org/repos/asf/celix/blob/0a5ef69a/pubsub/pubsub_discovery/src/etcd_watcher.c
----------------------------------------------------------------------
diff --cc pubsub/pubsub_discovery/src/etcd_watcher.c
index 3c3a5a8,0000000..726269a
mode 100644,000000..100644
--- a/pubsub/pubsub_discovery/src/etcd_watcher.c
+++ b/pubsub/pubsub_discovery/src/etcd_watcher.c
@@@ -1,290 -1,0 +1,344 @@@
 +/**
 + *Licensed to the Apache Software Foundation (ASF) under one
 + *or more contributor license agreements.  See the NOTICE file
 + *distributed with this work for additional information
 + *regarding copyright ownership.  The ASF licenses this file
 + *to you under the Apache License, Version 2.0 (the
 + *"License"); you may not use this file except in compliance
 + *with the License.  You may obtain a copy of the License at
 + *
 + *  http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *Unless required by applicable law or agreed to in writing,
 + *software distributed under the License is distributed on an
 + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + *specific language governing permissions and limitations
 + *under the License.
 + */
 +
 +#include <stdbool.h>
 +#include <stdlib.h>
 +#include <unistd.h>
 +#include <string.h>
++#include <jansson.h>
 +
 +#include "celix_log.h"
 +#include "constants.h"
 +
 +#include "etcd.h"
 +#include "etcd_watcher.h"
 +
 +#include "pubsub_discovery.h"
 +#include "pubsub_discovery_impl.h"
 +
 +
 +
 +#define MAX_ROOTNODE_LENGTH             128
 +#define MAX_LOCALNODE_LENGTH            4096
 +#define MAX_FIELD_LENGTH                128
 +
 +#define CFG_ETCD_ROOT_PATH              "PUBSUB_DISCOVERY_ETCD_ROOT_PATH"
 +#define DEFAULT_ETCD_ROOTPATH           "pubsub/discovery"
 +
 +#define CFG_ETCD_SERVER_IP              "PUBSUB_DISCOVERY_ETCD_SERVER_IP"
 +#define DEFAULT_ETCD_SERVER_IP          "127.0.0.1"
 +
 +#define CFG_ETCD_SERVER_PORT            "PUBSUB_DISCOVERY_ETCD_SERVER_PORT"
 +#define DEFAULT_ETCD_SERVER_PORT        2379
 +
 +// be careful - this should be higher than the curl timeout
 +#define CFG_ETCD_TTL                    "DISCOVERY_ETCD_TTL"
 +#define DEFAULT_ETCD_TTL                30
 +
 +
 +struct etcd_watcher {
 +	pubsub_discovery_pt pubsub_discovery;
 +
 +	celix_thread_mutex_t watcherLock;
 +	celix_thread_t watcherThread;
 +
 +	char *scope;
 +	char *topic;
 +	volatile bool running;
 +};
 +
 +struct etcd_writer {
 +	pubsub_discovery_pt pubsub_discovery;
 +	celix_thread_mutex_t localPubsLock;
 +	array_list_pt localPubs;
 +	volatile bool running;
 +	celix_thread_t writerThread;
 +};
 +
 +
 +// note that the rootNode shouldn't have a leading slash
 +static celix_status_t etcdWatcher_getTopicRootPath(bundle_context_pt context, const char *scope, const char *topic, char* rootNode, int rootNodeLen) {
 +	celix_status_t status = CELIX_SUCCESS;
 +	const char* rootPath = NULL;
 +
 +	if (((bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &rootPath)) != CELIX_SUCCESS) || (!rootPath)) {
 +		snprintf(rootNode, rootNodeLen, "%s/%s/%s", DEFAULT_ETCD_ROOTPATH, scope, topic);
 +	} else {
 +		snprintf(rootNode, rootNodeLen, "%s/%s/%s", rootPath, scope, topic);
 +	}
 +
 +	return status;
 +}
 +
 +static celix_status_t etcdWatcher_getRootPath(bundle_context_pt context, char* rootNode) {
 +	celix_status_t status = CELIX_SUCCESS;
 +	const char* rootPath = NULL;
 +
 +	if (((bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &rootPath)) != CELIX_SUCCESS) || (!rootPath)) {
 +		strncpy(rootNode, DEFAULT_ETCD_ROOTPATH, MAX_ROOTNODE_LENGTH);
 +	} else {
 +		strncpy(rootNode, rootPath, MAX_ROOTNODE_LENGTH);
 +	}
 +
 +	return status;
 +}
 +
 +
 +static void add_node(const char *key, const char *value, void* arg) {
 +	pubsub_discovery_pt ps_discovery = (pubsub_discovery_pt) arg;
 +	pubsub_endpoint_pt pubEP = NULL;
 +	celix_status_t status = etcdWatcher_getPublisherEndpointFromKey(ps_discovery, key, value, &pubEP);
 +	if(!status && pubEP) {
 +		pubsub_discovery_addNode(ps_discovery, pubEP);
 +	}
 +}
 +
 +static celix_status_t etcdWatcher_addAlreadyExistingPublishers(pubsub_discovery_pt ps_discovery, const char *rootPath, long long * highestModified) {
 +	celix_status_t status = CELIX_SUCCESS;
 +	if(etcd_get_directory(rootPath, add_node, ps_discovery, highestModified)) {
 +		status = CELIX_ILLEGAL_ARGUMENT;
 +	}
 +	return status;
 +}
 +
 +// gets everything from provided key
 +celix_status_t etcdWatcher_getPublisherEndpointFromKey(pubsub_discovery_pt pubsub_discovery, const char* etcdKey, const char* etcdValue, pubsub_endpoint_pt* pubEP) {
 +
 +	celix_status_t status = CELIX_SUCCESS;
 +
 +	char rootPath[MAX_ROOTNODE_LENGTH];
 +	char *expr = NULL;
 +	char scope[MAX_FIELD_LENGTH];
 +	char topic[MAX_FIELD_LENGTH];
 +	char fwUUID[MAX_FIELD_LENGTH];
 +	char serviceId[MAX_FIELD_LENGTH];
 +
 +	memset(rootPath,0,MAX_ROOTNODE_LENGTH);
 +	memset(topic,0,MAX_FIELD_LENGTH);
 +	memset(fwUUID,0,MAX_FIELD_LENGTH);
 +	memset(serviceId,0,MAX_FIELD_LENGTH);
 +
 +	etcdWatcher_getRootPath(pubsub_discovery->context, rootPath);
 +
 +	asprintf(&expr, "/%s/%%[^/]/%%[^/]/%%[^/]/%%[^/].*", rootPath);
 +	if(expr) {
 +		int foundItems = sscanf(etcdKey, expr, scope, topic, fwUUID, serviceId);
 +		free(expr);
 +		if (foundItems != 4) { // Could happen when a directory is removed, just don't process this.
 +			status = CELIX_ILLEGAL_STATE;
 +		}
 +		else{
- 			status = pubsubEndpoint_create(fwUUID,scope,topic,strtol(serviceId,NULL,10),etcdValue,NULL,pubEP);
++
++			// etcdValue contains the json formatted string
++			json_error_t error;
++			json_t* jsonRoot = json_loads(etcdValue, JSON_DECODE_ANY, &error);
++
++			const char* endpoint_serializer = NULL;
++			const char* endpoint_admin_type = NULL;
++			const char* endpoint_url = NULL;
++			const char* endpoint_type = NULL;
++
++			if (json_is_object(jsonRoot)){
++
++				void *iter = json_object_iter(jsonRoot);
++
++				const char *key;
++				json_t *value;
++
++				while (iter) {
++					key = json_object_iter_key(iter);
++					value = json_object_iter_value(iter);
++
++					if (strcmp(key, PUBSUB_ENDPOINT_SERIALIZER) == 0) {
++						endpoint_serializer = json_string_value(value);
++					} else if (strcmp(key, PUBSUB_ENDPOINT_ADMIN_TYPE) == 0) {
++						endpoint_admin_type = json_string_value(value);
++					} else if (strcmp(key, PUBSUB_ENDPOINT_URL) == 0) {
++						endpoint_url = json_string_value(value);
++					} else if (strcmp(key, PUBSUB_ENDPOINT_TYPE) == 0) {
++						endpoint_type = json_string_value(value);
++					}
++
++					iter = json_object_iter_next(jsonRoot, iter);
++				}
++
++				if (endpoint_url == NULL) {
++					printf("EW: No endpoint found in json object!\n");
++					endpoint_url = etcdValue;
++				}
++
++			} else {
++				endpoint_url = etcdValue;
++			}
++
++			status = pubsubEndpoint_create(fwUUID,scope,topic,strtol(serviceId,NULL,10),endpoint_url,NULL,pubEP);
++
++            if (status == CELIX_SUCCESS) {
++                status += pubsubEndpoint_setField(*pubEP, PUBSUB_ENDPOINT_SERIALIZER, endpoint_serializer);
++                status += pubsubEndpoint_setField(*pubEP, PUBSUB_ENDPOINT_ADMIN_TYPE, endpoint_admin_type);
++                status += pubsubEndpoint_setField(*pubEP, PUBSUB_ENDPOINT_TYPE, endpoint_type);
++            }
++
++			if (jsonRoot != NULL) {
++				json_decref(jsonRoot);
++			}
 +		}
 +	}
 +	return status;
 +}
 +
 +/*
 + * performs (blocking) etcd_watch calls to check for
 + * changing discovery endpoint information within etcd.
 + */
 +static void* etcdWatcher_run(void* data) {
 +	etcd_watcher_pt watcher = (etcd_watcher_pt) data;
 +	time_t timeBeforeWatch = time(NULL);
 +	char rootPath[MAX_ROOTNODE_LENGTH];
 +	long long highestModified = 0;
 +
 +	pubsub_discovery_pt ps_discovery = watcher->pubsub_discovery;
 +	bundle_context_pt context = ps_discovery->context;
 +
 +	memset(rootPath, 0, MAX_ROOTNODE_LENGTH);
 +
 +	//TODO: add topic to etcd key
 +	etcdWatcher_getTopicRootPath(context, watcher->scope, watcher->topic, rootPath, MAX_ROOTNODE_LENGTH);
 +	etcdWatcher_addAlreadyExistingPublishers(ps_discovery, rootPath, &highestModified);
 +
 +	while ((celixThreadMutex_lock(&watcher->watcherLock) == CELIX_SUCCESS) && watcher->running) {
 +
 +		char *rkey = NULL;
 +		char *value = NULL;
 +		char *preValue = NULL;
 +		char *action = NULL;
 +		long long modIndex;
 +
 +		celixThreadMutex_unlock(&watcher->watcherLock);
 +
 +		if (etcd_watch(rootPath, highestModified + 1, &action, &preValue, &value, &rkey, &modIndex) == 0 && action != NULL) {
 +			pubsub_endpoint_pt pubEP = NULL;
 +			if ((strcmp(action, "set") == 0) || (strcmp(action, "create") == 0)) {
 +				if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, value, &pubEP) == CELIX_SUCCESS) {
 +					pubsub_discovery_addNode(ps_discovery, pubEP);
 +				}
 +			} else if (strcmp(action, "delete") == 0) {
 +				if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, preValue, &pubEP) == CELIX_SUCCESS) {
 +					pubsub_discovery_removeNode(ps_discovery, pubEP);
 +				}
 +			} else if (strcmp(action, "expire") == 0) {
 +				if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, preValue, &pubEP) == CELIX_SUCCESS) {
 +					pubsub_discovery_removeNode(ps_discovery, pubEP);
 +				}
 +			} else if (strcmp(action, "update") == 0) {
 +				if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, value, &pubEP) == CELIX_SUCCESS) {
 +					pubsub_discovery_addNode(ps_discovery, pubEP);
 +				}
 +			} else {
 +				fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "Unexpected action: %s", action);
 +			}
 +			highestModified = modIndex;
 +		} else if (time(NULL) - timeBeforeWatch <= (DEFAULT_ETCD_TTL / 4)) {
 +			sleep(DEFAULT_ETCD_TTL / 4);
 +		}
 +
 +		FREE_MEM(action);
 +		FREE_MEM(value);
 +		FREE_MEM(preValue);
 +		FREE_MEM(rkey);
 +
 +		/* prevent busy waiting, in case etcd_watch returns false */
 +
 +
 +		if (time(NULL) - timeBeforeWatch > (DEFAULT_ETCD_TTL / 4)) {
 +			timeBeforeWatch = time(NULL);
 +		}
 +
 +	}
 +
 +	if (watcher->running == false) {
 +		celixThreadMutex_unlock(&watcher->watcherLock);
 +	}
 +
 +	return NULL;
 +}
 +
 +celix_status_t etcdWatcher_create(pubsub_discovery_pt pubsub_discovery, bundle_context_pt context, const char *scope, const char *topic, etcd_watcher_pt *watcher) {
 +	celix_status_t status = CELIX_SUCCESS;
 +
 +
 +	if (pubsub_discovery == NULL) {
 +		return CELIX_BUNDLE_EXCEPTION;
 +	}
 +
 +	(*watcher) = calloc(1, sizeof(struct etcd_watcher));
 +
 +	if(*watcher == NULL){
 +		return CELIX_ENOMEM;
 +	}
 +
 +	(*watcher)->pubsub_discovery = pubsub_discovery;
 +	(*watcher)->scope = strdup(scope);
 +	(*watcher)->topic = strdup(topic);
 +
 +
 +	celixThreadMutex_create(&(*watcher)->watcherLock, NULL);
 +
 +	celixThreadMutex_lock(&(*watcher)->watcherLock);
 +
 +	status = celixThread_create(&(*watcher)->watcherThread, NULL, etcdWatcher_run, *watcher);
 +	if (status == CELIX_SUCCESS) {
 +		(*watcher)->running = true;
 +	}
 +
 +	celixThreadMutex_unlock(&(*watcher)->watcherLock);
 +
 +
 +	return status;
 +}
 +
 +celix_status_t etcdWatcher_destroy(etcd_watcher_pt watcher) {
 +
 +	celix_status_t status = CELIX_SUCCESS;
 +
 +	char rootPath[MAX_ROOTNODE_LENGTH];
 +	etcdWatcher_getTopicRootPath(watcher->pubsub_discovery->context, watcher->scope, watcher->topic, rootPath, MAX_ROOTNODE_LENGTH);
 +	celixThreadMutex_destroy(&(watcher->watcherLock));
 +
 +	free(watcher->scope);
 +	free(watcher->topic);
 +	free(watcher);
 +
 +	return status;
 +}
 +
 +celix_status_t etcdWatcher_stop(etcd_watcher_pt watcher){
 +	celix_status_t status = CELIX_SUCCESS;
 +
 +	celixThreadMutex_lock(&(watcher->watcherLock));
 +	watcher->running = false;
 +	celixThreadMutex_unlock(&(watcher->watcherLock));
 +
 +	celixThread_join(watcher->watcherThread, NULL);
 +
 +	return status;
 +
 +}
 +
 +

http://git-wip-us.apache.org/repos/asf/celix/blob/0a5ef69a/pubsub/pubsub_discovery/src/etcd_writer.c
----------------------------------------------------------------------
diff --cc pubsub/pubsub_discovery/src/etcd_writer.c
index 1c423f3,0000000..e820e50
mode 100644,000000..100644
--- a/pubsub/pubsub_discovery/src/etcd_writer.c
+++ b/pubsub/pubsub_discovery/src/etcd_writer.c
@@@ -1,189 -1,0 +1,221 @@@
 +/**
 + *Licensed to the Apache Software Foundation (ASF) under one
 + *or more contributor license agreements.  See the NOTICE file
 + *distributed with this work for additional information
 + *regarding copyright ownership.  The ASF licenses this file
 + *to you under the Apache License, Version 2.0 (the
 + *"License"); you may not use this file except in compliance
 + *with the License.  You may obtain a copy of the License at
 + *
 + *  http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *Unless required by applicable law or agreed to in writing,
 + *software distributed under the License is distributed on an
 + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + *specific language governing permissions and limitations
 + *under the License.
 + */
 +
 +#include <stdbool.h>
 +#include <stdlib.h>
 +#include <unistd.h>
 +#include <string.h>
++#include <jansson.h>
 +
 +#include "celix_log.h"
 +#include "constants.h"
 +
 +#include "etcd.h"
 +#include "etcd_writer.h"
 +
 +#include "pubsub_discovery.h"
 +#include "pubsub_discovery_impl.h"
 +
 +#define MAX_ROOTNODE_LENGTH		128
 +
 +#define CFG_ETCD_ROOT_PATH		"PUBSUB_DISCOVERY_ETCD_ROOT_PATH"
 +#define DEFAULT_ETCD_ROOTPATH	"pubsub/discovery"
 +
 +#define CFG_ETCD_SERVER_IP		"PUBSUB_DISCOVERY_ETCD_SERVER_IP"
 +#define DEFAULT_ETCD_SERVER_IP	"127.0.0.1"
 +
 +#define CFG_ETCD_SERVER_PORT	"PUBSUB_DISCOVERY_ETCD_SERVER_PORT"
 +#define DEFAULT_ETCD_SERVER_PORT 2379
 +
 +// be careful - this should be higher than the curl timeout
 +#define CFG_ETCD_TTL   "DISCOVERY_ETCD_TTL"
 +#define DEFAULT_ETCD_TTL 30
 +
 +struct etcd_writer {
 +	pubsub_discovery_pt pubsub_discovery;
 +	celix_thread_mutex_t localPubsLock;
 +	array_list_pt localPubs;
 +	volatile bool running;
 +	celix_thread_t writerThread;
 +};
 +
 +
 +static const char* etcdWriter_getRootPath(bundle_context_pt context);
 +static void* etcdWriter_run(void* data);
 +
 +
 +etcd_writer_pt etcdWriter_create(pubsub_discovery_pt disc) {
 +	etcd_writer_pt writer = calloc(1, sizeof(*writer));
 +	if(writer) {
 +		celixThreadMutex_create(&writer->localPubsLock, NULL);
 +		arrayList_create(&writer->localPubs);
 +		writer->pubsub_discovery = disc;
 +		writer->running = true;
 +		celixThread_create(&writer->writerThread, NULL, etcdWriter_run, writer);
 +	}
 +	return writer;
 +}
 +
 +void etcdWriter_destroy(etcd_writer_pt writer) {
 +	char dir[MAX_ROOTNODE_LENGTH];
 +	const char *rootPath = etcdWriter_getRootPath(writer->pubsub_discovery->context);
 +
 +	writer->running = false;
 +	celixThread_join(writer->writerThread, NULL);
 +
 +	celixThreadMutex_lock(&writer->localPubsLock);
 +	for(int i = 0; i < arrayList_size(writer->localPubs); i++) {
 +		pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(writer->localPubs,i);
 +		memset(dir,0,MAX_ROOTNODE_LENGTH);
- 		snprintf(dir,MAX_ROOTNODE_LENGTH,"%s/%s/%s/%s",rootPath,pubEP->scope,pubEP->topic,pubEP->frameworkUUID);
++		snprintf(dir,MAX_ROOTNODE_LENGTH,"%s/%s/%s/%s",
++				 rootPath,
++				 properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),
++				 properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),
++				 properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID));
++
 +		etcd_del(dir);
 +		pubsubEndpoint_destroy(pubEP);
 +	}
 +	arrayList_destroy(writer->localPubs);
 +
 +	celixThreadMutex_unlock(&writer->localPubsLock);
 +	celixThreadMutex_destroy(&(writer->localPubsLock));
 +
 +	free(writer);
 +}
 +
 +celix_status_t etcdWriter_addPublisherEndpoint(etcd_writer_pt writer, pubsub_endpoint_pt pubEP, bool storeEP){
 +	celix_status_t status = CELIX_BUNDLE_EXCEPTION;
 +
 +	if(storeEP){
 +		const char *fwUUID = NULL;
 +		bundleContext_getProperty(writer->pubsub_discovery->context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID);
- 		if(fwUUID && strcmp(pubEP->frameworkUUID, fwUUID) == 0) {
++		if(fwUUID && strcmp(properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID), fwUUID) == 0) {
 +			celixThreadMutex_lock(&writer->localPubsLock);
 +			pubsub_endpoint_pt p = NULL;
 +			pubsubEndpoint_clone(pubEP, &p);
 +			arrayList_add(writer->localPubs,p);
 +			celixThreadMutex_unlock(&writer->localPubsLock);
 +		}
 +	}
 +
 +	char *key;
 +
 +	const char* ttlStr = NULL;
 +	int ttl = 0;
 +
 +	// determine ttl
 +	if ((bundleContext_getProperty(writer->pubsub_discovery->context, CFG_ETCD_TTL, &ttlStr) != CELIX_SUCCESS) || !ttlStr) {
 +		ttl = DEFAULT_ETCD_TTL;
 +	} else {
 +		char* endptr = NULL;
 +		errno = 0;
 +		ttl = strtol(ttlStr, &endptr, 10);
 +		if (*endptr || errno != 0) {
 +			ttl = DEFAULT_ETCD_TTL;
 +		}
 +	}
 +
 +	const char *rootPath = etcdWriter_getRootPath(writer->pubsub_discovery->context);
 +
- 	asprintf(&key,"%s/%s/%s/%s/%ld",rootPath,pubEP->scope,pubEP->topic,pubEP->frameworkUUID,pubEP->serviceID);
- 
- 	if(!etcd_set(key,pubEP->endpoint,ttl,false)){
++	asprintf(&key,"%s/%s/%s/%s/%ld",
++			 rootPath,
++			 properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),
++			 properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),
++			 properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),
++			 pubEP->serviceID);
++
++    char serviceID [sizeof(pubEP->serviceID)];
++    snprintf(serviceID, sizeof(pubEP->serviceID), "%ld", pubEP->serviceID);
++	json_t* jsonEndpoint = json_pack("{s:s,s:s,s:s,s:s,s:s,s:s,s:s}",
++									 PUBSUB_ENDPOINT_SERVICE_ID, serviceID,
++									 PUBSUB_ENDPOINT_SERIALIZER, "serializer.json", //TODO: Serializer not (yet) stored in endpoint
++									 PUBSUB_ENDPOINT_ADMIN_TYPE, "zmq", //TODO: PSA type not (yet) stored in endpoint
++									 PUBSUB_ENDPOINT_URL, properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL),
++									 PUBSUB_ENDPOINT_TYPE, "publisher", //TODO: Check if necessary
++									 PUBSUB_ENDPOINT_TOPIC, properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),
++									 PUBSUB_ENDPOINT_SCOPE, properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE)
++	);
++	char* jsonEndpointStr = json_dumps(jsonEndpoint, JSON_COMPACT);
++
++	if (!etcd_set(key,jsonEndpointStr,ttl,false)) {
 +		status = CELIX_ILLEGAL_ARGUMENT;
 +	}
 +	FREE_MEM(key);
++	FREE_MEM(jsonEndpointStr);
++	json_decref(jsonEndpoint);
++
 +	return status;
 +}
 +
 +celix_status_t etcdWriter_deletePublisherEndpoint(etcd_writer_pt writer, pubsub_endpoint_pt pubEP) {
 +	celix_status_t status = CELIX_SUCCESS;
 +	char *key = NULL;
 +
 +	const char *rootPath = etcdWriter_getRootPath(writer->pubsub_discovery->context);
 +
- 	asprintf(&key, "%s/%s/%s/%s/%ld", rootPath, pubEP->scope, pubEP->topic, pubEP->frameworkUUID, pubEP->serviceID);
++	asprintf(&key, "%s/%s/%s/%s/%ld",
++			 rootPath,
++			 properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),
++			 properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),
++			 properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),
++			 pubEP->serviceID);
 +
 +	celixThreadMutex_lock(&writer->localPubsLock);
 +	for (unsigned int i = 0; i < arrayList_size(writer->localPubs); i++) {
 +		pubsub_endpoint_pt ep = arrayList_get(writer->localPubs, i);
 +		if (pubsubEndpoint_equals(ep, pubEP)) {
 +			arrayList_remove(writer->localPubs, i);
 +			pubsubEndpoint_destroy(ep);
 +			break;
 +		}
 +	}
 +	celixThreadMutex_unlock(&writer->localPubsLock);
 +
 +	if (etcd_del(key)) {
 +		printf("Failed to remove key %s from ETCD\n",key);
 +		status = CELIX_ILLEGAL_ARGUMENT;
 +	}
 +	FREE_MEM(key);
 +	return status;
 +}
 +
 +static void* etcdWriter_run(void* data) {
 +	etcd_writer_pt writer = (etcd_writer_pt)data;
 +	while(writer->running) {
 +		celixThreadMutex_lock(&writer->localPubsLock);
 +		for(int i=0; i < arrayList_size(writer->localPubs); i++) {
 +			etcdWriter_addPublisherEndpoint(writer,(pubsub_endpoint_pt)arrayList_get(writer->localPubs,i),false);
 +		}
 +		celixThreadMutex_unlock(&writer->localPubsLock);
 +		sleep(DEFAULT_ETCD_TTL / 2);
 +	}
 +
 +	return NULL;
 +}
 +
 +static const char* etcdWriter_getRootPath(bundle_context_pt context) {
 +	const char* rootPath = NULL;
 +	bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &rootPath);
 +	if(rootPath == NULL) {
 +		rootPath = DEFAULT_ETCD_ROOTPATH;
 +	}
 +	return rootPath;
 +}
 +


Mime
View raw message