celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rlenfer...@apache.org
Subject [4/4] celix git commit: Merge branch 'endpoint-format' into develop
Date Fri, 02 Feb 2018 13:38:11 GMT
Merge branch 'endpoint-format' into develop


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/0a5ef69a
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/0a5ef69a
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/0a5ef69a

Branch: refs/heads/develop
Commit: 0a5ef69a6a6a2167953c4ad4fa807142d46ad768
Parents: a8b8410 9094f55
Author: Roy Lenferink <rlenferink@apache.org>
Authored: Fri Feb 2 14:01:36 2018 +0100
Committer: Roy Lenferink <rlenferink@apache.org>
Committed: Fri Feb 2 14:01:36 2018 +0100

----------------------------------------------------------------------
 .../pubsub_admin_udp_mc/src/pubsub_admin_impl.c | 103 ++++++++++++-------
 .../pubsub_admin_udp_mc/src/topic_publication.c |  15 +--
 pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c |  99 +++++++++++-------
 pubsub/pubsub_admin_zmq/src/topic_publication.c |  13 +--
 pubsub/pubsub_discovery/src/etcd_watcher.c      |  56 +++++++++-
 pubsub/pubsub_discovery/src/etcd_writer.c       |  44 ++++++--
 .../src/pubsub_discovery_impl.c                 |  19 ++--
 pubsub/pubsub_spi/include/pubsub_endpoint.h     |  19 ++--
 pubsub/pubsub_spi/src/pubsub_endpoint.c         |  88 ++++++++++------
 .../src/pubsub_topology_manager.c               |  40 ++++---
 utils/include/properties.h                      |   2 +
 utils/src/properties.c                          |   5 +
 12 files changed, 343 insertions(+), 160 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/0a5ef69a/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --cc pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.c
index 674f817,0000000..a71344a
mode 100644,000000..100644
--- a/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.c
+++ b/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.c
@@@ -1,1039 -1,0 +1,1062 @@@
 +/**
 + *Licensed to the Apache Software Foundation (ASF) under one
 + *or more contributor license agreements.  See the NOTICE file
 + *distributed with this work for additional information
 + *regarding copyright ownership.  The ASF licenses this file
 + *to you under the Apache License, Version 2.0 (the
 + *"License"); you may not use this file except in compliance
 + *with the License.  You may obtain a copy of the License at
 + *
 + *  http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *Unless required by applicable law or agreed to in writing,
 + *software distributed under the License is distributed on an
 + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + *specific language governing permissions and limitations
 + *under the License.
 + */
 +/*
 + * pubsub_admin_impl.c
 + *
 + *  \date       Sep 30, 2011
 + *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
 + *  \copyright	Apache License, Version 2.0
 + */
 +
 +#include <stdio.h>
 +#include <stdlib.h>
 +
 +#ifndef ANDROID
 +#include <ifaddrs.h>
 +#endif
 +
 +#include <stdio.h>
 +#include <stdlib.h>
 +#include <unistd.h>
 +#include <string.h>
 +
 +#include <sys/types.h>
 +#include <sys/socket.h>
 +#include <netinet/in.h>
 +#include <arpa/inet.h>
 +#include <netdb.h>
 +
 +#include "constants.h"
 +#include "utils.h"
 +#include "hash_map.h"
 +#include "array_list.h"
 +#include "bundle_context.h"
 +#include "bundle.h"
 +#include "service_reference.h"
 +#include "service_registration.h"
 +#include "log_helper.h"
 +#include "log_service.h"
 +#include "celix_threads.h"
 +#include "service_factory.h"
 +
 +#include "pubsub_admin_impl.h"
 +#include "topic_subscription.h"
 +#include "topic_publication.h"
 +#include "pubsub_endpoint.h"
 +#include "pubsub/subscriber.h"
 +#include "pubsub_admin_match.h"
 +
 +static const char *DEFAULT_MC_IP = "224.100.1.1";
 +static char *DEFAULT_MC_PREFIX = "224.100";
 +
 +static celix_status_t pubsubAdmin_getIpAddress(const char* interface, char** ip);
 +static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
 +static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
 +
 +static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin,pubsub_endpoint_pt ep, pubsub_serializer_service_t **serSvc);
 +static void connectTopicPubSubToSerializer(pubsub_admin_pt admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool isPublication);
 +static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void *topicPubSub,bool isPublication);
 +
 +celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *admin) {
 +	celix_status_t status = CELIX_SUCCESS;
 +
 +	*admin = calloc(1, sizeof(**admin));
 +
 +	if (!*admin) {
 +		return CELIX_ENOMEM;
 +	}
 +
 +	char *mc_ip = NULL;
 +	char *if_ip = NULL;
 +	int sendSocket = -1;
 +
 +	if (logHelper_create(context, &(*admin)->loghelper) == CELIX_SUCCESS) {
 +		logHelper_start((*admin)->loghelper);
 +	}
 +	const char *mc_ip_prop = NULL;
 +	bundleContext_getProperty(context,PSA_IP , &mc_ip_prop);
 +	if(mc_ip_prop) {
 +		mc_ip = strdup(mc_ip_prop);
 +	}
 +
 +#ifndef ANDROID
 +	if (mc_ip == NULL) {
 +		const char *mc_prefix = NULL;
 +		const char *interface = NULL;
 +		int b0 = 0, b1 = 0, b2 = 0, b3 = 0;
 +		bundleContext_getProperty(context,PSA_MULTICAST_IP_PREFIX , &mc_prefix);
 +		if(mc_prefix == NULL) {
 +			mc_prefix = DEFAULT_MC_PREFIX;
 +		}
 +
 +		bundleContext_getProperty(context, PSA_ITF, &interface);
 +		if (pubsubAdmin_getIpAddress(interface, &if_ip) != CELIX_SUCCESS) {
 +			logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA_UDP_MC: Could not retrieve IP address for interface %s", interface);
 +		}
 +
 +		printf("IP Detected : %s\n", if_ip);
 +		if(if_ip && sscanf(if_ip, "%i.%i.%i.%i", &b0, &b1, &b2, &b3) != 4) {
 +			logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA_UDP_MC: Could not parse IP address %s", if_ip);
 +			b2 = 1;
 +			b3 = 1;
 +		}
 +
 +		asprintf(&mc_ip, "%s.%d.%d",mc_prefix, b2, b3);
 +
 +		sendSocket = socket(AF_INET, SOCK_DGRAM, 0);
 +		if(sendSocket == -1) {
 +			perror("pubsubAdmin_create:socket");
 +			status = CELIX_SERVICE_EXCEPTION;
 +		}
 +
 +		if(status == CELIX_SUCCESS){
 +			char loop = 1;
 +			if(setsockopt(sendSocket, IPPROTO_IP, IP_MULTICAST_LOOP, &loop, sizeof(loop)) != 0) {
 +				perror("pubsubAdmin_create:setsockopt(IP_MULTICAST_LOOP)");
 +				status = CELIX_SERVICE_EXCEPTION;
 +			}
 +		}
 +
 +		if(status == CELIX_SUCCESS){
 +			struct in_addr multicast_interface;
 +			inet_aton(if_ip, &multicast_interface);
 +			if(setsockopt(sendSocket,  IPPROTO_IP, IP_MULTICAST_IF, &multicast_interface, sizeof(multicast_interface)) != 0) {
 +				perror("pubsubAdmin_create:setsockopt(IP_MULTICAST_IF)");
 +				status = CELIX_SERVICE_EXCEPTION;
 +			}
 +		}
 +
 +	}
 +
 +
 +	if(status != CELIX_SUCCESS){
 +		logHelper_stop((*admin)->loghelper);
 +		logHelper_destroy(&((*admin)->loghelper));
 +		if(sendSocket >=0){
 +			close(sendSocket);
 +		}
 +		if(if_ip != NULL){
 +			free(if_ip);
 +		}
 +		if(mc_ip != NULL){
 +			free(mc_ip);
 +		}
 +		return status;
 +	}
 +	else{
 +		(*admin)->sendSocket = sendSocket;
 +	}
 +
 +#endif
 +
 +	(*admin)->bundle_context= context;
 +	(*admin)->localPublications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 +	(*admin)->subscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 +	(*admin)->pendingSubscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 +	(*admin)->externalPublications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 +	(*admin)->topicSubscriptionsPerSerializer = hashMap_create(NULL, NULL, NULL, NULL);
 +	(*admin)->topicPublicationsPerSerializer  = hashMap_create(NULL, NULL, NULL, NULL);
 +	arrayList_create(&((*admin)->noSerializerSubscriptions));
 +	arrayList_create(&((*admin)->noSerializerPublications));
 +	arrayList_create(&((*admin)->serializerList));
 +
 +	celixThreadMutex_create(&(*admin)->localPublicationsLock, NULL);
 +	celixThreadMutex_create(&(*admin)->subscriptionsLock, NULL);
 +	celixThreadMutex_create(&(*admin)->externalPublicationsLock, NULL);
 +	celixThreadMutex_create(&(*admin)->serializerListLock, NULL);
 +	celixThreadMutex_create(&(*admin)->usedSerializersLock, NULL);
 +
 +	celixThreadMutexAttr_create(&(*admin)->noSerializerPendingsAttr);
 +	celixThreadMutexAttr_settype(&(*admin)->noSerializerPendingsAttr, CELIX_THREAD_MUTEX_RECURSIVE);
 +	celixThreadMutex_create(&(*admin)->noSerializerPendingsLock, &(*admin)->noSerializerPendingsAttr);
 +
 +	celixThreadMutexAttr_create(&(*admin)->pendingSubscriptionsAttr);
 +	celixThreadMutexAttr_settype(&(*admin)->pendingSubscriptionsAttr, CELIX_THREAD_MUTEX_RECURSIVE);
 +	celixThreadMutex_create(&(*admin)->pendingSubscriptionsLock, &(*admin)->pendingSubscriptionsAttr);
 +
 +	if (if_ip != NULL) {
 +		logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA_UDP_MC: Using %s as interface for multicast communication", if_ip);
 +		(*admin)->ifIpAddress = if_ip;
 +	} else {
 +		(*admin)->ifIpAddress = strdup("127.0.0.1");
 +	}
 +
 +	if (mc_ip != NULL) {
 +		logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA_UDP_MC: Using %s for service annunciation", mc_ip);
 +		(*admin)->mcIpAddress = mc_ip;
 +	}
 +	else {
 +		logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA_UDP_MC: No IP address for service annunciation set. Using %s", DEFAULT_MC_IP);
 +		(*admin)->mcIpAddress = strdup(DEFAULT_MC_IP);
 +	}
 +
 +	return status;
 +}
 +
 +
 +celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin)
 +{
 +	celix_status_t status = CELIX_SUCCESS;
 +
 +	free(admin->mcIpAddress);
 +	free(admin->ifIpAddress);
 +
 +	celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
 +	hash_map_iterator_pt iter = hashMapIterator_create(admin->pendingSubscriptions);
 +	while(hashMapIterator_hasNext(iter)){
 +		hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
 +		free((char*)hashMapEntry_getKey(entry));
 +		arrayList_destroy((array_list_pt)hashMapEntry_getValue(entry));
 +	}
 +	hashMapIterator_destroy(iter);
 +	hashMap_destroy(admin->pendingSubscriptions,false,false);
 +	celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
 +
 +	celixThreadMutex_lock(&admin->subscriptionsLock);
 +	hashMap_destroy(admin->subscriptions,false,false);
 +	celixThreadMutex_unlock(&admin->subscriptionsLock);
 +
 +	celixThreadMutex_lock(&admin->localPublicationsLock);
 +	hashMap_destroy(admin->localPublications,true,false);
 +	celixThreadMutex_unlock(&admin->localPublicationsLock);
 +
 +	celixThreadMutex_lock(&admin->externalPublicationsLock);
 +	iter = hashMapIterator_create(admin->externalPublications);
 +	while(hashMapIterator_hasNext(iter)){
 +		hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
 +		free((char*)hashMapEntry_getKey(entry));
 +		arrayList_destroy((array_list_pt)hashMapEntry_getValue(entry));
 +	}
 +	hashMapIterator_destroy(iter);
 +	hashMap_destroy(admin->externalPublications,false,false);
 +	celixThreadMutex_unlock(&admin->externalPublicationsLock);
 +
 +	celixThreadMutex_lock(&admin->serializerListLock);
 +	arrayList_destroy(admin->serializerList);
 +	celixThreadMutex_unlock(&admin->serializerListLock);
 +
 +	celixThreadMutex_lock(&admin->noSerializerPendingsLock);
 +	arrayList_destroy(admin->noSerializerSubscriptions);
 +	arrayList_destroy(admin->noSerializerPublications);
 +	celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
 +
 +	celixThreadMutex_lock(&admin->usedSerializersLock);
 +
 +	iter = hashMapIterator_create(admin->topicSubscriptionsPerSerializer);
 +	while(hashMapIterator_hasNext(iter)){
 +		arrayList_destroy((array_list_pt)hashMapIterator_nextValue(iter));
 +	}
 +	hashMapIterator_destroy(iter);
 +	hashMap_destroy(admin->topicSubscriptionsPerSerializer,false,false);
 +
 +	iter = hashMapIterator_create(admin->topicPublicationsPerSerializer);
 +	while(hashMapIterator_hasNext(iter)){
 +		arrayList_destroy((array_list_pt)hashMapIterator_nextValue(iter));
 +	}
 +	hashMapIterator_destroy(iter);
 +	hashMap_destroy(admin->topicPublicationsPerSerializer,false,false);
 +
 +	celixThreadMutex_unlock(&admin->usedSerializersLock);
 +
 +	celixThreadMutex_destroy(&admin->usedSerializersLock);
 +	celixThreadMutex_destroy(&admin->serializerListLock);
 +
 +	celixThreadMutexAttr_destroy(&admin->noSerializerPendingsAttr);
 +	celixThreadMutex_destroy(&admin->noSerializerPendingsLock);
 +
 +	celixThreadMutex_destroy(&admin->pendingSubscriptionsLock);
 +	celixThreadMutexAttr_destroy(&admin->pendingSubscriptionsAttr);
 +
 +	celixThreadMutex_destroy(&admin->subscriptionsLock);
 +	celixThreadMutex_destroy(&admin->localPublicationsLock);
 +	celixThreadMutex_destroy(&admin->externalPublicationsLock);
 +
 +	logHelper_stop(admin->loghelper);
 +
 +	logHelper_destroy(&admin->loghelper);
 +
 +	free(admin);
 +
 +	return status;
 +}
 +
 +static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
 +	celix_status_t status = CELIX_SUCCESS;
 +
 +	celixThreadMutex_lock(&admin->subscriptionsLock);
 +
 +	topic_subscription_pt any_sub = hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC);
 +
 +	if(any_sub==NULL){
 +
 +		int i;
 +		pubsub_serializer_service_t *best_serializer = NULL;
 +		if( (status=pubsubAdmin_getBestSerializer(admin, subEP, &best_serializer)) == CELIX_SUCCESS){
 +			status = pubsub_topicSubscriptionCreate(admin->bundle_context, admin->ifIpAddress, PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, PUBSUB_ANY_SUB_TOPIC, best_serializer, &any_sub);
 +		}
 +		else{
- 			printf("PSA_UDP_MC: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n",subEP->topic);
++			printf("PSA_UDP_MC: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n",
++				   properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
++
 +			celixThreadMutex_lock(&admin->noSerializerPendingsLock);
 +			arrayList_add(admin->noSerializerSubscriptions,subEP);
 +			celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
 +		}
 +
 +		if (status == CELIX_SUCCESS){
 +
 +			/* Connect all internal publishers */
 +			celixThreadMutex_lock(&admin->localPublicationsLock);
 +			hash_map_iterator_pt lp_iter =hashMapIterator_create(admin->localPublications);
 +			while(hashMapIterator_hasNext(lp_iter)){
 +				service_factory_pt factory = (service_factory_pt)hashMapIterator_nextValue(lp_iter);
 +				topic_publication_pt topic_pubs = (topic_publication_pt)factory->handle;
 +				array_list_pt topic_publishers = pubsub_topicPublicationGetPublisherList(topic_pubs);
 +
 +				if(topic_publishers!=NULL){
 +					for(i=0;i<arrayList_size(topic_publishers);i++){
 +						pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(topic_publishers,i);
- 						if(pubEP->endpoint !=NULL){
- 							status += pubsub_topicSubscriptionConnectPublisher(any_sub,pubEP->endpoint);
++						if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) !=NULL){
++							status += pubsub_topicSubscriptionConnectPublisher(any_sub, (char*) properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
 +						}
 +					}
 +					arrayList_destroy(topic_publishers);
 +				}
 +			}
 +			hashMapIterator_destroy(lp_iter);
 +			celixThreadMutex_unlock(&admin->localPublicationsLock);
 +
 +			/* Connect also all external publishers */
 +			celixThreadMutex_lock(&admin->externalPublicationsLock);
 +			hash_map_iterator_pt extp_iter =hashMapIterator_create(admin->externalPublications);
 +			while(hashMapIterator_hasNext(extp_iter)){
 +				array_list_pt ext_pub_list = (array_list_pt)hashMapIterator_nextValue(extp_iter);
 +				if(ext_pub_list!=NULL){
 +					for(i=0;i<arrayList_size(ext_pub_list);i++){
 +						pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
- 						if(pubEP->endpoint !=NULL){
- 							status += pubsub_topicSubscriptionConnectPublisher(any_sub,pubEP->endpoint);
++						if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) !=NULL){
++							status += pubsub_topicSubscriptionConnectPublisher(any_sub, (char*) properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
 +						}
 +					}
 +				}
 +			}
 +			hashMapIterator_destroy(extp_iter);
 +			celixThreadMutex_unlock(&admin->externalPublicationsLock);
 +
 +
 +			pubsub_topicSubscriptionAddSubscriber(any_sub,subEP);
 +
 +			status += pubsub_topicSubscriptionStart(any_sub);
 +
 +		}
 +
 +		if (status == CELIX_SUCCESS){
 +			hashMap_put(admin->subscriptions,strdup(PUBSUB_ANY_SUB_TOPIC),any_sub);
 +			connectTopicPubSubToSerializer(admin, best_serializer, any_sub, false);
 +		}
 +
 +	}
 +
 +	celixThreadMutex_unlock(&admin->subscriptionsLock);
 +
 +	return status;
 +}
 +
 +celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
 +	celix_status_t status = CELIX_SUCCESS;
 +
- 	printf("PSA_UDP_MC: Received subscription [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n",subEP->frameworkUUID,subEP->serviceID,subEP->scope,subEP->topic);
++	printf("PSA_UDP_MC: Received subscription [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n",
++		   properties_get(subEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),
++		   subEP->serviceID,
++		   properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),
++		   properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +
- 	if(strcmp(subEP->topic,PUBSUB_ANY_SUB_TOPIC)==0){
++	if(strcmp(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),PUBSUB_ANY_SUB_TOPIC)==0){
 +		return pubsubAdmin_addAnySubscription(admin,subEP);
 +	}
 +
 +	/* Check if we already know some publisher about this topic, otherwise let's put the subscription in the pending hashmap */
 +	celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
 +	celixThreadMutex_lock(&admin->subscriptionsLock);
 +	celixThreadMutex_lock(&admin->localPublicationsLock);
 +	celixThreadMutex_lock(&admin->externalPublicationsLock);
 +
- 	char* scope_topic = createScopeTopicKey(subEP->scope,subEP->topic);
++	char* scope_topic = createScopeTopicKey(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +
 +	service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
 +	array_list_pt ext_pub_list = (array_list_pt)hashMap_get(admin->externalPublications,scope_topic);
 +
 +	if(factory==NULL && ext_pub_list==NULL){ //No (local or external) publishers yet for this topic
 +		pubsubAdmin_addSubscriptionToPendingList(admin,subEP);
 +	}
 +	else{
 +		int i;
 +		topic_subscription_pt subscription = hashMap_get(admin->subscriptions, scope_topic);
 +
 +		if(subscription == NULL) {
 +			pubsub_serializer_service_t *best_serializer = NULL;
 +			if( (status=pubsubAdmin_getBestSerializer(admin, subEP, &best_serializer)) == CELIX_SUCCESS){
- 				status += pubsub_topicSubscriptionCreate(admin->bundle_context,admin->ifIpAddress, subEP->scope, subEP->topic, best_serializer, &subscription);
++				status += pubsub_topicSubscriptionCreate(admin->bundle_context,admin->ifIpAddress, (char*) properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), (char*) properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC), best_serializer, &subscription);
 +			}
 +			else{
- 				printf("PSA_UDP_MC: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n",subEP->topic);
++				printf("PSA_UDP_MC: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n",
++					   properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
++
 +				celixThreadMutex_lock(&admin->noSerializerPendingsLock);
 +				arrayList_add(admin->noSerializerSubscriptions,subEP);
 +				celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
 +			}
 +
 +			if (status==CELIX_SUCCESS){
 +
 +				/* Try to connect internal publishers */
 +				if(factory!=NULL){
 +					topic_publication_pt topic_pubs = (topic_publication_pt)factory->handle;
 +					array_list_pt topic_publishers = pubsub_topicPublicationGetPublisherList(topic_pubs);
 +
 +					if(topic_publishers!=NULL){
 +						for(i=0;i<arrayList_size(topic_publishers);i++){
 +							pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(topic_publishers,i);
- 							if(pubEP->endpoint !=NULL){
- 								status += pubsub_topicSubscriptionConnectPublisher(subscription,pubEP->endpoint);
++							if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) !=NULL){
++								status += pubsub_topicSubscriptionConnectPublisher(subscription, (char*) properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
 +							}
 +						}
 +						arrayList_destroy(topic_publishers);
 +					}
 +
 +				}
 +
 +				/* Look also for external publishers */
 +				if(ext_pub_list!=NULL){
 +					for(i=0;i<arrayList_size(ext_pub_list);i++){
 +						pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
- 						if(pubEP->endpoint !=NULL){
- 							status += pubsub_topicSubscriptionConnectPublisher(subscription,pubEP->endpoint);
++						if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) !=NULL){
++							status += pubsub_topicSubscriptionConnectPublisher(subscription, (char*) properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
 +						}
 +					}
 +				}
 +
 +				pubsub_topicSubscriptionAddSubscriber(subscription,subEP);
 +
 +				status += pubsub_topicSubscriptionStart(subscription);
 +
 +			}
 +
 +			if(status==CELIX_SUCCESS){
 +
 +				hashMap_put(admin->subscriptions,strdup(scope_topic),subscription);
 +
 +				connectTopicPubSubToSerializer(admin, best_serializer, subscription, false);
 +			}
 +		}
 +
 +		if (status == CELIX_SUCCESS){
 +			pubsub_topicIncreaseNrSubscribers(subscription);
 +		}
 +	}
 +
 +	free(scope_topic);
 +	celixThreadMutex_unlock(&admin->externalPublicationsLock);
 +	celixThreadMutex_unlock(&admin->localPublicationsLock);
 +	celixThreadMutex_unlock(&admin->subscriptionsLock);
 +	celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
 +
 +	return status;
 +
 +}
 +
 +celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
 +	celix_status_t status = CELIX_SUCCESS;
 +
- 	printf("PSA_UDP_MC: Removing subscription [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n",subEP->frameworkUUID,subEP->serviceID,subEP->scope, subEP->topic);
++	printf("PSA_UDP_MC: Removing subscription [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n",
++		   properties_get(subEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),
++		   subEP->serviceID,
++		   properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),
++		   properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +
- 	char* scope_topic = createScopeTopicKey(subEP->scope, subEP->topic);
++	char* scope_topic = createScopeTopicKey(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +
 +	celixThreadMutex_lock(&admin->subscriptionsLock);
 +	topic_subscription_pt sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic);
 +	if(sub!=NULL){
 +		pubsub_topicDecreaseNrSubscribers(sub);
 +		if(pubsub_topicGetNrSubscribers(sub) == 0) {
 +			status = pubsub_topicSubscriptionRemoveSubscriber(sub,subEP);
 +		}
 +	}
 +	celixThreadMutex_unlock(&admin->subscriptionsLock);
 +
 +	if(sub==NULL){
 +		/* Maybe the endpoint was pending */
 +		celixThreadMutex_lock(&admin->noSerializerPendingsLock);
 +		if(!arrayList_removeElement(admin->noSerializerSubscriptions, subEP)){
 +			status = CELIX_ILLEGAL_STATE;
 +		}
 +		celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
 +	}
 +
 +	free(scope_topic);
 +
 +
 +
 +	return status;
 +
 +}
 +
 +celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP){
 +	celix_status_t status = CELIX_SUCCESS;
 +
- 	printf("PSA_UDP_MC: Received publication [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n",pubEP->frameworkUUID,pubEP->serviceID,pubEP->scope, pubEP->topic);
++	printf("PSA_UDP_MC: Received publication [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n",
++		   properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),
++		   pubEP->serviceID,
++		   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),
++		   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +
 +	const char* fwUUID = NULL;
 +
 +	bundleContext_getProperty(admin->bundle_context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
 +	if(fwUUID==NULL){
 +		printf("PSA_UDP_MC: Cannot retrieve fwUUID.\n");
 +		return CELIX_INVALID_BUNDLE_CONTEXT;
 +	}
- 	char* scope_topic = createScopeTopicKey(pubEP->scope, pubEP->topic);
++	char* scope_topic = createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +
- 	if ((strcmp(pubEP->frameworkUUID, fwUUID) == 0) && (pubEP->endpoint == NULL)) {
++	if ((strcmp(properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID), fwUUID) == 0) && (properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) == NULL)) {
 +
 +		celixThreadMutex_lock(&admin->localPublicationsLock);
 +
 +		service_factory_pt factory = (service_factory_pt) hashMap_get(admin->localPublications, scope_topic);
 +
 +		if (factory == NULL) {
 +			topic_publication_pt pub = NULL;
 +			pubsub_serializer_service_t *best_serializer = NULL;
 +			if( (status=pubsubAdmin_getBestSerializer(admin, pubEP, &best_serializer)) == CELIX_SUCCESS){
 +				status = pubsub_topicPublicationCreate(admin->sendSocket, pubEP, best_serializer, admin->mcIpAddress, &pub);
 +			}
 +			else{
- 				printf("PSA_UDP_MC: Cannot find a serializer for publishing topic %s. Adding it to pending list.\n", pubEP->topic);
++				printf("PSA_UDP_MC: Cannot find a serializer for publishing topic %s. Adding it to pending list.\n",
++					   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
++
 +				celixThreadMutex_lock(&admin->noSerializerPendingsLock);
 +				arrayList_add(admin->noSerializerPublications,pubEP);
 +				celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
 +			}
 +
 +			if (status == CELIX_SUCCESS) {
 +				status = pubsub_topicPublicationStart(admin->bundle_context, pub, &factory);
 +				if (status == CELIX_SUCCESS && factory != NULL) {
 +					hashMap_put(admin->localPublications, strdup(scope_topic), factory);
 +					connectTopicPubSubToSerializer(admin, best_serializer, pub, true);
 +				}
 +			} else {
- 				printf("PSA_UDP_MC: Cannot create a topicPublication for scope=%s, topic=%s (bundle %ld).\n", pubEP->scope, pubEP->topic, pubEP->serviceID);
++				printf("PSA_UDP_MC: Cannot create a topicPublication for scope=%s, topic=%s (bundle %ld).\n",
++					   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),
++					   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),
++					   pubEP->serviceID);
 +			}
 +		} else {
 +			//just add the new EP to the list
 +			topic_publication_pt pub = (topic_publication_pt) factory->handle;
 +			pubsub_topicPublicationAddPublisherEP(pub, pubEP);
 +		}
 +
 +		celixThreadMutex_unlock(&admin->localPublicationsLock);
 +	}
 +	else{
 +
 +		celixThreadMutex_lock(&admin->externalPublicationsLock);
 +		array_list_pt ext_pub_list = (array_list_pt) hashMap_get(admin->externalPublications, scope_topic);
 +		if (ext_pub_list == NULL) {
 +			arrayList_create(&ext_pub_list);
 +			hashMap_put(admin->externalPublications, strdup(scope_topic), ext_pub_list);
 +		}
 +
 +		arrayList_add(ext_pub_list, pubEP);
 +
 +		celixThreadMutex_unlock(&admin->externalPublicationsLock);
 +	}
 +
 +	/* Re-evaluate the pending subscriptions */
 +	celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
 +
 +	hash_map_entry_pt pendingSub = hashMap_getEntry(admin->pendingSubscriptions, scope_topic);
 +	if (pendingSub != NULL) { //There were pending subscription for the just published topic. Let's connect them.
 +		char* topic = (char*) hashMapEntry_getKey(pendingSub);
 +		array_list_pt pendingSubList = (array_list_pt) hashMapEntry_getValue(pendingSub);
 +		int i;
 +		for (i = 0; i < arrayList_size(pendingSubList); i++) {
 +			pubsub_endpoint_pt subEP = (pubsub_endpoint_pt) arrayList_get(pendingSubList, i);
 +			pubsubAdmin_addSubscription(admin, subEP);
 +		}
 +		hashMap_remove(admin->pendingSubscriptions, scope_topic);
 +		arrayList_clear(pendingSubList);
 +		arrayList_destroy(pendingSubList);
 +		free(topic);
 +	}
 +
 +	celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
 +
 +	/* Connect the new publisher to the subscription for his topic, if there is any */
 +	celixThreadMutex_lock(&admin->subscriptionsLock);
 +
 +	topic_subscription_pt sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, scope_topic);
- 	if (sub != NULL && pubEP->endpoint != NULL) {
- 		pubsub_topicSubscriptionAddConnectPublisherToPendingList(sub, pubEP->endpoint);
++	if (sub != NULL && properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) != NULL) {
++		pubsub_topicSubscriptionAddConnectPublisherToPendingList(sub, (char*) properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
 +	}
 +
 +	/* And check also for ANY subscription */
 +	topic_subscription_pt any_sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, PUBSUB_ANY_SUB_TOPIC);
- 	if (any_sub != NULL && pubEP->endpoint != NULL) {
- 		pubsub_topicSubscriptionAddConnectPublisherToPendingList(any_sub, pubEP->endpoint);
++	if (any_sub != NULL && properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) != NULL) {
++		pubsub_topicSubscriptionAddConnectPublisherToPendingList(any_sub, (char*) properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
 +	}
 +
 +	free(scope_topic);
 +
 +	celixThreadMutex_unlock(&admin->subscriptionsLock);
 +
 +	return status;
 +
 +}
 +
 +celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP){
 +	celix_status_t status = CELIX_SUCCESS;
 +	int count = 0;
 +
- 	printf("PSA_UDP_MC: Removing publication [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n",pubEP->frameworkUUID,pubEP->serviceID,pubEP->scope, pubEP->topic);
++	printf("PSA_UDP_MC: Removing publication [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n",
++		   properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),
++		   pubEP->serviceID,
++		   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),
++		   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +
 +	const char* fwUUID = NULL;
 +
 +	bundleContext_getProperty(admin->bundle_context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
 +	if(fwUUID==NULL){
 +		printf("PSA_UDP_MC: Cannot retrieve fwUUID.\n");
 +		return CELIX_INVALID_BUNDLE_CONTEXT;
 +	}
- 	char *scope_topic = createScopeTopicKey(pubEP->scope, pubEP->topic);
++	char *scope_topic = createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +
- 	if(strcmp(pubEP->frameworkUUID,fwUUID)==0){
++	if(strcmp(properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),fwUUID)==0){
 +
 +		celixThreadMutex_lock(&admin->localPublicationsLock);
 +		service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
 +		if(factory!=NULL){
 +			topic_publication_pt pub = (topic_publication_pt)factory->handle;
 +			pubsub_topicPublicationRemovePublisherEP(pub,pubEP);
 +		}
 +		celixThreadMutex_unlock(&admin->localPublicationsLock);
 +
 +		if(factory==NULL){
 +			/* Maybe the endpoint was pending */
 +			celixThreadMutex_lock(&admin->noSerializerPendingsLock);
 +			if(!arrayList_removeElement(admin->noSerializerPublications, pubEP)){
 +				status = CELIX_ILLEGAL_STATE;
 +			}
 +			celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
 +		}
 +
 +	}
 +	else{
 +
 +		celixThreadMutex_lock(&admin->externalPublicationsLock);
 +		array_list_pt ext_pub_list = (array_list_pt)hashMap_get(admin->externalPublications,scope_topic);
 +		if(ext_pub_list!=NULL){
 +			int i;
 +			bool found = false;
 +			for(i=0;!found && i<arrayList_size(ext_pub_list);i++){
 +				pubsub_endpoint_pt p  = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
 +				found = pubsubEndpoint_equals(pubEP,p);
 +				if (found){
 +					arrayList_remove(ext_pub_list,i);
 +				}
 +			}
 +			// Check if there are more publishers on the same endpoint (happens when 1 celix-instance with multiple bundles publish in same topic)
 +			for(i=0; i<arrayList_size(ext_pub_list);i++) {
 +				pubsub_endpoint_pt p  = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
- 				if (strcmp(pubEP->endpoint,p->endpoint) == 0) {
++				if (strcmp(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL),properties_get(p->endpoint_props, PUBSUB_ENDPOINT_URL)) == 0) {
 +					count++;
 +				}
 +			}
 +
 +			if(arrayList_size(ext_pub_list)==0){
 +				hash_map_entry_pt entry = hashMap_getEntry(admin->externalPublications,scope_topic);
 +				char* topic = (char*)hashMapEntry_getKey(entry);
 +				array_list_pt list = (array_list_pt)hashMapEntry_getValue(entry);
 +				hashMap_remove(admin->externalPublications,topic);
 +				arrayList_destroy(list);
 +				free(topic);
 +			}
 +		}
 +
 +		celixThreadMutex_unlock(&admin->externalPublicationsLock);
 +	}
 +
 +	/* Check if this publisher was connected to one of our subscribers*/
 +	celixThreadMutex_lock(&admin->subscriptionsLock);
 +
 +	topic_subscription_pt sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic);
- 	if(sub!=NULL && pubEP->endpoint!=NULL && count == 0){
- 		pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub,pubEP->endpoint);
++	if(sub!=NULL && properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)!=NULL && count == 0){
++		pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub, (char*) properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
 +	}
 +
 +	/* And check also for ANY subscription */
 +	topic_subscription_pt any_sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC);
- 	if(any_sub!=NULL && pubEP->endpoint!=NULL && count == 0){
- 		pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(any_sub,pubEP->endpoint);
++	if(any_sub!=NULL && properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)!=NULL && count == 0){
++		pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(any_sub, (char*) properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
 +	}
 +
 +	free(scope_topic);
 +	celixThreadMutex_unlock(&admin->subscriptionsLock);
 +
 +	return status;
 +
 +}
 +
 +celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin,char *scope, char* topic){
 +	celix_status_t status = CELIX_SUCCESS;
 +
 +	printf("PSA_UDP_MC: Closing all publications for scope=%s,topic=%s\n", scope, topic);
 +
 +	celixThreadMutex_lock(&admin->localPublicationsLock);
 +	char* scope_topic =createScopeTopicKey(scope, topic);
 +	hash_map_entry_pt pubsvc_entry = (hash_map_entry_pt)hashMap_getEntry(admin->localPublications,scope_topic);
 +	if(pubsvc_entry!=NULL){
 +		char* key = (char*)hashMapEntry_getKey(pubsvc_entry);
 +		service_factory_pt factory= (service_factory_pt)hashMapEntry_getValue(pubsvc_entry);
 +		topic_publication_pt pub = (topic_publication_pt)factory->handle;
 +		status += pubsub_topicPublicationStop(pub);
 +		disconnectTopicPubSubFromSerializer(admin, pub, true);
 +		status += pubsub_topicPublicationDestroy(pub);
 +		hashMap_remove(admin->localPublications,scope_topic);
 +		free(key);
 +		free(factory);
 +	}
 +	free(scope_topic);
 +	celixThreadMutex_unlock(&admin->localPublicationsLock);
 +
 +	return status;
 +
 +}
 +
 +celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char *scope, char* topic){
 +	celix_status_t status = CELIX_SUCCESS;
 +
 +	printf("PSA_UDP_MC: Closing all subscriptions\n");
 +
 +	celixThreadMutex_lock(&admin->subscriptionsLock);
 +	char* scope_topic =createScopeTopicKey(scope, topic);
 +	hash_map_entry_pt sub_entry = (hash_map_entry_pt)hashMap_getEntry(admin->subscriptions,scope_topic);
 +	if(sub_entry!=NULL){
 +		char* topic = (char*)hashMapEntry_getKey(sub_entry);
 +
 +		topic_subscription_pt ts = (topic_subscription_pt)hashMapEntry_getValue(sub_entry);
 +		status += pubsub_topicSubscriptionStop(ts);
 +		disconnectTopicPubSubFromSerializer(admin, ts, false);
 +		status += pubsub_topicSubscriptionDestroy(ts);
 +		hashMap_remove(admin->subscriptions,topic);
 +		free(topic);
 +
 +	}
 +	free(scope_topic);
 +	celixThreadMutex_unlock(&admin->subscriptionsLock);
 +
 +	return status;
 +
 +}
 +
 +
 +#ifndef ANDROID
 +static celix_status_t pubsubAdmin_getIpAddress(const char* interface, char** ip) {
 +	celix_status_t status = CELIX_BUNDLE_EXCEPTION;
 +
 +	struct ifaddrs *ifaddr, *ifa;
 +	char host[NI_MAXHOST];
 +
 +	if (getifaddrs(&ifaddr) != -1)
 +	{
 +		for (ifa = ifaddr; ifa != NULL && status != CELIX_SUCCESS; ifa = ifa->ifa_next)
 +		{
 +			if (ifa->ifa_addr == NULL)
 +				continue;
 +
 +			if ((getnameinfo(ifa->ifa_addr,sizeof(struct sockaddr_in), host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST) == 0) && (ifa->ifa_addr->sa_family == AF_INET)) {
 +				if (interface == NULL) {
 +					*ip = strdup(host);
 +					status = CELIX_SUCCESS;
 +				}
 +				else if (strcmp(ifa->ifa_name, interface) == 0) {
 +					*ip = strdup(host);
 +					status = CELIX_SUCCESS;
 +				}
 +			}
 +		}
 +
 +		freeifaddrs(ifaddr);
 +	}
 +
 +	return status;
 +}
 +#endif
 +
 +static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
 +	celix_status_t status = CELIX_SUCCESS;
 +
- 	char* scope_topic =createScopeTopicKey(subEP->scope, subEP->topic);
++	char* scope_topic =createScopeTopicKey(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +	array_list_pt pendingListPerTopic = hashMap_get(admin->pendingSubscriptions,scope_topic);
 +	if(pendingListPerTopic==NULL){
 +		arrayList_create(&pendingListPerTopic);
 +		hashMap_put(admin->pendingSubscriptions,strdup(scope_topic),pendingListPerTopic);
 +	}
 +	arrayList_add(pendingListPerTopic,subEP);
 +	free(scope_topic);
 +
 +	return status;
 +}
 +
 +
 +celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt reference, void * service){
 +	/* Assumption: serializers are all available at startup.
 +	 * If a new (possibly better) serializer is installed and started, already created topic_publications/subscriptions will not be destroyed and recreated */
 +
 +	celix_status_t status = CELIX_SUCCESS;
 +	int i=0;
 +
 +	const char *serType = NULL;
 +	serviceReference_getProperty(reference, PUBSUB_SERIALIZER_TYPE_KEY,&serType);
 +	if(serType == NULL){
 +		printf("Serializer serviceReference %p has no pubsub_serializer.type property specified\n",reference);
 +		return CELIX_SERVICE_EXCEPTION;
 +	}
 +
 +	pubsub_admin_pt admin = (pubsub_admin_pt)handle;
 +	celixThreadMutex_lock(&admin->serializerListLock);
 +	arrayList_add(admin->serializerList, reference);
 +	celixThreadMutex_unlock(&admin->serializerListLock);
 +
 +	/* Now let's re-evaluate the pending */
 +	celixThreadMutex_lock(&admin->noSerializerPendingsLock);
 +
 +	for(i=0;i<arrayList_size(admin->noSerializerSubscriptions);i++){
 +		pubsub_endpoint_pt ep = (pubsub_endpoint_pt)arrayList_get(admin->noSerializerSubscriptions,i);
 +		pubsub_serializer_service_t *best_serializer = NULL;
 +		pubsubAdmin_getBestSerializer(admin, ep, &best_serializer);
 +		if(best_serializer != NULL){ /* Finally we have a valid serializer! */
 +			pubsubAdmin_addSubscription(admin, ep);
 +		}
 +	}
 +
 +	for(i=0;i<arrayList_size(admin->noSerializerPublications);i++){
 +		pubsub_endpoint_pt ep = (pubsub_endpoint_pt)arrayList_get(admin->noSerializerPublications,i);
 +		pubsub_serializer_service_t *best_serializer = NULL;
 +		pubsubAdmin_getBestSerializer(admin, ep, &best_serializer);
 +		if(best_serializer != NULL){ /* Finally we have a valid serializer! */
 +			pubsubAdmin_addPublication(admin, ep);
 +		}
 +	}
 +
 +	celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
 +
 +	printf("PSA_UDP_MC: %s serializer added\n",serType);
 +
 +	return status;
 +}
 +
 +celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt reference, void * service){
 +
 +	pubsub_admin_pt admin = (pubsub_admin_pt)handle;
 +	int i=0, j=0;
 +	const char *serType = NULL;
 +
 +	serviceReference_getProperty(reference, PUBSUB_SERIALIZER_TYPE_KEY,&serType);
 +	if(serType == NULL){
 +		printf("Serializer serviceReference %p has no pubsub_serializer.type property specified\n",reference);
 +		return CELIX_SERVICE_EXCEPTION;
 +	}
 +
 +	celixThreadMutex_lock(&admin->serializerListLock);
 +	/* Remove the serializer from the list */
 +	arrayList_removeElement(admin->serializerList, reference);
 +	celixThreadMutex_unlock(&admin->serializerListLock);
 +
 +	celixThreadMutex_lock(&admin->usedSerializersLock);
 +	array_list_pt topicPubList = (array_list_pt)hashMap_remove(admin->topicPublicationsPerSerializer, service);
 +	array_list_pt topicSubList = (array_list_pt)hashMap_remove(admin->topicSubscriptionsPerSerializer, service);
 +	celixThreadMutex_unlock(&admin->usedSerializersLock);
 +
 +	/* Now destroy the topicPublications, but first put back the pubsub_endpoints back to the noSerializer pending list */
 +	if(topicPubList!=NULL){
 +		for(i=0;i<arrayList_size(topicPubList);i++){
 +			topic_publication_pt topicPub = (topic_publication_pt)arrayList_get(topicPubList,i);
 +			/* Stop the topic publication */
 +			pubsub_topicPublicationStop(topicPub);
 +			/* Get the endpoints that are going to be orphan */
 +			array_list_pt pubList = pubsub_topicPublicationGetPublisherList(topicPub);
 +			for(j=0;j<arrayList_size(pubList);j++){
 +				pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubList,j);
 +				/* Remove the publication */
 +				pubsubAdmin_removePublication(admin, pubEP);
 +				/* Reset the endpoint field, so that will be recreated from scratch when a new serializer will be found */
- 				if(pubEP->endpoint!=NULL){
- 					free(pubEP->endpoint);
- 					pubEP->endpoint = NULL;
++				if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)!=NULL){
++					properties_unset(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL);
 +				}
 +				/* Add the orphan endpoint to the noSerializer pending list */
 +				celixThreadMutex_lock(&admin->noSerializerPendingsLock);
 +				arrayList_add(admin->noSerializerPublications,pubEP);
 +				celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
 +			}
 +			arrayList_destroy(pubList);
 +
 +			/* Cleanup also the localPublications hashmap*/
 +			celixThreadMutex_lock(&admin->localPublicationsLock);
 +			hash_map_iterator_pt iter = hashMapIterator_create(admin->localPublications);
 +			char *key = NULL;
 +			service_factory_pt factory = NULL;
 +			while(hashMapIterator_hasNext(iter)){
 +				hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
 +				factory = (service_factory_pt)hashMapEntry_getValue(entry);
 +				topic_publication_pt pub = (topic_publication_pt)factory->handle;
 +				if(pub==topicPub){
 +					key = (char*)hashMapEntry_getKey(entry);
 +					break;
 +				}
 +			}
 +			hashMapIterator_destroy(iter);
 +			if(key!=NULL){
 +				hashMap_remove(admin->localPublications, key);
 +				free(factory);
 +				free(key);
 +			}
 +			celixThreadMutex_unlock(&admin->localPublicationsLock);
 +
 +			/* Finally destroy the topicPublication */
 +			pubsub_topicPublicationDestroy(topicPub);
 +		}
 +		arrayList_destroy(topicPubList);
 +	}
 +
 +	/* Now destroy the topicSubscriptions, but first put back the pubsub_endpoints back to the noSerializer pending list */
 +	if(topicSubList!=NULL){
 +		for(i=0;i<arrayList_size(topicSubList);i++){
 +			topic_subscription_pt topicSub = (topic_subscription_pt)arrayList_get(topicSubList,i);
 +			/* Stop the topic subscription */
 +			pubsub_topicSubscriptionStop(topicSub);
 +			/* Get the endpoints that are going to be orphan */
 +			array_list_pt subList = pubsub_topicSubscriptionGetSubscribersList(topicSub);
 +			for(j=0;j<arrayList_size(subList);j++){
 +				pubsub_endpoint_pt subEP = (pubsub_endpoint_pt)arrayList_get(subList,j);
 +				/* Remove the subscription */
 +				pubsubAdmin_removeSubscription(admin, subEP);
 +				/* Reset the endpoint field, so that will be recreated from scratch when a new serializer will be found */
- 				if(subEP->endpoint!=NULL){
- 					free(subEP->endpoint);
- 					subEP->endpoint = NULL;
++				if(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_URL)!=NULL){
++					properties_unset(subEP->endpoint_props, PUBSUB_ENDPOINT_URL);
 +				}
 +				/* Add the orphan endpoint to the noSerializer pending list */
 +				celixThreadMutex_lock(&admin->noSerializerPendingsLock);
 +				arrayList_add(admin->noSerializerSubscriptions,subEP);
 +				celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
 +			}
 +
 +			/* Cleanup also the subscriptions hashmap*/
 +			celixThreadMutex_lock(&admin->subscriptionsLock);
 +			hash_map_iterator_pt iter = hashMapIterator_create(admin->subscriptions);
 +			char *key = NULL;
 +			while(hashMapIterator_hasNext(iter)){
 +				hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
 +				topic_subscription_pt sub = (topic_subscription_pt)hashMapEntry_getValue(entry);
 +				if(sub==topicSub){
 +					key = (char*)hashMapEntry_getKey(entry);
 +					break;
 +				}
 +			}
 +			hashMapIterator_destroy(iter);
 +			if(key!=NULL){
 +				hashMap_remove(admin->subscriptions, key);
 +				free(key);
 +			}
 +			celixThreadMutex_unlock(&admin->subscriptionsLock);
 +
 +			/* Finally destroy the topicSubscription */
 +			pubsub_topicSubscriptionDestroy(topicSub);
 +		}
 +		arrayList_destroy(topicSubList);
 +	}
 +
 +	printf("PSA_UDP_MC: %s serializer removed\n",serType);
 +
 +
 +	return CELIX_SUCCESS;
 +}
 +
 +celix_status_t pubsubAdmin_matchEndpoint(pubsub_admin_pt admin, pubsub_endpoint_pt endpoint, double* score){
 +	celix_status_t status = CELIX_SUCCESS;
 +
 +	celixThreadMutex_lock(&admin->serializerListLock);
 +	status = pubsub_admin_match(endpoint->topic_props,PUBSUB_ADMIN_TYPE,admin->serializerList,score);
 +	celixThreadMutex_unlock(&admin->serializerListLock);
 +
 +	return status;
 +}
 +
 +/* This one recall the same logic as in the match function */
 +static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin,pubsub_endpoint_pt ep, pubsub_serializer_service_t **serSvc){
 +
 +	celix_status_t status = CELIX_SUCCESS;
 +
 +	celixThreadMutex_lock(&admin->serializerListLock);
 +	status = pubsub_admin_get_best_serializer(ep->topic_props, admin->serializerList, serSvc);
 +	celixThreadMutex_unlock(&admin->serializerListLock);
 +
 +	return status;
 +
 +}
 +
 +static void connectTopicPubSubToSerializer(pubsub_admin_pt admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool isPublication){
 +
 +	celixThreadMutex_lock(&admin->usedSerializersLock);
 +
 +	hash_map_pt map = isPublication?admin->topicPublicationsPerSerializer:admin->topicSubscriptionsPerSerializer;
 +	array_list_pt list = (array_list_pt)hashMap_get(map,serializer);
 +	if(list==NULL){
 +		arrayList_create(&list);
 +		hashMap_put(map,serializer,list);
 +	}
 +	arrayList_add(list,topicPubSub);
 +
 +	celixThreadMutex_unlock(&admin->usedSerializersLock);
 +
 +}
 +
 +static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void *topicPubSub,bool isPublication){
 +
 +	celixThreadMutex_lock(&admin->usedSerializersLock);
 +
 +	hash_map_pt map = isPublication?admin->topicPublicationsPerSerializer:admin->topicSubscriptionsPerSerializer;
 +	hash_map_iterator_pt iter = hashMapIterator_create(map);
 +	while(hashMapIterator_hasNext(iter)){
 +		array_list_pt list = (array_list_pt)hashMapIterator_nextValue(iter);
 +		if(arrayList_removeElement(list, topicPubSub)){ //Found it!
 +			break;
 +		}
 +	}
 +	hashMapIterator_destroy(iter);
 +
 +	celixThreadMutex_unlock(&admin->usedSerializersLock);
 +
 +}

http://git-wip-us.apache.org/repos/asf/celix/blob/0a5ef69a/pubsub/pubsub_admin_udp_mc/src/topic_publication.c
----------------------------------------------------------------------
diff --cc pubsub/pubsub_admin_udp_mc/src/topic_publication.c
index 15af108,0000000..79a2698
mode 100644,000000..100644
--- a/pubsub/pubsub_admin_udp_mc/src/topic_publication.c
+++ b/pubsub/pubsub_admin_udp_mc/src/topic_publication.c
@@@ -1,437 -1,0 +1,440 @@@
 +/**
 + *Licensed to the Apache Software Foundation (ASF) under one
 + *or more contributor license agreements.  See the NOTICE file
 + *distributed with this work for additional information
 + *regarding copyright ownership.  The ASF licenses this file
 + *to you under the Apache License, Version 2.0 (the
 + *"License"); you may not use this file except in compliance
 + *with the License.  You may obtain a copy of the License at
 + *
 + *  http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *Unless required by applicable law or agreed to in writing,
 + *software distributed under the License is distributed on an
 + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + *specific language governing permissions and limitations
 + *under the License.
 + */
 +
 +#include <stdlib.h>
 +#include <string.h>
 +#include <unistd.h>
 +#include <errno.h>
 +
 +#include <sys/types.h>
 +#include <sys/socket.h>
 +#include <netinet/in.h>
 +#include <arpa/inet.h>
 +
 +#include "array_list.h"
 +#include "celixbool.h"
 +#include "service_registration.h"
 +#include "utils.h"
 +#include "service_factory.h"
 +#include "version.h"
 +
 +#include "topic_publication.h"
 +#include "pubsub_common.h"
 +#include "pubsub/publisher.h"
 +#include "large_udp.h"
 +
 +#include "pubsub_serializer.h"
 +
 +#define EP_ADDRESS_LEN		32
 +
 +#define FIRST_SEND_DELAY	2
 +
 +struct topic_publication {
 +	int sendSocket;
 +	char* endpoint;
 +	service_registration_pt svcFactoryReg;
 +	array_list_pt pub_ep_list; //List<pubsub_endpoint>
 +	hash_map_pt boundServices; //<bundle_pt,bound_service>
 +	celix_thread_mutex_t tp_lock;
 +	pubsub_serializer_service_t *serializer;
 +	struct sockaddr_in destAddr;
 +};
 +
 +typedef struct publish_bundle_bound_service {
 +	topic_publication_pt parent;
 +	pubsub_publisher_t service;
 +	bundle_pt bundle;
 +	char *scope;
 +	char *topic;
 +	hash_map_pt msgTypes;
 +	unsigned short getCount;
 +	celix_thread_mutex_t mp_lock;
 +	largeUdp_pt largeUdpHandle;
 +}* publish_bundle_bound_service_pt;
 +
 +
 +typedef struct pubsub_msg{
 +	pubsub_msg_header_pt header;
 +	char* payload;
 +	unsigned int payloadSize;
 +} pubsub_msg_t;
 +
 +
 +static unsigned int rand_range(unsigned int min, unsigned int max);
 +
 +static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service);
 +static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service);
 +
 +static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle);
 +static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc);
 +
 +static int pubsub_topicPublicationSend(void* handle,unsigned int msgTypeId, const void *msg);
 +
 +static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* msgTypeId);
 +
 +
 +static void delay_first_send_for_late_joiners(void);
 +
 +
 +celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, char* bindIP, topic_publication_pt *out){
 +
 +	char* ep = malloc(EP_ADDRESS_LEN);
 +	memset(ep,0,EP_ADDRESS_LEN);
 +	unsigned int port = pubEP->serviceID + rand_range(UDP_BASE_PORT+pubEP->serviceID+3, UDP_MAX_PORT);
 +	snprintf(ep,EP_ADDRESS_LEN,"udp://%s:%u",bindIP,port);
 +
 +
 +	topic_publication_pt pub = calloc(1,sizeof(*pub));
 +
 +	arrayList_create(&(pub->pub_ep_list));
 +	pub->boundServices = hashMap_create(NULL,NULL,NULL,NULL);
 +	celixThreadMutex_create(&(pub->tp_lock),NULL);
 +
 +	pub->endpoint = ep;
 +	pub->sendSocket = sendSocket;
 +	pub->destAddr.sin_family = AF_INET;
 +	pub->destAddr.sin_addr.s_addr = inet_addr(bindIP);
 +	pub->destAddr.sin_port = htons(port);
 +
 +	pub->serializer = best_serializer;
 +
 +	pubsub_topicPublicationAddPublisherEP(pub,pubEP);
 +
 +	*out = pub;
 +
 +	return CELIX_SUCCESS;
 +}
 +
 +celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub){
 +	celix_status_t status = CELIX_SUCCESS;
 +
 +	celixThreadMutex_lock(&(pub->tp_lock));
 +
 +	free(pub->endpoint);
 +	arrayList_destroy(pub->pub_ep_list);
 +
 +	hash_map_iterator_pt iter = hashMapIterator_create(pub->boundServices);
 +	while(hashMapIterator_hasNext(iter)){
 +		publish_bundle_bound_service_pt bound = hashMapIterator_nextValue(iter);
 +		pubsub_destroyPublishBundleBoundService(bound);
 +	}
 +	hashMapIterator_destroy(iter);
 +	hashMap_destroy(pub->boundServices,false,false);
 +
 +	pub->svcFactoryReg = NULL;
 +	pub->serializer = NULL;
 +
 +	if(close(pub->sendSocket) != 0){
 +		status = CELIX_FILE_IO_EXCEPTION;
 +	}
 +
 +	celixThreadMutex_unlock(&(pub->tp_lock));
 +
 +	celixThreadMutex_destroy(&(pub->tp_lock));
 +
 +	free(pub);
 +
 +	return status;
 +}
 +
 +celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory){
 +	celix_status_t status = CELIX_SUCCESS;
 +
 +	/* Let's register the new service */
 +
 +	pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pub->pub_ep_list,0);
 +
 +	if(pubEP!=NULL){
 +		service_factory_pt factory = calloc(1, sizeof(*factory));
 +		factory->handle = pub;
 +		factory->getService = pubsub_topicPublicationGetService;
 +		factory->ungetService = pubsub_topicPublicationUngetService;
 +
 +		properties_pt props = properties_create();
- 		properties_set(props,PUBSUB_PUBLISHER_SCOPE,pubEP->scope);
- 		properties_set(props,PUBSUB_PUBLISHER_TOPIC,pubEP->topic);
++		properties_set(props,PUBSUB_PUBLISHER_SCOPE,properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE));
++		properties_set(props,PUBSUB_PUBLISHER_TOPIC,properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +
 +		status = bundleContext_registerServiceFactory(bundle_context,PUBSUB_PUBLISHER_SERVICE_NAME,factory,props,&(pub->svcFactoryReg));
 +
 +		if(status != CELIX_SUCCESS){
 +			properties_destroy(props);
- 			printf("PSA_UDP_MC_PSA_UDP_MC_TP: Cannot register ServiceFactory for topic %s, topic %s (bundle %ld).\n",pubEP->scope, pubEP->topic,pubEP->serviceID);
++			printf("PSA_UDP_MC_PSA_UDP_MC_TP: Cannot register ServiceFactory for topic %s, topic %s (bundle %ld).\n",
++				   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),
++				   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),
++				   pubEP->serviceID);
 +		}
 +		else{
 +			*svcFactory = factory;
 +		}
 +	}
 +	else{
 +		printf("PSA_UDP_MC_PSA_UDP_MC_TP: Cannot find pubsub_endpoint after adding it...Should never happen!\n");
 +		status = CELIX_SERVICE_EXCEPTION;
 +	}
 +
 +	return status;
 +}
 +
 +celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub){
 +	return serviceRegistration_unregister(pub->svcFactoryReg);
 +}
 +
 +celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep){
 +
 +	celixThreadMutex_lock(&(pub->tp_lock));
- 	ep->endpoint = strdup(pub->endpoint);
++	pubsubEndpoint_setField(ep, PUBSUB_ENDPOINT_URL, pub->endpoint);
 +	arrayList_add(pub->pub_ep_list,ep);
 +	celixThreadMutex_unlock(&(pub->tp_lock));
 +
 +	return CELIX_SUCCESS;
 +}
 +
 +celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep){
 +
 +	celixThreadMutex_lock(&(pub->tp_lock));
 +	arrayList_removeElement(pub->pub_ep_list,ep);
 +	celixThreadMutex_unlock(&(pub->tp_lock));
 +
 +	return CELIX_SUCCESS;
 +}
 +
 +array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt pub){
 +	array_list_pt list = NULL;
 +	celixThreadMutex_lock(&(pub->tp_lock));
 +	list = arrayList_clone(pub->pub_ep_list);
 +	celixThreadMutex_unlock(&(pub->tp_lock));
 +	return list;
 +}
 +
 +
 +static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service) {
 +	celix_status_t  status = CELIX_SUCCESS;
 +
 +	topic_publication_pt publish = (topic_publication_pt)handle;
 +
 +	celixThreadMutex_lock(&(publish->tp_lock));
 +
 +	publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle);
 +	if(bound==NULL){
 +		bound = pubsub_createPublishBundleBoundService(publish,bundle);
 +		if(bound!=NULL){
 +			hashMap_put(publish->boundServices,bundle,bound);
 +		}
 +	}
 +	else{
 +		bound->getCount++;
 +	}
 +
 +	if (bound != NULL) {
 +		*service = &bound->service;
 +	}
 +
 +	celixThreadMutex_unlock(&(publish->tp_lock));
 +
 +	return status;
 +}
 +
 +static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service)  {
 +
 +	topic_publication_pt publish = (topic_publication_pt)handle;
 +
 +	celixThreadMutex_lock(&(publish->tp_lock));
 +
 +	publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle);
 +	if(bound!=NULL){
 +
 +		bound->getCount--;
 +		if(bound->getCount==0){
 +			pubsub_destroyPublishBundleBoundService(bound);
 +			hashMap_remove(publish->boundServices,bundle);
 +		}
 +
 +	}
 +	else{
 +		long bundleId = -1;
 +		bundle_getBundleId(bundle,&bundleId);
 +		printf("PSA_UDP_MC_TP: Unexpected ungetService call for bundle %ld.\n", bundleId);
 +	}
 +
 +	/* service should be never used for unget, so let's set the pointer to NULL */
 +	*service = NULL;
 +
 +	celixThreadMutex_unlock(&(publish->tp_lock));
 +
 +	return CELIX_SUCCESS;
 +}
 +
 +static bool send_pubsub_msg(publish_bundle_bound_service_pt bound, pubsub_msg_t* msg, bool last, pubsub_release_callback_t *releaseCallback){
 +	const int iovec_len = 3; // header + size + payload
 +	bool ret = true;
 +
 +	struct iovec msg_iovec[iovec_len];
 +	msg_iovec[0].iov_base = msg->header;
 +	msg_iovec[0].iov_len = sizeof(*msg->header);
 +	msg_iovec[1].iov_base = &msg->payloadSize;
 +	msg_iovec[1].iov_len = sizeof(msg->payloadSize);
 +	msg_iovec[2].iov_base = msg->payload;
 +	msg_iovec[2].iov_len = msg->payloadSize;
 +
 +	delay_first_send_for_late_joiners();
 +
 +	if(largeUdp_sendmsg(bound->largeUdpHandle, bound->parent->sendSocket, msg_iovec, iovec_len, 0, &bound->parent->destAddr, sizeof(bound->parent->destAddr)) == -1) {
 +		perror("send_pubsub_msg:sendSocket");
 +		ret = false;
 +	}
 +
 +	if(releaseCallback) {
 +		releaseCallback->release(msg->payload, bound);
 +	}
 +	return ret;
 +
 +}
 +
 +
 +static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg) {
 +	int status = 0;
 +	publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt) handle;
 +
 +	celixThreadMutex_lock(&(bound->parent->tp_lock));
 +	celixThreadMutex_lock(&(bound->mp_lock));
 +
 +	pubsub_msg_serializer_t* msgSer = (pubsub_msg_serializer_t*)hashMap_get(bound->msgTypes, (void*)(intptr_t)msgTypeId);
 +
 +	if (msgSer != NULL) {
 +		int major=0, minor=0;
 +
 +		pubsub_msg_header_pt msg_hdr = calloc(1,sizeof(struct pubsub_msg_header));
 +		strncpy(msg_hdr->topic,bound->topic,MAX_TOPIC_LEN-1);
 +		msg_hdr->type = msgTypeId;
 +
 +
 +		if (msgSer->msgVersion != NULL){
 +			version_getMajor(msgSer->msgVersion, &major);
 +			version_getMinor(msgSer->msgVersion, &minor);
 +			msg_hdr->major = major;
 +			msg_hdr->minor = minor;
 +		}
 +
 +		void* serializedOutput = NULL;
 +		size_t serializedOutputLen = 0;
 +		msgSer->serialize(msgSer,inMsg,&serializedOutput, &serializedOutputLen);
 +
 +		pubsub_msg_t *msg = calloc(1,sizeof(pubsub_msg_t));
 +		msg->header = msg_hdr;
 +		msg->payload = (char*)serializedOutput;
 +		msg->payloadSize = serializedOutputLen;
 +
 +
 +		if(send_pubsub_msg(bound, msg,true, NULL) == false) {
 +			status = -1;
 +		}
 +		free(msg_hdr);
 +		free(msg);
 +		free(serializedOutput);
 +
 +
 +	} else {
 +		printf("PSA_UDP_MC_TP: No msg serializer available for msg type id %d\n", msgTypeId);
 +		status=-1;
 +	}
 +
 +	celixThreadMutex_unlock(&(bound->mp_lock));
 +	celixThreadMutex_unlock(&(bound->parent->tp_lock));
 +
 +	return status;
 +}
 +
 +static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* msgTypeId){
 +	*msgTypeId = utils_stringHash(msgType);
 +	return 0;
 +}
 +
 +
 +static unsigned int rand_range(unsigned int min, unsigned int max){
 +
 +	double scaled = (double)(((double)random())/((double)RAND_MAX));
 +	return (max-min+1)*scaled + min;
 +
 +}
 +
 +static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle){
 +
 +	publish_bundle_bound_service_pt bound = calloc(1, sizeof(*bound));
 +
 +	if (bound != NULL) {
 +
 +		bound->parent = tp;
 +		bound->bundle = bundle;
 +		bound->getCount = 1;
 +		celixThreadMutex_create(&bound->mp_lock,NULL);
 +
 +		if(tp->serializer != NULL){
 +			tp->serializer->createSerializerMap(tp->serializer->handle,bundle,&bound->msgTypes);
 +		}
 +
 +		pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(bound->parent->pub_ep_list,0);
- 		bound->scope=strdup(pubEP->scope);
- 		bound->topic=strdup(pubEP->topic);
++		bound->scope=strdup(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE));
++		bound->topic=strdup(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +		bound->largeUdpHandle = largeUdp_create(1);
 +
 +		bound->service.handle = bound;
 +		bound->service.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForUUID;
 +		bound->service.send = pubsub_topicPublicationSend;
 +		bound->service.sendMultipart = NULL;  //Multipart not supported for UDP
 +
 +	}
 +
 +	return bound;
 +}
 +
 +static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc){
 +
 +	celixThreadMutex_lock(&boundSvc->mp_lock);
 +
 +	if(boundSvc->parent->serializer != NULL && boundSvc->msgTypes != NULL){
 +		boundSvc->parent->serializer->destroySerializerMap(boundSvc->parent->serializer->handle, boundSvc->msgTypes);
 +	}
 +
 +	if(boundSvc->scope!=NULL){
 +		free(boundSvc->scope);
 +	}
 +
 +	if(boundSvc->topic!=NULL){
 +		free(boundSvc->topic);
 +	}
 +
 +	largeUdp_destroy(boundSvc->largeUdpHandle);
 +
 +	celixThreadMutex_unlock(&boundSvc->mp_lock);
 +	celixThreadMutex_destroy(&boundSvc->mp_lock);
 +
 +	free(boundSvc);
 +
 +}
 +
 +static void delay_first_send_for_late_joiners(){
 +
 +	static bool firstSend = true;
 +
 +	if(firstSend){
 +		printf("PSA_UDP_MC_TP: Delaying first send for late joiners...\n");
 +		sleep(FIRST_SEND_DELAY);
 +		firstSend = false;
 +	}
 +}


Mime
View raw message