celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rlenfer...@apache.org
Subject [2/4] celix git commit: Merge branch 'endpoint-format' into develop
Date Fri, 02 Feb 2018 13:38:09 GMT
http://git-wip-us.apache.org/repos/asf/celix/blob/0a5ef69a/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
----------------------------------------------------------------------
diff --cc pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
index 94a8e11,0000000..e3e9704
mode 100644,000000..100644
--- a/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
+++ b/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
@@@ -1,457 -1,0 +1,460 @@@
 +/**
 + *Licensed to the Apache Software Foundation (ASF) under one
 + *or more contributor license agreements.  See the NOTICE file
 + *distributed with this work for additional information
 + *regarding copyright ownership.  The ASF licenses this file
 + *to you under the Apache License, Version 2.0 (the
 + *"License"); you may not use this file except in compliance
 + *with the License.  You may obtain a copy of the License at
 + *
 + *  http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *Unless required by applicable law or agreed to in writing,
 + *software distributed under the License is distributed on an
 + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + *specific language governing permissions and limitations
 + *under the License.
 + */
 +
 +#include <stdio.h>
 +#include <string.h>
 +#include <stdlib.h>
 +#include <unistd.h>
 +#include <stdbool.h>
 +#include <netdb.h>
 +#include <netinet/in.h>
 +
 +#include "constants.h"
 +#include "celix_threads.h"
 +#include "bundle_context.h"
 +#include "array_list.h"
 +#include "utils.h"
 +#include "celix_errno.h"
 +#include "filter.h"
 +#include "service_reference.h"
 +#include "service_registration.h"
 +
 +#include "publisher_endpoint_announce.h"
 +#include "etcd_common.h"
 +#include "etcd_watcher.h"
 +#include "etcd_writer.h"
 +#include "pubsub_endpoint.h"
 +#include "pubsub_discovery_impl.h"
 +
 +/* Discovery activator functions */
 +celix_status_t pubsub_discovery_create(bundle_context_pt context, pubsub_discovery_pt *ps_discovery) { /* Allocates a zeroed discovery object into *ps_discovery and sets up its three hash maps and three mutexes; returns CELIX_ENOMEM when calloc fails, CELIX_SUCCESS otherwise. */
 +	celix_status_t status = CELIX_SUCCESS;
 +
 +	*ps_discovery = calloc(1, sizeof(**ps_discovery));
 +
 +	if (*ps_discovery == NULL) {
 +		status = CELIX_ENOMEM;
 +	}
 +	else{
 +		(*ps_discovery)->context = context;
 +		(*ps_discovery)->discoveredPubs = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); /* string-keyed; the other functions in this file store array lists of endpoints under "scope:topic" keys */
 +		(*ps_discovery)->listenerReferences = hashMap_create(serviceReference_hashCode, NULL, serviceReference_equals2, NULL); /* keyed by service reference */
 +		(*ps_discovery)->watchers = hashMap_create(utils_stringHash,NULL,utils_stringEquals, NULL); /* string-keyed; pubsub_discovery_interestedInTopic stores struct watcher_info pointers here */
 +		celixThreadMutex_create(&(*ps_discovery)->listenerReferencesMutex, NULL); /* NOTE(review): the results of the three mutex creations are not checked */
 +		celixThreadMutex_create(&(*ps_discovery)->discoveredPubsMutex, NULL);
 +		celixThreadMutex_create(&(*ps_discovery)->watchersMutex, NULL);
 +	}
 +
 +	return status;
 +}
 +
 +celix_status_t pubsub_discovery_destroy(pubsub_discovery_pt ps_discovery) { /* Destroys every stored endpoint and its list, both the discoveredPubs and listenerReferences maps with their mutexes, then frees ps_discovery; always returns CELIX_SUCCESS. */
 +	celix_status_t status = CELIX_SUCCESS;
 +
 +	celixThreadMutex_lock(&ps_discovery->discoveredPubsMutex);
 +
 +	hash_map_iterator_pt iter = hashMapIterator_create(ps_discovery->discoveredPubs);
 +
 +	while (hashMapIterator_hasNext(iter)) {
 +		array_list_pt pubEP_list = (array_list_pt) hashMapIterator_nextValue(iter);
 +
 +		for(int i=0; i < arrayList_size(pubEP_list); i++) {
 +			pubsubEndpoint_destroy(((pubsub_endpoint_pt)arrayList_get(pubEP_list,i)));
 +		}
 +		arrayList_destroy(pubEP_list); /* the list itself is released here, after its elements */
 +	}
 +
 +	hashMapIterator_destroy(iter);
 +
 +	hashMap_destroy(ps_discovery->discoveredPubs, true, false); /* presumably (freeKeys, freeValues): keys are strdup'ed strings, values were already destroyed above - confirm against hash_map.h */
 +	ps_discovery->discoveredPubs = NULL;
 +
 +	celixThreadMutex_unlock(&ps_discovery->discoveredPubsMutex);
 +
 +	celixThreadMutex_destroy(&ps_discovery->discoveredPubsMutex);
 +
 +
 +	celixThreadMutex_lock(&ps_discovery->listenerReferencesMutex);
 +
 +	hashMap_destroy(ps_discovery->listenerReferences, false, false);
 +	ps_discovery->listenerReferences = NULL;
 +
 +	celixThreadMutex_unlock(&ps_discovery->listenerReferencesMutex);
 +
 +	celixThreadMutex_destroy(&ps_discovery->listenerReferencesMutex);
 +
 +	free(ps_discovery); /* NOTE(review): the watchers map is destroyed in pubsub_discovery_stop, but watchersMutex is not destroyed anywhere in this file */
 +
 +	return status;
 +}
 +
 +celix_status_t pubsub_discovery_start(pubsub_discovery_pt ps_discovery) { /* Initialises the etcd layer from the bundle context and creates the etcd writer; returns the status of etcdCommon_init. */
 +    celix_status_t status = CELIX_SUCCESS;
 +    status = etcdCommon_init(ps_discovery->context);
 +    ps_discovery->writer = etcdWriter_create(ps_discovery); /* NOTE(review): the writer is created even when etcdCommon_init reported an error, and its result is not checked */
 +
 +    return status;
 +}
 +
 +celix_status_t pubsub_discovery_stop(pubsub_discovery_pt ps_discovery) { /* Stops all etcd watchers, withdraws the local publishers from etcd, drops the remote ones, then destroys the writer, the watchers and the watchers map. Returns CELIX_INVALID_BUNDLE_CONTEXT when the framework UUID is unavailable, CELIX_SUCCESS otherwise. */
 +    celix_status_t status = CELIX_SUCCESS;
 +
 +    const char* fwUUID = NULL;
 +
 +    bundleContext_getProperty(ps_discovery->context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID);
 +    if (fwUUID == NULL) {
 +        printf("PSD: Cannot retrieve fwUUID.\n");
 +        return CELIX_INVALID_BUNDLE_CONTEXT; /* NOTE(review): this early return leaves the watchers and the writer running and allocated */
 +    }
 +
 +    celixThreadMutex_lock(&ps_discovery->watchersMutex); /* held until the end of the function; discoveredPubsMutex is taken while it is held */
 +
 +    hash_map_iterator_pt iter = hashMapIterator_create(ps_discovery->watchers);
 +    while (hashMapIterator_hasNext(iter)) {
 +        struct watcher_info * wi = hashMapIterator_nextValue(iter);
 +        etcdWatcher_stop(wi->watcher);
 +    }
 +    hashMapIterator_destroy(iter);
 +
 +    celixThreadMutex_lock(&ps_discovery->discoveredPubsMutex);
 +
 +    /* Unexport all publishers for the local framework, and also delete from ETCD publisher belonging to the local framework */
 +
 +    iter = hashMapIterator_create(ps_discovery->discoveredPubs);
 +    while (hashMapIterator_hasNext(iter)) {
 +        array_list_pt pubEP_list = (array_list_pt) hashMapIterator_nextValue(iter);
 +
 +        int i;
 +        for (i = 0; i < arrayList_size(pubEP_list); i++) {
 +            pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt) arrayList_get(pubEP_list, i);
-             if (strcmp(pubEP->frameworkUUID, fwUUID) == 0) {
++            if (strcmp(properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID), fwUUID) == 0) { /* NOTE(review): pubsubEndpoint_setFields only stores the UUID when it is non-NULL, so properties_get may presumably return NULL here - confirm */
 +                etcdWriter_deletePublisherEndpoint(ps_discovery->writer, pubEP); /* local endpoint: removed from etcd but left in the list (pubsub_discovery_destroy releases it) */
 +            } else {
 +                pubsub_discovery_informPublishersListeners(ps_discovery, pubEP, false); /* remote endpoint: listeners are told it is gone; this call takes listenerReferencesMutex while discoveredPubsMutex is held */
 +                arrayList_remove(pubEP_list, i);
 +                pubsubEndpoint_destroy(pubEP);
 +                i--; /* compensate for the element just removed so the next one is not skipped */
 +            }
 +        }
 +    }
 +
 +    hashMapIterator_destroy(iter);
 +
 +    celixThreadMutex_unlock(&ps_discovery->discoveredPubsMutex);
 +    etcdWriter_destroy(ps_discovery->writer);
 +
 +    iter = hashMapIterator_create(ps_discovery->watchers);
 +    while (hashMapIterator_hasNext(iter)) {
 +        struct watcher_info * wi = hashMapIterator_nextValue(iter);
 +        etcdWatcher_destroy(wi->watcher);
 +    }
 +    hashMapIterator_destroy(iter);
 +    hashMap_destroy(ps_discovery->watchers, true, true); /* presumably frees both the key strings and the watcher_info values - confirm against hash_map.h; the map pointer is not reset to NULL */
 +    celixThreadMutex_unlock(&ps_discovery->watchersMutex);
 +    return status;
 +}
 +
 +/* Functions called by the etcd_watcher */
 +
 +celix_status_t pubsub_discovery_addNode(pubsub_discovery_pt pubsub_discovery, pubsub_endpoint_pt pubEP) { /* Records a discovered endpoint under its "scope:topic" key and, when it was not already known, announces it to the listeners. Takes ownership of pubEP: it is either stored or destroyed. */
 +	celix_status_t status = CELIX_SUCCESS;
 +	bool inform=false;
 +	celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex);
 +
- 	char *pubs_key = createScopeTopicKey(pubEP->scope, pubEP->topic);
++	char *pubs_key = createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +	array_list_pt pubEP_list = (array_list_pt)hashMap_get(pubsub_discovery->discoveredPubs,pubs_key);
 +	if(pubEP_list==NULL){
 +		arrayList_create(&pubEP_list); /* first endpoint for this scope/topic: create its list */
 +		arrayList_add(pubEP_list,pubEP);
 +		hashMap_put(pubsub_discovery->discoveredPubs,strdup(pubs_key),pubEP_list); /* the map gets its own copy of the key; pubs_key is freed below */
 +		inform=true;
 +	}
 +	else{
 +		int i;
 +		bool found = false;
 +		for(i=0;i<arrayList_size(pubEP_list) && !found;i++){
 +			found = pubsubEndpoint_equals(pubEP,(pubsub_endpoint_pt)arrayList_get(pubEP_list,i));
 +		}
 +		if(found){
 +			pubsubEndpoint_destroy(pubEP); /* duplicate: drop the new object; inform stays false so pubEP is not used afterwards */
 +		}
 +		else{
 +			arrayList_add(pubEP_list,pubEP);
 +			inform=true;
 +    	}
 +	}
 +	free(pubs_key);
 +
 +	celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex);
 +
 +	if(inform){
 +	    status = pubsub_discovery_informPublishersListeners(pubsub_discovery,pubEP,true); /* called after discoveredPubsMutex has been released */
 +	}
 +
 +	return status;
 +}
 +
 +celix_status_t pubsub_discovery_removeNode(pubsub_discovery_pt pubsub_discovery, pubsub_endpoint_pt pubEP) { /* Removes and destroys the stored endpoint equal to pubEP, tells the listeners if one was found, then destroys pubEP itself. Returns CELIX_ILLEGAL_STATE when no list exists for the endpoint's scope/topic. */
 +    celix_status_t status = CELIX_SUCCESS;
 +    pubsub_endpoint_pt p = NULL;
 +    bool found = false;
 +
 +    celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex);
-     char *pubs_key = createScopeTopicKey(pubEP->scope, pubEP->topic);
++    char *pubs_key = createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +    array_list_pt pubEP_list = (array_list_pt) hashMap_get(pubsub_discovery->discoveredPubs, pubs_key);
 +    free(pubs_key);
 +    if (pubEP_list == NULL) {
-         printf("PSD: Cannot find any registered publisher for topic %s. Something is not consistent.\n", pubEP->topic);
++        printf("PSD: Cannot find any registered publisher for topic %s. Something is not consistent.\n",
++			   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +        status = CELIX_ILLEGAL_STATE;
 +    } else {
 +        int i;
 +
 +        for (i = 0; !found && i < arrayList_size(pubEP_list); i++) { /* the !found test ends the loop right after the first match is removed */
 +            p = arrayList_get(pubEP_list, i);
 +            found = pubsubEndpoint_equals(pubEP, p);
 +            if (found) {
 +                arrayList_remove(pubEP_list, i);
 +                pubsubEndpoint_destroy(p);
 +            }
 +        }
 +    }
 +
 +    celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex);
 +    if (found) {
 +        status = pubsub_discovery_informPublishersListeners(pubsub_discovery, pubEP, false); /* the caller's pubEP is passed on; the stored copy is already destroyed */
 +    }
 +    pubsubEndpoint_destroy(pubEP); /* the caller's endpoint is consumed on every path */
 +
 +    return status;
 +}
 +
 +/* Callback to the pubsub_topology_manager */
 +celix_status_t pubsub_discovery_informPublishersListeners(pubsub_discovery_pt pubsub_discovery, pubsub_endpoint_pt pubEP, bool epAdded) { /* Calls announcePublisher (epAdded true) or removePublisher (epAdded false) with pubEP on every service in listenerReferences while holding listenerReferencesMutex. Always returns CELIX_SUCCESS; the listeners' return values are ignored. */
 +	celix_status_t status = CELIX_SUCCESS;
 +
 +	// Inform listeners of new publisher endpoint
 +	celixThreadMutex_lock(&pubsub_discovery->listenerReferencesMutex);
 +
 +	if (pubsub_discovery->listenerReferences != NULL) {
 +		hash_map_iterator_pt iter = hashMapIterator_create(pubsub_discovery->listenerReferences);
 +		while (hashMapIterator_hasNext(iter)) {
 +			service_reference_pt reference = hashMapIterator_nextKey(iter); /* the map's keys are the references; pubsub_discovery_tmPublisherAnnounceAdded stores NULL as the value */
 +
 +			publisher_endpoint_announce_pt listener = NULL;
 +
 +			bundleContext_getService(pubsub_discovery->context, reference, (void**) &listener); /* NOTE(review): neither the status nor listener is checked before the dereference below */
 +            if (epAdded) {
 +                listener->announcePublisher(listener->handle, pubEP);
 +            } else {
 +                listener->removePublisher(listener->handle, pubEP);
 +            }
 +            bundleContext_ungetService(pubsub_discovery->context, reference, NULL);
 +		}
 +		hashMapIterator_destroy(iter);
 +	}
 +
 +	celixThreadMutex_unlock(&pubsub_discovery->listenerReferencesMutex);
 +
 +	return status;
 +}
 +
 +
 +/* Service's functions implementation */
 +celix_status_t pubsub_discovery_announcePublisher(void *handle, pubsub_endpoint_pt pubEP) { /* Stores a clone of pubEP under its "scope:topic" key and publishes that clone through the etcd writer; handle is the pubsub_discovery_pt. Returns the status of etcdWriter_addPublisherEndpoint. */
 +	celix_status_t status = CELIX_SUCCESS;
- 	printf("pubsub_discovery_announcePublisher : %s / %s\n", pubEP->topic, pubEP->endpoint);
++	printf("pubsub_discovery_announcePublisher : %s / %s\n",
++		   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),
++		   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
 +	pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) handle;
 +
 +	celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex);
 +
- 	char *pub_key = createScopeTopicKey(pubEP->scope,pubEP->topic);
++	char *pub_key = createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +	array_list_pt pubEP_list = (array_list_pt)hashMap_get(pubsub_discovery->discoveredPubs,pub_key);
 +
 +	if(pubEP_list==NULL){
 +		arrayList_create(&pubEP_list);
 +		hashMap_put(pubsub_discovery->discoveredPubs,strdup(pub_key),pubEP_list); /* the map keeps its own copy of the key */
 +	}
 +	free(pub_key);
 +	pubsub_endpoint_pt p = NULL;
 +	pubsubEndpoint_clone(pubEP, &p); /* NOTE(review): the result is ignored; pubsubEndpoint_clone leaves p NULL on failure, and NULL would then be stored and sent to the writer */
 +
 +	arrayList_add(pubEP_list,p); /* no duplicate check here, unlike pubsub_discovery_addNode */
 +
 +	status = etcdWriter_addPublisherEndpoint(pubsub_discovery->writer,p,true);
 +
 +	celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex);
 +
 +	return status;
 +}
 +
 +celix_status_t pubsub_discovery_removePublisher(void *handle, pubsub_endpoint_pt pubEP) { /* Finds the stored endpoint equal to pubEP, removes it from its list, deletes it from etcd and destroys it; pubEP itself is not freed. Returns CELIX_ILLEGAL_STATE when the list or the endpoint is unknown, otherwise the etcd writer's status. */
 +	celix_status_t status = CELIX_SUCCESS;
 +
 +	pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) handle;
 +
 +	celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex);
 +
- 	char *pub_key = createScopeTopicKey(pubEP->scope,pubEP->topic);
++	char *pub_key = createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +	array_list_pt pubEP_list = (array_list_pt)hashMap_get(pubsub_discovery->discoveredPubs,pub_key);
 +	free(pub_key);
 +	if(pubEP_list==NULL){
- 		printf("PSD: Cannot find any registered publisher for topic %s. Something is not consistent.\n",pubEP->topic);
++		printf("PSD: Cannot find any registered publisher for topic %s. Something is not consistent.\n",properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +		status = CELIX_ILLEGAL_STATE;
 +	}
 +	else{
 +
 +		int i;
 +		bool found = false;
 +		pubsub_endpoint_pt p = NULL;
 +
 +		for(i=0;!found && i<arrayList_size(pubEP_list);i++){ /* stops at the first match, leaving p pointing at it */
 +			p = (pubsub_endpoint_pt)arrayList_get(pubEP_list,i);
 +			found = pubsubEndpoint_equals(pubEP,p);
 +		}
 +
 +		if(!found){
 +			printf("PSD: Trying to remove a not existing endpoint. Something is not consistent.\n");
 +			status = CELIX_ILLEGAL_STATE;
 +		}
 +		else{
 +
 +			arrayList_removeElement(pubEP_list,p);
 +
 +			status = etcdWriter_deletePublisherEndpoint(pubsub_discovery->writer,p); /* p is still valid here; it is destroyed only afterwards */
 +
 +			pubsubEndpoint_destroy(p);
 +		}
 +	}
 +
 +	celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex);
 +
 +	return status;
 +}
 +
 +celix_status_t pubsub_discovery_interestedInTopic(void *handle, const char* scope, const char* topic) { /* Adds one reference to the etcd watcher for scope/topic, creating the watcher on first use; handle is the pubsub_discovery_pt. Always returns CELIX_SUCCESS. */
 +    pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) handle;
 +
 +    char *scope_topic_key = createScopeTopicKey(scope, topic);
 +    celixThreadMutex_lock(&pubsub_discovery->watchersMutex);
 +    struct watcher_info * wi = hashMap_get(pubsub_discovery->watchers, scope_topic_key);
 +    if(wi) {
 +        wi->nr_references++;
 +        free(scope_topic_key); /* key only needed for the lookup on this path */
 +    } else {
 +        wi = calloc(1, sizeof(*wi)); /* NOTE(review): allocation result is not checked before use */
 +        etcdWatcher_create(pubsub_discovery, pubsub_discovery->context, scope, topic, &wi->watcher); /* NOTE(review): return status is ignored */
 +        wi->nr_references = 1;
 +        hashMap_put(pubsub_discovery->watchers, scope_topic_key, wi); /* the map takes over scope_topic_key; pubsub_discovery_uninterestedInTopic frees it */
 +    }
 +
 +    celixThreadMutex_unlock(&pubsub_discovery->watchersMutex);
 +
 +    return CELIX_SUCCESS;
 +}
 +
 +celix_status_t pubsub_discovery_uninterestedInTopic(void *handle, const char* scope, const char* topic) { /* Drops one reference to the watcher for scope/topic; when the count reaches zero the watcher is stopped, destroyed and removed from the map. Always returns CELIX_SUCCESS, even for an unknown topic. */
 +    pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) handle;
 +
 +    char *scope_topic_key = createScopeTopicKey(scope, topic); /* NOTE(review): only freed in the count-reaches-zero branch below; the other two paths appear to leak it */
 +    celixThreadMutex_lock(&pubsub_discovery->watchersMutex);
 +
 +    hash_map_entry_pt entry =  hashMap_getEntry(pubsub_discovery->watchers, scope_topic_key);
 +    if(entry) {
 +        struct watcher_info * wi = hashMapEntry_getValue(entry);
 +        wi->nr_references--;
 +        if(wi->nr_references == 0) {
 +            char *key = hashMapEntry_getKey(entry); /* fetch the map-owned key before the entry is removed */
 +            hashMap_remove(pubsub_discovery->watchers, scope_topic_key);
 +            free(key);
 +            free(scope_topic_key);
 +            etcdWatcher_stop(wi->watcher);
 +            etcdWatcher_destroy(wi->watcher);
 +            free(wi);
 +        }
 +    } else {
 +        fprintf(stderr, "[DISC] Inconsistency error: Removing unknown topic %s\n", topic);
 +    }
 +    celixThreadMutex_unlock(&pubsub_discovery->watchersMutex);
 +    return CELIX_SUCCESS;
 +}
 +
 +/* pubsub_topology_manager tracker callbacks */
 +
 +celix_status_t pubsub_discovery_tmPublisherAnnounceAdded(void * handle, service_reference_pt reference, void * service) { /* Tracker callback: replays every stored endpoint to the new listener service, then records its reference in listenerReferences. Returns the sum of the listener's announcePublisher results. */
 +	celix_status_t status = CELIX_SUCCESS;
 +
 +	pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt)handle;
 +	publisher_endpoint_announce_pt listener = (publisher_endpoint_announce_pt)service;
 +
 +	celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex); /* lock order here: discoveredPubsMutex first, then listenerReferencesMutex */
 +	celixThreadMutex_lock(&pubsub_discovery->listenerReferencesMutex);
 +
 +	/* Notify the PSTM about discovered publisher endpoints */
 +	hash_map_iterator_pt iter = hashMapIterator_create(pubsub_discovery->discoveredPubs);
 +	while(hashMapIterator_hasNext(iter)){
 +		array_list_pt pubEP_list = (array_list_pt)hashMapIterator_nextValue(iter);
 +		int i;
 +		for(i=0;i<arrayList_size(pubEP_list);i++){
 +			pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubEP_list,i);
 +			status += listener->announcePublisher(listener->handle, pubEP); /* statuses are added together, so the result is non-zero if any call failed but is not a specific error code */
 +		}
 +	}
 +
 +	hashMapIterator_destroy(iter);
 +
 +	hashMap_put(pubsub_discovery->listenerReferences, reference, NULL); /* only the key matters; the value is NULL */
 +
 +	celixThreadMutex_unlock(&pubsub_discovery->listenerReferencesMutex);
 +	celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex);
 +
 +	printf("PSD: pubsub_tm_announce_publisher added.\n");
 +
 +	return status;
 +}
 +
 +celix_status_t pubsub_discovery_tmPublisherAnnounceModified(void * handle, service_reference_pt reference, void * service) { /* Tracker callback: handles a modified listener service as a removal followed by an addition. */
 +	celix_status_t status = CELIX_SUCCESS;
 +
 +	status = pubsub_discovery_tmPublisherAnnounceRemoved(handle, reference, service);
 +	if (status == CELIX_SUCCESS) { /* re-add only when the removal succeeded */
 +		status = pubsub_discovery_tmPublisherAnnounceAdded(handle, reference, service);
 +	}
 +
 +	return status;
 +}
 +
 +celix_status_t pubsub_discovery_tmPublisherAnnounceRemoved(void * handle, service_reference_pt reference, void * service) { /* Tracker callback: removes the listener's reference from listenerReferences under listenerReferencesMutex. Always returns CELIX_SUCCESS; service is unused. */
 +	celix_status_t status = CELIX_SUCCESS;
 +	pubsub_discovery_pt pubsub_discovery = handle;
 +
 +	celixThreadMutex_lock(&pubsub_discovery->listenerReferencesMutex);
 +
 +	if (pubsub_discovery->listenerReferences != NULL) {
 +		if (hashMap_remove(pubsub_discovery->listenerReferences, reference)) { /* NOTE(review): entries are stored with a NULL value; if hashMap_remove returns the removed value, this message never prints - confirm */
 +			printf("PSD: pubsub_tm_announce_publisher removed.\n");
 +		}
 +	}
 +	celixThreadMutex_unlock(&pubsub_discovery->listenerReferencesMutex);
 +
 +	return status;
 +}
 +

http://git-wip-us.apache.org/repos/asf/celix/blob/0a5ef69a/pubsub/pubsub_spi/include/pubsub_endpoint.h
----------------------------------------------------------------------
diff --cc pubsub/pubsub_spi/include/pubsub_endpoint.h
index 4c39d2f,0000000..598d673
mode 100644,000000..100644
--- a/pubsub/pubsub_spi/include/pubsub_endpoint.h
+++ b/pubsub/pubsub_spi/include/pubsub_endpoint.h
@@@ -1,58 -1,0 +1,65 @@@
 +/**
 + *Licensed to the Apache Software Foundation (ASF) under one
 + *or more contributor license agreements.  See the NOTICE file
 + *distributed with this work for additional information
 + *regarding copyright ownership.  The ASF licenses this file
 + *to you under the Apache License, Version 2.0 (the
 + *"License"); you may not use this file except in compliance
 + *with the License.  You may obtain a copy of the License at
 + *
 + *  http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *Unless required by applicable law or agreed to in writing,
 + *software distributed under the License is distributed on an
 + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + *specific language governing permissions and limitations
 + *under the License.
 + */
 +/*
 + * pubsub_endpoint.h
 + *
 + *  \date       Sep 21, 2015
 + *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
 + *  \copyright	Apache License, Version 2.0
 + */
 +
 +#ifndef PUBSUB_ENDPOINT_H_
 +#define PUBSUB_ENDPOINT_H_
 +
 +#include "service_reference.h"
 +#include "listener_hook_service.h"
 +#include "properties.h"
 +
 +#include "pubsub/publisher.h"
 +#include "pubsub/subscriber.h"
 +
++#define PUBSUB_ENDPOINT_ID              "pubsub.endpoint.id" /* property keys for pubsub_endpoint.endpoint_props; this one holds the UUID string generated in pubsubEndpoint_setFields */
++#define PUBSUB_ENDPOINT_SERVICE_ID      "service.id"
++#define PUBSUB_ENDPOINT_SERIALIZER      "serializer"
++#define PUBSUB_ENDPOINT_ADMIN_TYPE      "pubsub.admin.type"
++#define PUBSUB_ENDPOINT_URL             "pubsub.endpoint" /* set from the endpoint argument of pubsubEndpoint_setFields */
++#define PUBSUB_ENDPOINT_TOPIC           "pubsub.topic" /* set from the topic argument */
++#define PUBSUB_ENDPOINT_SCOPE           "pubsub.scope" /* set from the scope argument */
++#define PUBSUB_ENDPOINT_TYPE            "pubsub.type"
++
 +struct pubsub_endpoint { /* Description of one publisher/subscriber endpoint; after this change the string fields live in endpoint_props instead of dedicated members. */
-     char *frameworkUUID;
-     char *scope;
-     char *topic;
-     long serviceID;
-     char* endpoint;
-     bool is_secure;
++    long serviceID;         //optional
++    bool is_secure;         //optional
++    properties_pt endpoint_props; /* framework UUID, scope, topic, URL and endpoint id, keyed by OSGI_FRAMEWORK_FRAMEWORK_UUID and the PUBSUB_ENDPOINT_* names; destroyed by pubsubEndpoint_destroy */
 +    properties_pt topic_props; /* may be NULL; also destroyed by pubsubEndpoint_destroy */
 +};
 +
 +typedef struct pubsub_endpoint *pubsub_endpoint_pt;
 +
 +celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, long serviceId,const char* endpoint,properties_pt topic_props,pubsub_endpoint_pt* psEp); /* allocates *psEp and fills it; topic_props is copied */
 +celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt reference,pubsub_endpoint_pt* psEp, bool isPublisher); /* *psEp is set to NULL when the description is incomplete */
 +celix_status_t pubsubEndpoint_createFromListenerHookInfo(listener_hook_info_pt info,pubsub_endpoint_pt* psEp, bool isPublisher);
 +celix_status_t pubsubEndpoint_clone(pubsub_endpoint_pt in, pubsub_endpoint_pt *out); /* deep copy of both property sets; *out is written only on success */
 +celix_status_t pubsubEndpoint_destroy(pubsub_endpoint_pt psEp); /* destroys both property sets and frees psEp */
 +bool pubsubEndpoint_equals(pubsub_endpoint_pt psEp1,pubsub_endpoint_pt psEp2); /* compares framework UUID, scope, topic and serviceID */
++celix_status_t pubsubEndpoint_setField(pubsub_endpoint_pt ep, const char* key, const char* value); /* sets one entry of ep->endpoint_props */
 +
 +char *createScopeTopicKey(const char* scope, const char* topic); /* returns a heap-allocated "scope:topic" string that the caller must free */
 +
 +#endif /* PUBSUB_ENDPOINT_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/0a5ef69a/pubsub/pubsub_spi/src/pubsub_endpoint.c
----------------------------------------------------------------------
diff --cc pubsub/pubsub_spi/src/pubsub_endpoint.c
index c3fd293,0000000..d3b746e
mode 100644,000000..100644
--- a/pubsub/pubsub_spi/src/pubsub_endpoint.c
+++ b/pubsub/pubsub_spi/src/pubsub_endpoint.c
@@@ -1,254 -1,0 +1,282 @@@
 +/**
 + *Licensed to the Apache Software Foundation (ASF) under one
 + *or more contributor license agreements.  See the NOTICE file
 + *distributed with this work for additional information
 + *regarding copyright ownership.  The ASF licenses this file
 + *to you under the Apache License, Version 2.0 (the
 + *"License"); you may not use this file except in compliance
 + *with the License.  You may obtain a copy of the License at
 + *
 + *  http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *Unless required by applicable law or agreed to in writing,
 + *software distributed under the License is distributed on an
 + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + *specific language governing permissions and limitations
 + *under the License.
 + */
 +/*
 + * endpoint_description.c
 + *
 + *  \date       25 Jul 2014
 + *  \author     <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
 + *  \copyright  Apache License, Version 2.0
 + */
 +
 +#include <string.h>
 +#include <stdlib.h>
++#include <uuid/uuid.h>
 +
 +#include "celix_errno.h"
 +#include "celix_log.h"
 +
 +#include "pubsub_common.h"
 +#include "pubsub_endpoint.h"
 +#include "constants.h"
 +
 +#include "pubsub_utils.h"
 +
 +
 +static void pubsubEndpoint_setFields(pubsub_endpoint_pt psEp, const char* fwUUID, const char* scope, const char* topic, long serviceId,const char* endpoint,properties_pt topic_props, bool cloneProps); /* forward declaration; defined below */
 +static properties_pt pubsubEndpoint_getTopicProperties(bundle_pt bundle, const char *topic, bool isPublisher); /* forward declaration; defined below */
 +
 +static void pubsubEndpoint_setFields(pubsub_endpoint_pt psEp, const char* fwUUID, const char* scope, const char* topic, long serviceId,const char* endpoint,properties_pt topic_props, bool cloneProps){ /* Fills psEp: creates endpoint_props if needed, stores a new endpoint id plus every non-NULL string argument, sets serviceID, and copies topic_props (cloneProps true) or adopts the pointer (cloneProps false). */
 +
++	if (psEp->endpoint_props == NULL) {
++		psEp->endpoint_props = properties_create();
++	}
++
++	char endpointUuid[37]; /* presumably 36 characters plus the terminating NUL written by uuid_unparse - confirm against libuuid */
++
++	uuid_t endpointUid;
++	uuid_generate(endpointUid);
++	uuid_unparse(endpointUid, endpointUuid);
++	properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_ID, endpointUuid); /* a new id is generated on every call */
++
 +	if (fwUUID != NULL) {
- 		psEp->frameworkUUID = strdup(fwUUID);
++		properties_set(psEp->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID, fwUUID);
 +	}
 +
 +	if (scope != NULL) {
- 		psEp->scope = strdup(scope);
++		properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_SCOPE, scope);
 +	}
 +
 +	if (topic != NULL) {
- 		psEp->topic = strdup(topic);
++		properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_TOPIC, topic);
 +	}
 +
 +	psEp->serviceID = serviceId;
 +
 +	if(endpoint != NULL) {
- 		psEp->endpoint = strdup(endpoint);
++		properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_URL, endpoint);
 +	}
 +
 +	if(topic_props != NULL){
 +		if(cloneProps){
 +			properties_copy(topic_props, &(psEp->topic_props)); /* NOTE(review): the copy's status is ignored */
 +		}
 +		else{
 +			psEp->topic_props = topic_props; /* psEp adopts the caller's object; pubsubEndpoint_destroy will destroy it */
 +		}
 +	}
 +}
 +
 +static properties_pt pubsubEndpoint_getTopicProperties(bundle_pt bundle, const char *topic, bool isPublisher){ /* Loads META-INF/topics/pub|sub/<topic>.properties from a non-system bundle's root; returns NULL for the system bundle, when the bundle root is unavailable, or when loading fails. */
 +
 +	properties_pt topic_props = NULL;
 +
 +	bool isSystemBundle = false;
 +	bundle_isSystemBundle(bundle, &isSystemBundle);
 +	long bundleId = -1;
 +	bundle_isSystemBundle(bundle, &isSystemBundle); /* NOTE(review): repeats the call two lines above; looks redundant */
 +	bundle_getBundleId(bundle,&bundleId); /* used only in the diagnostic message below */
 +
 +	if(isSystemBundle == false) {
 +
 +		char *bundleRoot = NULL;
 +		char* topicPropertiesPath = NULL;
 +		bundle_getEntry(bundle, ".", &bundleRoot);
 +
 +		if(bundleRoot != NULL){
 +
 +			asprintf(&topicPropertiesPath, "%s/META-INF/topics/%s/%s.properties", bundleRoot, isPublisher?"pub":"sub", topic); /* NOTE(review): asprintf's result is not checked */
 +			topic_props = properties_load(topicPropertiesPath);
 +			if(topic_props==NULL){
 +				printf("PSEP: Could not load properties for %s on topic %s, bundleId=%ld\n", isPublisher?"publication":"subscription", topic,bundleId);
 +			}
 +
 +			free(topicPropertiesPath);
 +			free(bundleRoot);
 +		}
 +	}
 +
 +	return topic_props;
 +}
 +
++celix_status_t pubsubEndpoint_setField(pubsub_endpoint_pt ep, const char* key, const char* value) { /* Sets key=value in ep->endpoint_props. Returns CELIX_ILLEGAL_STATE when endpoint_props is NULL, CELIX_SUCCESS otherwise. */
++	celix_status_t status = CELIX_SUCCESS;
++
++	if (ep->endpoint_props == NULL) {
++		printf("PUBSUB_EP: No endpoint_props for endpoint available!\n");
++		return CELIX_ILLEGAL_STATE;
++	}
++
++	if (key != NULL && value != NULL) { /* a NULL key or value is ignored and still reported as success */
++		properties_set(ep->endpoint_props, key, value);
++	}
++
++	return status;
++}
++
 +celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, long serviceId,const char* endpoint,properties_pt topic_props,pubsub_endpoint_pt* psEp){ /* Allocates a zeroed endpoint into *psEp and fills it through pubsubEndpoint_setFields with a copy of topic_props. Always returns CELIX_SUCCESS. */
 +	celix_status_t status = CELIX_SUCCESS;
 +
 +	*psEp = calloc(1, sizeof(**psEp)); /* NOTE(review): not checked; pubsubEndpoint_setFields dereferences it immediately */
 +
 +	pubsubEndpoint_setFields(*psEp, fwUUID, scope, topic, serviceId, endpoint, topic_props, true); /* true: topic_props is copied, so the caller keeps its own */
 +
 +	return status;
 +
 +}
 +
 +celix_status_t pubsubEndpoint_clone(pubsub_endpoint_pt in, pubsub_endpoint_pt *out){ /* Deep-copies in (both property sets, serviceID, is_secure) into a new endpoint. On success stores it in *out; on failure destroys the partial copy and leaves *out untouched. */
 +	celix_status_t status = CELIX_SUCCESS;
 +
- 	*out = calloc(1,sizeof(**out));
++    pubsub_endpoint_pt ep = calloc(1,sizeof(*ep)); /* NOTE(review): allocation result is not checked before use */
 +
- 	pubsubEndpoint_setFields(*out, in->frameworkUUID, in->scope, in->topic, in->serviceID, in->endpoint, in->topic_props, true);
++	status = properties_copy(in->endpoint_props, &(ep->endpoint_props)); /* a straight copy, so no new id is generated via pubsubEndpoint_setFields */
++
++    if (in->topic_props != NULL) {
++        status += properties_copy(in->topic_props, &(ep->topic_props)); /* statuses are summed; a non-zero result only signals that some copy failed */
++    }
++
++	ep->serviceID = in->serviceID;
++	ep->is_secure = in->is_secure;
++
++    if (status == CELIX_SUCCESS) {
++        *out = ep;
++    } else {
++        pubsubEndpoint_destroy(ep);
++    }
 +
 +	return status;
 +
 +}
 +
 +celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt reference, pubsub_endpoint_pt* psEp, bool isPublisher){ /* Builds an endpoint from a service reference's scope, topic and service id plus the owning framework's UUID. Returns CELIX_BUNDLE_EXCEPTION with *psEp set to NULL when the UUID, scope, topic or serviceID is missing. */
 +	celix_status_t status = CELIX_SUCCESS;
 +
 +	pubsub_endpoint_pt ep = calloc(1,sizeof(*ep)); /* NOTE(review): allocation result is not checked */
 +
 +	bundle_pt bundle = NULL;
 +	bundle_context_pt ctxt = NULL;
 +	const char* fwUUID = NULL;
 +	serviceReference_getBundle(reference,&bundle);
 +	bundle_getContext(bundle,&ctxt);
 +	bundleContext_getProperty(ctxt,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
 +
 +	const char* scope = NULL;
 +	serviceReference_getProperty(reference, PUBSUB_SUBSCRIBER_SCOPE,&scope); /* the SUBSCRIBER property names are read regardless of isPublisher */
 +
 +	const char* topic = NULL;
 +	serviceReference_getProperty(reference, PUBSUB_SUBSCRIBER_TOPIC,&topic);
 +
 +	const char* serviceId = NULL;
 +	serviceReference_getProperty(reference,(char*)OSGI_FRAMEWORK_SERVICE_ID,&serviceId);
 +
 +	/* TODO: is topic_props==NULL a fatal error such that EP cannot be created? */
 +	properties_pt topic_props = pubsubEndpoint_getTopicProperties(bundle, topic, isPublisher);
 +
 +	pubsubEndpoint_setFields(ep, fwUUID, scope!=NULL?scope:PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, topic, strtol(serviceId,NULL,10), NULL, topic_props, false); /* false: ep adopts topic_props. NOTE(review): serviceId may presumably still be NULL here, which strtol does not accept - confirm */
 +
- 	if (!ep->frameworkUUID || !ep->serviceID || !ep->scope || !ep->topic) {
++	if (!properties_get(ep->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID) ||
++			!ep->serviceID ||
++			!properties_get(ep->endpoint_props, PUBSUB_ENDPOINT_SCOPE) ||
++			!properties_get(ep->endpoint_props, PUBSUB_ENDPOINT_TOPIC)) { /* a serviceID of 0 is treated as missing */
++
 +		fw_log(logger, OSGI_FRAMEWORK_LOG_ERROR, "PUBSUB_ENDPOINT: incomplete description!.");
 +		status = CELIX_BUNDLE_EXCEPTION;
 +		pubsubEndpoint_destroy(ep); /* also destroys the adopted topic_props */
 +		*psEp = NULL;
 +	}
 +	else{
 +		*psEp = ep;
 +	}
 +
 +	return status;
 +
 +}
 +
 +celix_status_t pubsubEndpoint_createFromListenerHookInfo(listener_hook_info_pt info,pubsub_endpoint_pt* psEp, bool isPublisher){ /* Builds an endpoint from a listener-hook entry, taking topic and scope from info->filter. Returns CELIX_BUNDLE_EXCEPTION, without allocating *psEp, when the framework UUID or the topic is unavailable. */
 +	celix_status_t status = CELIX_SUCCESS;
 +
 +	const char* fwUUID=NULL;
 +	bundleContext_getProperty(info->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
 +
 +	if(fwUUID==NULL){
 +		return CELIX_BUNDLE_EXCEPTION;
 +	}
 +
 +	char* topic = pubsub_getTopicFromFilter(info->filter); /* freed below, so presumably heap-allocated by the helper */
 +	if(topic==NULL){
 +		return CELIX_BUNDLE_EXCEPTION;
 +	}
 +
 +	*psEp = calloc(1, sizeof(**psEp)); /* NOTE(review): allocation result is not checked */
 +
 +	char* scope = pubsub_getScopeFromFilter(info->filter);
 +	if(scope == NULL) {
 +		scope = strdup(PUBSUB_PUBLISHER_SCOPE_DEFAULT); /* duplicated so the unconditional free(scope) below is valid */
 +	}
 +
 +	bundle_pt bundle = NULL;
 +	long bundleId = -1;
 +	bundleContext_getBundle(info->context,&bundle);
 +
 +	bundle_getBundleId(bundle,&bundleId);
 +
 +	properties_pt topic_props = pubsubEndpoint_getTopicProperties(bundle, topic, isPublisher);
 +
 +	/* TODO: is topic_props==NULL a fatal error such that EP cannot be created? */
 +	pubsubEndpoint_setFields(*psEp, fwUUID, scope!=NULL?scope:PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, topic, bundleId, NULL, topic_props, false); /* the bundle id is passed as serviceId; the endpoint adopts topic_props */
 +
 +	free(topic);
 +	free(scope);
 +
 +
 +	return status;
 +}
 +
 +celix_status_t pubsubEndpoint_destroy(pubsub_endpoint_pt psEp){ /* Destroys topic_props and endpoint_props when present, then frees psEp. Always returns CELIX_SUCCESS; psEp itself is not NULL-checked. */
 +
- 	if(psEp->frameworkUUID!=NULL){
- 		free(psEp->frameworkUUID);
- 		psEp->frameworkUUID = NULL;
- 	}
- 
- 	if(psEp->scope!=NULL){
- 		free(psEp->scope);
- 		psEp->scope = NULL;
- 	}
- 
- 	if(psEp->topic!=NULL){
- 		free(psEp->topic);
- 		psEp->topic = NULL;
- 	}
- 
- 	if(psEp->endpoint!=NULL){
- 		free(psEp->endpoint);
- 		psEp->endpoint = NULL;
- 	}
- 
 +	if(psEp->topic_props != NULL){
 +		properties_destroy(psEp->topic_props);
 +	}
 +
++	if (psEp->endpoint_props != NULL) { /* the strings formerly freed one by one now live in this property set */
++		properties_destroy(psEp->endpoint_props);
++    }
++
 +	free(psEp);
 +
 +	return CELIX_SUCCESS;
 +
 +}
 +
 +bool pubsubEndpoint_equals(pubsub_endpoint_pt psEp1,pubsub_endpoint_pt psEp2){ /* True when both endpoints have the same framework UUID, scope, topic and serviceID; the endpoint URL comparison is disabled (commented out below). */
 +
- 	return ((strcmp(psEp1->frameworkUUID,psEp2->frameworkUUID)==0) &&
- 			(strcmp(psEp1->scope,psEp2->scope)==0) &&
- 			(strcmp(psEp1->topic,psEp2->topic)==0) &&
++	return ((strcmp(properties_get(psEp1->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),properties_get(psEp2->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID))==0) && /* NOTE(review): properties_get may presumably return NULL for an unset key, which strcmp does not accept - confirm */
++			(strcmp(properties_get(psEp1->endpoint_props, PUBSUB_ENDPOINT_SCOPE),properties_get(psEp2->endpoint_props, PUBSUB_ENDPOINT_SCOPE))==0) &&
++			(strcmp(properties_get(psEp1->endpoint_props, PUBSUB_ENDPOINT_TOPIC),properties_get(psEp2->endpoint_props, PUBSUB_ENDPOINT_TOPIC))==0) &&
 +			(psEp1->serviceID == psEp2->serviceID) /*&&
 +			((psEp1->endpoint==NULL && psEp2->endpoint==NULL)||(strcmp(psEp1->endpoint,psEp2->endpoint)==0))*/
 +	);
 +}
 +
 +char *createScopeTopicKey(const char* scope, const char* topic) { /* Returns a newly allocated "scope:topic" string; the caller must free it. */
 +	char *result = NULL;
 +	asprintf(&result, "%s:%s", scope, topic); /* NOTE(review): asprintf's result is not checked; on failure the contents of result are presumably unspecified - confirm */
 +
 +	return result;
 +}

http://git-wip-us.apache.org/repos/asf/celix/blob/0a5ef69a/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
----------------------------------------------------------------------
diff --cc pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
index 2ac75c9,0000000..a63b275
mode 100644,000000..100644
--- a/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
+++ b/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
@@@ -1,721 -1,0 +1,727 @@@
 +/**
 + *Licensed to the Apache Software Foundation (ASF) under one
 + *or more contributor license agreements.  See the NOTICE file
 + *distributed with this work for additional information
 + *regarding copyright ownership.  The ASF licenses this file
 + *to you under the Apache License, Version 2.0 (the
 + *"License"); you may not use this file except in compliance
 + *with the License.  You may obtain a copy of the License at
 + *
 + *  http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *Unless required by applicable law or agreed to in writing,
 + *software distributed under the License is distributed on an
 + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + *specific language governing permissions and limitations
 + *under the License.
 + */
 +/*
 + * pubsub_topology_manager.c
 + *
 + *  \date       Sep 29, 2011
 + *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
 + *  \copyright	Apache License, Version 2.0
 + */
 +#include <stdio.h>
 +#include <stdlib.h>
 +#include <string.h>
 +#include <stdbool.h>
 +
 +#include "hash_map.h"
 +#include "array_list.h"
 +#include "bundle_context.h"
 +#include "constants.h"
 +#include "module.h"
 +#include "bundle.h"
 +#include "filter.h"
 +#include "listener_hook_service.h"
 +#include "utils.h"
 +#include "service_reference.h"
 +#include "service_registration.h"
 +#include "log_service.h"
 +#include "log_helper.h"
 +
 +#include "publisher_endpoint_announce.h"
 +#include "pubsub_topology_manager.h"
 +#include "pubsub_endpoint.h"
 +#include "pubsub_admin.h"
 +#include "pubsub_utils.h"
 +
 +
 +celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_helper_pt logHelper, pubsub_topology_manager_pt *manager) {
 +	celix_status_t status = CELIX_SUCCESS;
 +
 +	*manager = calloc(1, sizeof(**manager));
 +	if (!*manager) {
 +		return CELIX_ENOMEM;
 +	}
 +
 +	(*manager)->context = context;
 +
 +	celix_thread_mutexattr_t psaAttr;
 +	celixThreadMutexAttr_create(&psaAttr);
 +	celixThreadMutexAttr_settype(&psaAttr, CELIX_THREAD_MUTEX_RECURSIVE);
 +	status = celixThreadMutex_create(&(*manager)->psaListLock, &psaAttr);
 +	celixThreadMutexAttr_destroy(&psaAttr);
 +
 +	status = celixThreadMutex_create(&(*manager)->publicationsLock, NULL);
 +	status = celixThreadMutex_create(&(*manager)->subscriptionsLock, NULL);
 +	status = celixThreadMutex_create(&(*manager)->discoveryListLock, NULL);
 +
 +	arrayList_create(&(*manager)->psaList);
 +
 +	(*manager)->discoveryList = hashMap_create(serviceReference_hashCode, NULL, serviceReference_equals2, NULL);
 +	(*manager)->publications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 +	(*manager)->subscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 +
 +	(*manager)->loghelper = logHelper;
 +
 +	return status;
 +}
 +
 +celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_pt manager) {
 +	celix_status_t status = CELIX_SUCCESS;
 +
 +	celixThreadMutex_lock(&manager->discoveryListLock);
 +	hashMap_destroy(manager->discoveryList, false, false);
 +	celixThreadMutex_unlock(&manager->discoveryListLock);
 +	celixThreadMutex_destroy(&manager->discoveryListLock);
 +
 +	celixThreadMutex_lock(&manager->psaListLock);
 +	arrayList_destroy(manager->psaList);
 +	celixThreadMutex_unlock(&manager->psaListLock);
 +	celixThreadMutex_destroy(&manager->psaListLock);
 +
 +	celixThreadMutex_lock(&manager->publicationsLock);
 +	hash_map_iterator_pt pubit = hashMapIterator_create(manager->publications);
 +	while(hashMapIterator_hasNext(pubit)){
 +		array_list_pt l = (array_list_pt)hashMapIterator_nextValue(pubit);
 +		int i;
 +		for(i=0;i<arrayList_size(l);i++){
 +			pubsubEndpoint_destroy((pubsub_endpoint_pt)arrayList_get(l,i));
 +		}
 +		arrayList_destroy(l);
 +	}
 +	hashMapIterator_destroy(pubit);
 +	hashMap_destroy(manager->publications, true, false);
 +	celixThreadMutex_unlock(&manager->publicationsLock);
 +	celixThreadMutex_destroy(&manager->publicationsLock);
 +
 +	celixThreadMutex_lock(&manager->subscriptionsLock);
 +	hash_map_iterator_pt subit = hashMapIterator_create(manager->subscriptions);
 +	while(hashMapIterator_hasNext(subit)){
 +		array_list_pt l = (array_list_pt)hashMapIterator_nextValue(subit);
 +		int i;
 +		for(i=0;i<arrayList_size(l);i++){
 +			pubsubEndpoint_destroy((pubsub_endpoint_pt)arrayList_get(l,i));
 +		}
 +		arrayList_destroy(l);
 +	}
 +	hashMapIterator_destroy(subit);
 +	hashMap_destroy(manager->subscriptions, true, false);
 +	celixThreadMutex_unlock(&manager->subscriptionsLock);
 +	celixThreadMutex_destroy(&manager->subscriptionsLock);
 +
 +	free(manager);
 +
 +	return status;
 +}
 +
 +celix_status_t pubsub_topologyManager_psaAdded(void * handle, service_reference_pt reference, void * service) {
 +	celix_status_t status = CELIX_SUCCESS;
 +	pubsub_topology_manager_pt manager = handle;
 +	int i;
 +
 +	pubsub_admin_service_pt psa = (pubsub_admin_service_pt) service;
 +	logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "PSTM: Added PSA");
 +
 +	celixThreadMutex_lock(&manager->psaListLock);
 +	arrayList_add(manager->psaList, psa);
 +	celixThreadMutex_unlock(&manager->psaListLock);
 +
 +	// Add already detected subscriptions to new PSA
 +	celixThreadMutex_lock(&manager->subscriptionsLock);
 +	hash_map_iterator_pt subscriptionsIterator = hashMapIterator_create(manager->subscriptions);
 +
 +	while (hashMapIterator_hasNext(subscriptionsIterator)) {
 +		array_list_pt sub_ep_list = hashMapIterator_nextValue(subscriptionsIterator);
 +		for(i=0;i<arrayList_size(sub_ep_list);i++){
 +			status += psa->addSubscription(psa->admin, (pubsub_endpoint_pt)arrayList_get(sub_ep_list,i));
 +		}
 +	}
 +
 +	hashMapIterator_destroy(subscriptionsIterator);
 +
 +	celixThreadMutex_unlock(&manager->subscriptionsLock);
 +
 +	// Add already detected publications to new PSA
 +	status = celixThreadMutex_lock(&manager->publicationsLock);
 +	hash_map_iterator_pt publicationsIterator = hashMapIterator_create(manager->publications);
 +
 +	while (hashMapIterator_hasNext(publicationsIterator)) {
 +		array_list_pt pub_ep_list = hashMapIterator_nextValue(publicationsIterator);
 +		for(i=0;i<arrayList_size(pub_ep_list);i++){
 +			status += psa->addPublication(psa->admin, (pubsub_endpoint_pt)arrayList_get(pub_ep_list,i));
 +		}
 +	}
 +
 +	hashMapIterator_destroy(publicationsIterator);
 +
 +	celixThreadMutex_unlock(&manager->publicationsLock);
 +
 +	return status;
 +}
 +
 +celix_status_t pubsub_topologyManager_psaModified(void * handle, service_reference_pt reference, void * service) {
 +	celix_status_t status = CELIX_SUCCESS;
 +
 +	// Nop...
 +
 +	return status;
 +}
 +
 +celix_status_t pubsub_topologyManager_psaRemoved(void * handle, service_reference_pt reference, void * service) {
 +	celix_status_t status = CELIX_SUCCESS;
 +	pubsub_topology_manager_pt manager = handle;
 +
 +	pubsub_admin_service_pt psa = (pubsub_admin_service_pt) service;
 +
 +	/* Deactivate all publications */
 +	celixThreadMutex_lock(&manager->publicationsLock);
 +
 +	hash_map_iterator_pt pubit = hashMapIterator_create(manager->publications);
 +	while(hashMapIterator_hasNext(pubit)){
 +		hash_map_entry_pt pub_entry = hashMapIterator_nextEntry(pubit);
 +		char* scope_topic_key = (char*)hashMapEntry_getKey(pub_entry);
 +		// Extract scope/topic name from key
 +		char scope[MAX_SCOPE_LEN];
 +		char topic[MAX_TOPIC_LEN];
 +		sscanf(scope_topic_key, "%[^:]:%s", scope, topic );
 +		array_list_pt pubEP_list = (array_list_pt)hashMapEntry_getValue(pub_entry);
 +
 +		status = psa->closeAllPublications(psa->admin,scope,topic);
 +
 +		if(status==CELIX_SUCCESS){
 +			celixThreadMutex_lock(&manager->discoveryListLock);
 +			hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
 +			while(hashMapIterator_hasNext(iter)){
 +				service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter);
 +				publisher_endpoint_announce_pt disc = NULL;
 +				bundleContext_getService(manager->context, disc_sr, (void**) &disc);
 +				const char* fwUUID = NULL;
 +				bundleContext_getProperty(manager->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
 +				int i;
 +				for(i=0;i<arrayList_size(pubEP_list);i++){
 +					pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubEP_list,i);
- 					if(strcmp(pubEP->frameworkUUID,fwUUID)==0){
++					if(strcmp(properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),fwUUID)==0){
 +						disc->removePublisher(disc->handle,pubEP);
 +					}
 +				}
 +				bundleContext_ungetService(manager->context, disc_sr, NULL);
 +			}
 +			hashMapIterator_destroy(iter);
 +			celixThreadMutex_unlock(&manager->discoveryListLock);
 +		}
 +	}
 +	hashMapIterator_destroy(pubit);
 +
 +	celixThreadMutex_unlock(&manager->publicationsLock);
 +
 +	/* Deactivate all subscriptions */
 +	celixThreadMutex_lock(&manager->subscriptionsLock);
 +	hash_map_iterator_pt subit = hashMapIterator_create(manager->subscriptions);
 +	while(hashMapIterator_hasNext(subit)){
 +		// TODO do some error checking
 +		char* scope_topic = (char*)hashMapIterator_nextKey(subit);
 +		char scope[MAX_TOPIC_LEN];
 +		char topic[MAX_TOPIC_LEN];
 +		memset(scope, 0 , MAX_TOPIC_LEN*sizeof(char));
 +		memset(topic, 0 , MAX_TOPIC_LEN*sizeof(char));
 +		sscanf(scope_topic, "%[^:]:%s", scope, topic );
 +		status += psa->closeAllSubscriptions(psa->admin,scope, topic);
 +	}
 +	hashMapIterator_destroy(subit);
 +	celixThreadMutex_unlock(&manager->subscriptionsLock);
 +
 +	celixThreadMutex_lock(&manager->psaListLock);
 +	arrayList_removeElement(manager->psaList, psa);
 +	celixThreadMutex_unlock(&manager->psaListLock);
 +
 +	logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "PSTM: Removed PSA");
 +
 +	return status;
 +}
 +
 +celix_status_t pubsub_topologyManager_subscriberAdded(void * handle, service_reference_pt reference, void * service) {
 +	celix_status_t status = CELIX_SUCCESS;
 +	pubsub_topology_manager_pt manager = handle;
 +	//subscriber_service_pt subscriber = (subscriber_service_pt)service;
 +
 +	pubsub_endpoint_pt sub = NULL;
 +	if(pubsubEndpoint_createFromServiceReference(reference,&sub,false) == CELIX_SUCCESS){
 +		celixThreadMutex_lock(&manager->subscriptionsLock);
- 		char *sub_key = createScopeTopicKey(sub->scope, sub->topic);
++		char *sub_key = createScopeTopicKey(properties_get(sub->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(sub->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +
 +		array_list_pt sub_list_by_topic = hashMap_get(manager->subscriptions,sub_key);
 +		if(sub_list_by_topic==NULL){
 +			arrayList_create(&sub_list_by_topic);
 +			hashMap_put(manager->subscriptions,strdup(sub_key),sub_list_by_topic);
 +		}
 +		free(sub_key);
 +		arrayList_add(sub_list_by_topic,sub);
 +
 +		celixThreadMutex_unlock(&manager->subscriptionsLock);
 +
 +		int j;
 +		double score = 0;
 +		double best_score = 0;
 +		pubsub_admin_service_pt best_psa = NULL;
 +		celixThreadMutex_lock(&manager->psaListLock);
 +		for(j=0;j<arrayList_size(manager->psaList);j++){
 +			pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j);
 +			psa->matchEndpoint(psa->admin,sub,&score);
 +			if(score>best_score){ /* We have a new winner! */
 +				best_score = score;
 +				best_psa = psa;
 +			}
 +		}
 +
 +		if(best_psa != NULL && best_score>0){
 +			best_psa->addSubscription(best_psa->admin,sub);
 +		}
 +
 +		// Inform discoveries for interest in the topic
 +		celixThreadMutex_lock(&manager->discoveryListLock);
 +		hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
 +		while(hashMapIterator_hasNext(iter)){
 +			service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter);
 +			publisher_endpoint_announce_pt disc = NULL;
 +			bundleContext_getService(manager->context, disc_sr, (void**) &disc);
- 			disc->interestedInTopic(disc->handle, sub->scope, sub->topic);
++			disc->interestedInTopic(disc->handle, properties_get(sub->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(sub->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +			bundleContext_ungetService(manager->context, disc_sr, NULL);
 +		}
 +		hashMapIterator_destroy(iter);
 +		celixThreadMutex_unlock(&manager->discoveryListLock);
 +
 +		celixThreadMutex_unlock(&manager->psaListLock);
 +	}
 +	else{
 +		status=CELIX_INVALID_BUNDLE_CONTEXT;
 +	}
 +
 +	return status;
 +}
 +
 +celix_status_t pubsub_topologyManager_subscriberModified(void * handle, service_reference_pt reference, void * service) {
 +	celix_status_t status = CELIX_SUCCESS;
 +
 +	// Nop...
 +
 +	return status;
 +}
 +
 +celix_status_t pubsub_topologyManager_subscriberRemoved(void * handle, service_reference_pt reference, void * service) {
 +	celix_status_t status = CELIX_SUCCESS;
 +	pubsub_topology_manager_pt manager = handle;
 +
 +	pubsub_endpoint_pt subcmp = NULL;
 +	if(pubsubEndpoint_createFromServiceReference(reference,&subcmp,false) == CELIX_SUCCESS){
 +
 +		int j,k;
 +
 +		// Inform discoveries that we are no longer interested in the topic
 +		celixThreadMutex_lock(&manager->discoveryListLock);
 +		hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
 +		while(hashMapIterator_hasNext(iter)){
 +			service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter);
 +			publisher_endpoint_announce_pt disc = NULL;
 +			bundleContext_getService(manager->context, disc_sr, (void**) &disc);
- 			disc->uninterestedInTopic(disc->handle, subcmp->scope, subcmp->topic);
++			disc->uninterestedInTopic(disc->handle, properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +			bundleContext_ungetService(manager->context, disc_sr, NULL);
 +		}
 +		hashMapIterator_destroy(iter);
 +		celixThreadMutex_unlock(&manager->discoveryListLock);
 +
 +		celixThreadMutex_lock(&manager->subscriptionsLock);
 +		celixThreadMutex_lock(&manager->psaListLock);
 +
- 		char *sub_key = createScopeTopicKey(subcmp->scope,subcmp->topic);
++		char *sub_key = createScopeTopicKey(properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_SCOPE),properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +		array_list_pt sub_list_by_topic = hashMap_get(manager->subscriptions,sub_key);
 +		free(sub_key);
 +		if(sub_list_by_topic!=NULL){
 +			for(j=0;j<arrayList_size(sub_list_by_topic);j++){
 +				pubsub_endpoint_pt sub = arrayList_get(sub_list_by_topic,j);
 +				if(pubsubEndpoint_equals(sub,subcmp)){
 +					for(k=0;k<arrayList_size(manager->psaList);k++){
 +						/* No problem with invoking removal on all PSAs; only the one that manages this topic will do something */
 +						pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
 +						psa->removeSubscription(psa->admin,sub);
 +					}
 +
 +				}
 +				arrayList_remove(sub_list_by_topic,j);
 +
 +				/* If it was the last subscriber for this topic, tell PSA to close the ZMQ socket */
 +				if(arrayList_size(sub_list_by_topic)==0){
 +					for(k=0;k<arrayList_size(manager->psaList);k++){
 +						pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
- 						psa->closeAllSubscriptions(psa->admin,sub->scope, sub->topic);
++						psa->closeAllSubscriptions(psa->admin, (char*) properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_SCOPE), (char*) properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +					}
 +				}
 +
 +				pubsubEndpoint_destroy(sub);
 +
 +			}
 +		}
 +
 +		celixThreadMutex_unlock(&manager->psaListLock);
 +		celixThreadMutex_unlock(&manager->subscriptionsLock);
 +
 +		pubsubEndpoint_destroy(subcmp);
 +
 +	}
 +	else{
 +		status=CELIX_INVALID_BUNDLE_CONTEXT;
 +	}
 +
 +	return status;
 +
 +}
 +
 +celix_status_t pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, service_reference_pt reference, void* service) {
 +	celix_status_t status = CELIX_SUCCESS;
 +	pubsub_topology_manager_pt manager = (pubsub_topology_manager_pt)handle;
 +	publisher_endpoint_announce_pt disc = (publisher_endpoint_announce_pt)service;
 +
 +	const char* fwUUID = NULL;
 +
 +	bundleContext_getProperty(manager->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
 +	if(fwUUID==NULL){
 +		printf("PSD: ERRROR: Cannot retrieve fwUUID.\n");
 +		return CELIX_INVALID_BUNDLE_CONTEXT;
 +	}
 +
 +	celixThreadMutex_lock(&manager->publicationsLock);
 +
 +	celixThreadMutex_lock(&manager->discoveryListLock);
 +	hashMap_put(manager->discoveryList, reference, NULL);
 +	celixThreadMutex_unlock(&manager->discoveryListLock);
 +
 +	hash_map_iterator_pt iter = hashMapIterator_create(manager->publications);
 +	while(hashMapIterator_hasNext(iter)){
 +		array_list_pt pubEP_list = (array_list_pt)hashMapIterator_nextValue(iter);
 +		for(int i = 0; i < arrayList_size(pubEP_list); i++) {
 +			pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubEP_list,i);
- 			if( (strcmp(pubEP->frameworkUUID,fwUUID)==0) && (pubEP->endpoint!=NULL)){
++			if( (strcmp(properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),fwUUID)==0) && (properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)!=NULL)){
 +				status += disc->announcePublisher(disc->handle,pubEP);
 +			}
 +		}
 +	}
 +	hashMapIterator_destroy(iter);
 +
 +	celixThreadMutex_unlock(&manager->publicationsLock);
 +
 +	celixThreadMutex_lock(&manager->subscriptionsLock);
 +	iter = hashMapIterator_create(manager->subscriptions);
 +
 +	while(hashMapIterator_hasNext(iter)) {
 +		array_list_pt l = (array_list_pt)hashMapIterator_nextValue(iter);
 +		int i;
 +		for(i=0;i<arrayList_size(l);i++){
 +			pubsub_endpoint_pt subEp = (pubsub_endpoint_pt)arrayList_get(l,i);
 +
- 			disc->interestedInTopic(disc->handle, subEp->scope, subEp->topic);
++			disc->interestedInTopic(disc->handle, properties_get(subEp->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(subEp->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +		}
 +	}
 +	hashMapIterator_destroy(iter);
 +	celixThreadMutex_unlock(&manager->subscriptionsLock);
 +
 +	return status;
 +}
 +
 +celix_status_t pubsub_topologyManager_pubsubDiscoveryModified(void * handle, service_reference_pt reference, void * service) {
 +	celix_status_t status = CELIX_SUCCESS;
 +
 +	status = pubsub_topologyManager_pubsubDiscoveryRemoved(handle, reference, service);
 +	if (status == CELIX_SUCCESS) {
 +		status = pubsub_topologyManager_pubsubDiscoveryAdded(handle, reference, service);
 +	}
 +
 +	return status;
 +}
 +
 +celix_status_t pubsub_topologyManager_pubsubDiscoveryRemoved(void * handle, service_reference_pt reference, void * service) {
 +	celix_status_t status = CELIX_SUCCESS;
 +
 +	pubsub_topology_manager_pt manager = handle;
 +
 +	celixThreadMutex_lock(&manager->discoveryListLock);
 +
 +
 +	if (hashMap_remove(manager->discoveryList, reference)) {
 +		logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "EndpointListener Removed");
 +	}
 +
 +	celixThreadMutex_unlock(&manager->discoveryListLock);
 +
 +	return status;
 +}
 +
 +
 +celix_status_t pubsub_topologyManager_publisherTrackerAdded(void *handle, array_list_pt listeners) {
 +
 +	celix_status_t status = CELIX_SUCCESS;
 +	pubsub_topology_manager_pt manager = handle;
 +
 +	int l_index;
 +
 +	for (l_index = 0; l_index < arrayList_size(listeners); l_index++) {
 +
 +		listener_hook_info_pt info = arrayList_get(listeners, l_index);
 +
 +		pubsub_endpoint_pt pub = NULL;
 +		if(pubsubEndpoint_createFromListenerHookInfo(info, &pub, true) == CELIX_SUCCESS){
 +
 +			celixThreadMutex_lock(&manager->publicationsLock);
- 			char *pub_key = createScopeTopicKey(pub->scope, pub->topic);
++			char *pub_key = createScopeTopicKey(properties_get(pub->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(pub->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +			array_list_pt pub_list_by_topic = hashMap_get(manager->publications, pub_key);
 +			if(pub_list_by_topic==NULL){
 +				arrayList_create(&pub_list_by_topic);
 +				hashMap_put(manager->publications,strdup(pub_key),pub_list_by_topic);
 +			}
 +			free(pub_key);
 +			arrayList_add(pub_list_by_topic,pub);
 +
 +			celixThreadMutex_unlock(&manager->publicationsLock);
 +
 +			int j;
 +			double score = 0;
 +			double best_score = 0;
 +			pubsub_admin_service_pt best_psa = NULL;
 +			celixThreadMutex_lock(&manager->psaListLock);
 +
 +			for(j=0;j<arrayList_size(manager->psaList);j++){
 +				pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j);
 +				psa->matchEndpoint(psa->admin,pub,&score);
 +				if(score>best_score){ /* We have a new winner! */
 +					best_score = score;
 +					best_psa = psa;
 +				}
 +			}
 +
 +			if(best_psa != NULL && best_score>0){
 +				status = best_psa->addPublication(best_psa->admin,pub);
 +				if(status==CELIX_SUCCESS){
 +					celixThreadMutex_lock(&manager->discoveryListLock);
 +					hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
 +					while(hashMapIterator_hasNext(iter)){
 +						service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter);
 +						publisher_endpoint_announce_pt disc = NULL;
 +						bundleContext_getService(manager->context, disc_sr, (void**) &disc);
 +						disc->announcePublisher(disc->handle,pub);
 +						bundleContext_ungetService(manager->context, disc_sr, NULL);
 +					}
 +					hashMapIterator_destroy(iter);
 +					celixThreadMutex_unlock(&manager->discoveryListLock);
 +				}
 +			}
 +
 +			celixThreadMutex_unlock(&manager->psaListLock);
 +
 +		}
 +
 +	}
 +
 +	return status;
 +
 +}
 +
 +
 +celix_status_t pubsub_topologyManager_publisherTrackerRemoved(void *handle, array_list_pt listeners) {
 +	celix_status_t status = CELIX_SUCCESS;
 +	pubsub_topology_manager_pt manager = handle;
 +
 +	int l_index;
 +
 +	for (l_index = 0; l_index < arrayList_size(listeners); l_index++) {
 +
 +		listener_hook_info_pt info = arrayList_get(listeners, l_index);
 +
 +		pubsub_endpoint_pt pubcmp = NULL;
 +		if(pubsubEndpoint_createFromListenerHookInfo(info,&pubcmp,true) == CELIX_SUCCESS){
 +
 +
 +			int j,k;
 +			celixThreadMutex_lock(&manager->psaListLock);
 +			celixThreadMutex_lock(&manager->publicationsLock);
 +
- 			char *pub_key = createScopeTopicKey(pubcmp->scope, pubcmp->topic);
++			char *pub_key = createScopeTopicKey(properties_get(pubcmp->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(pubcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +			array_list_pt pub_list_by_topic = hashMap_get(manager->publications,pub_key);
 +			if(pub_list_by_topic!=NULL){
 +				for(j=0;j<arrayList_size(pub_list_by_topic);j++){
 +					pubsub_endpoint_pt pub = arrayList_get(pub_list_by_topic,j);
 +					if(pubsubEndpoint_equals(pub,pubcmp)){
 +						for(k=0;k<arrayList_size(manager->psaList);k++){
 +							pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
 +							status = psa->removePublication(psa->admin,pub);
 +							if(status==CELIX_SUCCESS){ /* We found the one that manages this endpoint */
 +								celixThreadMutex_lock(&manager->discoveryListLock);
 +								hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
 +								while(hashMapIterator_hasNext(iter)){
 +									service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter);
 +									publisher_endpoint_announce_pt disc = NULL;
 +									bundleContext_getService(manager->context, disc_sr, (void**) &disc);
 +									disc->removePublisher(disc->handle,pub);
 +									bundleContext_ungetService(manager->context, disc_sr, NULL);
 +								}
 +								hashMapIterator_destroy(iter);
 +								celixThreadMutex_unlock(&manager->discoveryListLock);
 +							}
 +							else if(status ==  CELIX_ILLEGAL_ARGUMENT){ /* Not a real error, just saying this psa does not handle this endpoint */
 +								status = CELIX_SUCCESS;
 +							}
 +						}
 +						//}
 +						arrayList_remove(pub_list_by_topic,j);
 +
 +						/* If it was the last publisher for this topic, tell PSA to close the ZMQ socket and then inform the discovery */
 +						if(arrayList_size(pub_list_by_topic)==0){
 +							for(k=0;k<arrayList_size(manager->psaList);k++){
 +								pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
- 								psa->closeAllPublications(psa->admin,pub->scope, pub->topic);
++								psa->closeAllPublications(psa->admin, (char*) properties_get(pub->endpoint_props, PUBSUB_ENDPOINT_SCOPE), (char*) properties_get(pub->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +							}
 +						}
 +
 +						pubsubEndpoint_destroy(pub);
 +					}
 +
 +				}
 +			}
 +
 +			celixThreadMutex_unlock(&manager->publicationsLock);
 +			celixThreadMutex_unlock(&manager->psaListLock);
 +
 +			free(pub_key);
 +
 +			pubsubEndpoint_destroy(pubcmp);
 +
 +		}
 +
 +	}
 +
 +	return status;
 +}
 +
 +celix_status_t pubsub_topologyManager_announcePublisher(void *handle, pubsub_endpoint_pt pubEP){
 +	celix_status_t status = CELIX_SUCCESS;
- 	printf("PSTM: New publisher discovered for topic %s [fwUUID=%s, ep=%s]\n",pubEP->topic,pubEP->frameworkUUID,pubEP->endpoint);
++	printf("PSTM: New publisher discovered for topic %s [fwUUID=%s, ep=%s]\n",
++		   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),
++		   properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),
++		   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
 +
 +	pubsub_topology_manager_pt manager = handle;
 +	celixThreadMutex_lock(&manager->psaListLock);
 +	celixThreadMutex_lock(&manager->publicationsLock);
 +
- 	char *pub_key = createScopeTopicKey(pubEP->scope, pubEP->topic);
++	char *pub_key = createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +
 +	array_list_pt pub_list_by_topic = hashMap_get(manager->publications,pub_key);
 +	if(pub_list_by_topic==NULL){
 +		arrayList_create(&pub_list_by_topic);
 +		hashMap_put(manager->publications,strdup(pub_key),pub_list_by_topic);
 +	}
 +	free(pub_key);
 +
 +	/* There shouldn't be any duplicates here, since they are filtered out by the discovery */
 +	pubsub_endpoint_pt p = NULL;
 +	pubsubEndpoint_clone(pubEP, &p);
 +	arrayList_add(pub_list_by_topic,p);
 +
 +	int j;
 +	double score = 0;
 +	double best_score = 0;
 +	pubsub_admin_service_pt best_psa = NULL;
 +
 +	for(j=0;j<arrayList_size(manager->psaList);j++){
 +		pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j);
 +		psa->matchEndpoint(psa->admin,p,&score);
 +		if(score>best_score){ /* We have a new winner! */
 +			best_score = score;
 +			best_psa = psa;
 +		}
 +	}
 +
 +	if(best_psa != NULL && best_score>0){
 +		best_psa->addPublication(best_psa->admin,p);
 +	}
 +	else{
 +		status = CELIX_ILLEGAL_STATE;
 +	}
 +
 +	celixThreadMutex_unlock(&manager->publicationsLock);
 +	celixThreadMutex_unlock(&manager->psaListLock);
 +
 +	return status;
 +}
 +
 +celix_status_t pubsub_topologyManager_removePublisher(void *handle, pubsub_endpoint_pt pubEP){
 +	celix_status_t status = CELIX_SUCCESS;
- 	printf("PSTM: Publisher removed for topic %s [fwUUID=%s, ep=%s]\n",pubEP->topic,pubEP->frameworkUUID,pubEP->endpoint);
++	printf("PSTM: Publisher removed for topic %s [fwUUID=%s, ep=%s]\n",
++		   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),
++		   properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),
++		   properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
 +
 +	pubsub_topology_manager_pt manager = handle;
 +	celixThreadMutex_lock(&manager->psaListLock);
 +	celixThreadMutex_lock(&manager->publicationsLock);
 +	int i;
 +
- 	char *pub_key = createScopeTopicKey(pubEP->scope, pubEP->topic);
++	char *pub_key = createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +	array_list_pt pub_list_by_topic = hashMap_get(manager->publications,pub_key);
 +	if(pub_list_by_topic==NULL){
- 		printf("PSTM: ERROR: Cannot find topic for known endpoint [%s,%s,%s]. Something is inconsistent.\n",pub_key,pubEP->frameworkUUID,pubEP->endpoint);
++		printf("PSTM: ERROR: Cannot find topic for known endpoint [%s,%s,%s]. Something is inconsistent.\n",pub_key,properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
 +		status = CELIX_ILLEGAL_STATE;
 +	}
 +	else{
 +
 +		pubsub_endpoint_pt p = NULL;
 +		bool found = false;
 +
 +		for(i=0;!found && i<arrayList_size(pub_list_by_topic);i++){
 +			p = (pubsub_endpoint_pt)arrayList_get(pub_list_by_topic,i);
 +			found = pubsubEndpoint_equals(p,pubEP);
 +		}
 +
 +		if(found && p !=NULL){
 +
 +			for(i=0;i<arrayList_size(manager->psaList);i++){
 +				pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,i);
 +				/* No problem with invoking removal on all PSAs; only the one that manages this topic will do something */
 +				psa->removePublication(psa->admin,p);
 +			}
 +
 +			arrayList_removeElement(pub_list_by_topic,p);
 +
 +			/* If it was the last publisher for this topic, tell PSA to close the ZMQ socket */
 +			if(arrayList_size(pub_list_by_topic)==0){
 +
 +				for(i=0;i<arrayList_size(manager->psaList);i++){
 +					pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,i);
- 					psa->closeAllPublications(psa->admin,p->scope, p->topic);
++					psa->closeAllPublications(psa->admin, (char*) properties_get(p->endpoint_props, PUBSUB_ENDPOINT_SCOPE), (char*) properties_get(p->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +				}
 +			}
 +
 +			pubsubEndpoint_destroy(p);
 +		}
 +
 +
 +	}
 +	free(pub_key);
 +	celixThreadMutex_unlock(&manager->publicationsLock);
 +	celixThreadMutex_unlock(&manager->psaListLock);
 +
 +
 +	return status;
 +}
 +

http://git-wip-us.apache.org/repos/asf/celix/blob/0a5ef69a/utils/include/properties.h
----------------------------------------------------------------------
diff --cc utils/include/properties.h
index 5c6dc4d,0000000..582a242
mode 100644,000000..100644
--- a/utils/include/properties.h
+++ b/utils/include/properties.h
@@@ -1,68 -1,0 +1,70 @@@
 +/**
 + *Licensed to the Apache Software Foundation (ASF) under one
 + *or more contributor license agreements.  See the NOTICE file
 + *distributed with this work for additional information
 + *regarding copyright ownership.  The ASF licenses this file
 + *to you under the Apache License, Version 2.0 (the
 + *"License"); you may not use this file except in compliance
 + *with the License.  You may obtain a copy of the License at
 + *
 + *  http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *Unless required by applicable law or agreed to in writing,
 + *software distributed under the License is distributed on an
 + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + *specific language governing permissions and limitations
 + *under the License.
 + */
 +/*
 + * properties.h
 + *
 + *  \date       Apr 27, 2010
 + *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
 + *  \copyright	Apache License, Version 2.0
 + */
 +
 +#ifndef PROPERTIES_H_
 +#define PROPERTIES_H_
 +
 +#include <stdio.h>
 +
 +#include "hash_map.h"
 +#include "exports.h"
 +#include "celix_errno.h"
 +#ifdef __cplusplus
 +extern "C" {
 +#endif
 +typedef hash_map_pt properties_pt;
 +typedef hash_map_t properties_t;
 +
 +UTILS_EXPORT properties_pt properties_create(void);
 +
 +UTILS_EXPORT void properties_destroy(properties_pt properties);
 +
 +UTILS_EXPORT properties_pt properties_load(const char *filename);
 +
 +UTILS_EXPORT properties_pt properties_loadWithStream(FILE *stream);
 +
 +UTILS_EXPORT properties_pt properties_loadFromString(const char *input);
 +
 +UTILS_EXPORT void properties_store(properties_pt properties, const char *file, const char *header);
 +
 +UTILS_EXPORT const char *properties_get(properties_pt properties, const char *key);
 +
 +UTILS_EXPORT const char *properties_getWithDefault(properties_pt properties, const char *key, const char *defaultValue);
 +
 +UTILS_EXPORT void properties_set(properties_pt properties, const char *key, const char *value);
 +
++UTILS_EXPORT void properties_unset(properties_pt properties, const char *key);
++
 +UTILS_EXPORT celix_status_t properties_copy(properties_pt properties, properties_pt *copy);
 +
 +#define PROPERTIES_FOR_EACH(props, key) \
 +    for(hash_map_iterator_t iter = hashMapIterator_construct(props); \
 +        hashMapIterator_hasNext(&iter), (key) = (const char*)hashMapIterator_nextKey(&iter);)
 +#ifdef __cplusplus
 +}
 +#endif
 +
 +#endif /* PROPERTIES_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/0a5ef69a/utils/src/properties.c
----------------------------------------------------------------------
diff --cc utils/src/properties.c
index 1e097a0,0000000..860b9bb
mode 100644,000000..100644
--- a/utils/src/properties.c
+++ b/utils/src/properties.c
@@@ -1,330 -1,0 +1,335 @@@
 +/**
 + *Licensed to the Apache Software Foundation (ASF) under one
 + *or more contributor license agreements.  See the NOTICE file
 + *distributed with this work for additional information
 + *regarding copyright ownership.  The ASF licenses this file
 + *to you under the Apache License, Version 2.0 (the
 + *"License"); you may not use this file except in compliance
 + *with the License.  You may obtain a copy of the License at
 + *
 + *  http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *Unless required by applicable law or agreed to in writing,
 + *software distributed under the License is distributed on an
 + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + *specific language governing permissions and limitations
 + *under the License.
 + */
 +/*
 + * properties.c
 + *
 + *  \date       Apr 27, 2010
 + *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
 + *  \copyright	Apache License, Version 2.0
 + */
 +#include <stdio.h>
 +#include <string.h>
 +#include <stdlib.h>
 +#include <ctype.h>
 +#include "celixbool.h"
 +#include "properties.h"
 +#include "utils.h"
 +
 +#define MALLOC_BLOCK_SIZE		5
 +
 +static void parseLine(const char* line, properties_pt props);
 +
 +/**
 + * Creates a properties set: a hash map configured with the string hash and
 + * string equality helpers from utils.
 + */
 +properties_pt properties_create(void) {
 +	properties_pt created = hashMap_create(utils_stringHash, utils_stringHash, utils_stringEquals, utils_stringEquals);
 +	return created;
 +}
 +
 +/**
 + * Releases a properties set. The map is destroyed without letting it free
 + * keys or values, because both are freed here entry by entry first.
 + */
 +void properties_destroy(properties_pt properties) {
 +	hash_map_iterator_pt it = hashMapIterator_create(properties);
 +	for (; hashMapIterator_hasNext(it); ) {
 +		hash_map_entry_pt current = hashMapIterator_nextEntry(it);
 +		void *storedKey = hashMapEntry_getKey(current);
 +		void *storedValue = hashMapEntry_getValue(current);
 +		free(storedKey);
 +		free(storedValue);
 +	}
 +	hashMapIterator_destroy(it);
 +	hashMap_destroy(properties, false, false);
 +}
 +
 +/**
 + * Loads properties from the file with the given name.
 + * Returns NULL when the file cannot be opened for reading.
 + */
 +properties_pt properties_load(const char* filename) {
 +	properties_pt loaded = NULL;
 +	FILE *stream = fopen(filename, "r");
 +	if (stream != NULL) {
 +		loaded = properties_loadWithStream(stream);
 +		fclose(stream);
 +	}
 +	return loaded;
 +}
 +
 +/**
 + * Reads the complete stream into memory and parses it line by line.
 + *
 + * Returns NULL when file is NULL. Otherwise a properties set is returned,
 + * which stays empty when the stream is empty, when its size cannot be
 + * determined (fseek/ftell failure, e.g. a non-seekable stream) or when the
 + * read buffer cannot be allocated. The stream is not closed.
 + */
 +properties_pt properties_loadWithStream(FILE *file) {
 +	if (file == NULL) {
 +		return NULL;
 +	}
 +
 +	properties_pt props = properties_create();
 +
 +	//ftell reports failure as -1L; keep it signed so the error is not turned into a huge size
 +	long file_size = -1L;
 +	if (fseek(file, 0, SEEK_END) == 0) {
 +		file_size = ftell(file);
 +	}
 +	int rewindStatus = fseek(file, 0, SEEK_SET);
 +	if (file_size <= 0 || rewindStatus != 0) {
 +		return props;
 +	}
 +
 +	size_t size = (size_t) file_size;
 +	char *filebuffer = calloc(size + 1, sizeof(char));
 +	if (filebuffer == NULL) {
 +		return props;
 +	}
 +
 +	size_t rs = fread(filebuffer, sizeof(char), size, file);
 +	if (rs != size) {
 +		fprintf(stderr,"fread read only %zu bytes out of %zu\n",rs,size);
 +	}
 +	//terminate after the bytes that were actually read
 +	filebuffer[rs] = '\0';
 +
 +	char *saveptr = NULL;
 +	char *line = strtok_r(filebuffer, "\n", &saveptr);
 +	while (line != NULL) {
 +		parseLine(line, props);
 +		line = strtok_r(NULL, "\n", &saveptr);
 +	}
 +	free(filebuffer);
 +
 +	return props;
 +}
 +
 +/**
 + * Parses the newline separated lines of input into a new properties set.
 + *
 + * The input is tokenized on a private copy, so the caller's string is not
 + * modified. Returns NULL when input is NULL or the copy cannot be allocated.
 + */
 +properties_pt properties_loadFromString(const char *input){
 +	if (input == NULL) {
 +		return NULL;
 +	}
 +
 +	//strtok_r writes into its argument, hence the working copy
 +	char *in = strdup(input);
 +	if (in == NULL) {
 +		return NULL;
 +	}
 +
 +	properties_pt props = properties_create();
 +	char *saveLinePointer = NULL;
 +
 +	for (char *line = strtok_r(in, "\n", &saveLinePointer);
 +			line != NULL;
 +			line = strtok_r(NULL, "\n", &saveLinePointer)) {
 +		parseLine(line, props);
 +	}
 +
 +	free(in);
 +
 +	return props;
 +}
 +
 +
 +/**
 + * Writes str to file, putting a backslash in front of every '#', '!', '='
 + * and ':' so these characters are not taken as comment markers or
 + * separators when the line is parsed again.
 + */
 +static void properties_writeEscaped(FILE *file, const char *str) {
 +	for (const char *c = str; *c != '\0'; c += 1) {
 +		if (*c == '#' || *c == '!' || *c == '=' || *c == ':') {
 +			fputc('\\', file);
 +		}
 +		fputc(*c, file);
 +	}
 +}
 +
 +/**
 + * Stores all entries of properties in the file with the given name, one
 + * "key=value" line per entry. An existing file is overwritten.
 + *
 + * Header is ignored for now, cannot handle comments yet
 + */
 +void properties_store(properties_pt properties, const char* filename, const char* header) {
 +	(void) header;
 +
 +	FILE *file = fopen ( filename, "w+" );
 +	if (file == NULL) {
 +		perror("File is null");
 +		return;
 +	}
 +
 +	if (hashMap_size(properties) > 0) {
 +		hash_map_iterator_pt iterator = hashMapIterator_create(properties);
 +		while (hashMapIterator_hasNext(iterator)) {
 +			hash_map_entry_pt entry = hashMapIterator_nextEntry(iterator);
 +
 +			properties_writeEscaped(file, hashMapEntry_getKey(entry));
 +			fputc('=', file);
 +			properties_writeEscaped(file, hashMapEntry_getValue(entry));
 +			fputc('\n', file);
 +		}
 +		hashMapIterator_destroy(iterator);
 +	}
 +	fclose(file);
 +}
 +
 +/**
 + * Duplicates a properties set. Every key and value is copied through
 + * properties_set, so the copy does not share strings with the original.
 + *
 + * On success the new set is stored in *out and CELIX_SUCCESS is returned;
 + * when the new set cannot be created CELIX_ENOMEM is returned and *out is
 + * left untouched.
 + */
 +celix_status_t properties_copy(properties_pt properties, properties_pt *out) {
 +	properties_pt duplicate = properties_create();
 +	if (duplicate == NULL) {
 +		return CELIX_ENOMEM;
 +	}
 +
 +	hash_map_iterator_pt it = hashMapIterator_create(properties);
 +	for (; hashMapIterator_hasNext(it); ) {
 +		hash_map_entry_pt current = hashMapIterator_nextEntry(it);
 +		char *entryKey = hashMapEntry_getKey(current);
 +		char *entryValue = hashMapEntry_getValue(current);
 +		properties_set(duplicate, entryKey, entryValue);
 +	}
 +	hashMapIterator_destroy(it);
 +
 +	*out = duplicate;
 +	return CELIX_SUCCESS;
 +}
 +
 +/**
 + * Returns whatever the underlying hash map has stored for key.
 + */
 +const char* properties_get(properties_pt properties, const char* key) {
 +	const char *found = hashMap_get(properties, (void*)key);
 +	return found;
 +}
 +
 +/**
 + * Returns the value stored for key, or defaultValue when properties_get
 + * yields NULL.
 + */
 +const char* properties_getWithDefault(properties_pt properties, const char* key, const char* defaultValue) {
 +	const char* stored = properties_get(properties, key);
 +	if (stored != NULL) {
 +		return stored;
 +	}
 +	return defaultValue;
 +}
 +
 +/**
 + * Stores a copy of value under key.
 + *
 + * When the key is already present its stored key string is reused and the
 + * previous value is freed; otherwise the key is copied as well. Copies are
 + * limited to MAX_COPY_LEN characters. The call is a no-op when key or value
 + * is NULL, or when a copy cannot be allocated (the set is then unchanged).
 + */
 +void properties_set(properties_pt properties, const char* key, const char* value) {
 +	enum { MAX_COPY_LEN = 1024 * 10 };
 +
 +	if (key == NULL || value == NULL) {
 +		return;
 +	}
 +
 +	//copy first: value may point at the string that is about to be replaced
 +	char* newValue = strndup(value, MAX_COPY_LEN);
 +	if (newValue == NULL) {
 +		return;
 +	}
 +
 +	hash_map_entry_pt entry = hashMap_getEntry(properties, key);
 +	if (entry != NULL) {
 +		char* oldKey = hashMapEntry_getKey(entry);
 +		char* oldValue = hashMapEntry_getValue(entry);
 +		hashMap_put(properties, oldKey, newValue);
 +		free(oldValue);
 +	} else {
 +		char* newKey = strndup(key, MAX_COPY_LEN);
 +		if (newKey == NULL) {
 +			free(newValue);
 +			return;
 +		}
 +		hashMap_put(properties, newKey, newValue);
 +	}
 +}
 +
++/**
++ * Removes the entry stored under key, if any, and frees both the stored key
++ * string and the stored value (properties_set allocated both).
++ */
++void properties_unset(properties_pt properties, const char* key) {
++	hash_map_entry_pt entry = hashMap_getEntry(properties, key);
++	if (entry != NULL) {
++		//fetch the stored key before removing: the entry is gone afterwards
++		char* oldKey = hashMapEntry_getKey(entry);
++		char* oldValue = hashMap_remove(properties, key);
++		//free only after the removal, key may be the stored key string itself
++		free(oldKey);
++		free(oldValue);
++	}
++}
++
 +/**
 + * Grows the buffer that is currently being written (the key buffer when
 + * *output equals *key, the value buffer otherwise) by MALLOC_BLOCK_SIZE bytes
 + * once outputPos has reached the last byte of that buffer, and re-points
 + * *output at the (possibly moved) buffer.
 + */
 +static void updateBuffers(char **key, char ** value, char **output, int outputPos, int *key_len, int *value_len) {
 +	const bool writingKey = (*output == *key);
 +	char **buffer = writingKey ? key : value;
 +	int *capacity = writingKey ? key_len : value_len;
 +
 +	if (outputPos != (*capacity) - 1) {
 +		return;
 +	}
 +
 +	(*capacity) += MALLOC_BLOCK_SIZE;
 +	*buffer = realloc(*buffer, *capacity);
 +	*output = *buffer;
 +}
 +
 +/**
 + * Parses one line and stores the resulting key/value pair in props.
 + *
 + * - Leading spaces and tabs are skipped.
 + * - The first unescaped '=' or ':' ends the key; everything after it is the
 + *   value (further separators are kept as part of the value).
 + * - A '#' or '!' at the start of the key or of the value marks the line as
 + *   a comment, in which case nothing is stored.
 + * - A backslash escapes '=', ':', '#' and '!'; a double backslash yields a
 + *   single backslash.
 + * - Key and value are trimmed with utils_stringTrim before being stored.
 + *
 + * Lines holding nothing but whitespace are ignored, as are lines for which
 + * the working buffers cannot be allocated.
 + */
 +static void parseLine(const char* line, properties_pt props) {
 +	int linePos = 0;
 +	int outputPos = 0;
 +	bool precedingCharIsBackslash = false;
 +	bool isComment = false;
 +	char *output = NULL; //buffer currently written: NULL (nothing yet), key or value
 +	int key_len = MALLOC_BLOCK_SIZE;
 +	int value_len = MALLOC_BLOCK_SIZE;
 +
 +	//Ignore empty lines
 +	if (line[0] == '\n' && line[1] == '\0') {
 +		return;
 +	}
 +
 +	//calloc zeroes the buffers, so both start out as empty strings
 +	char *key = calloc(1, key_len);
 +	char *value = calloc(1, value_len);
 +	if (key == NULL || value == NULL) {
 +		free(key);
 +		free(value);
 +		return;
 +	}
 +
 +	while (line[linePos] != '\0') {
 +		const char current = line[linePos];
 +
 +		if (current == ' ' || current == '\t') {
 +			if (output == NULL) {
 +				//leading whitespace, ignore
 +				linePos += 1;
 +				continue;
 +			}
 +		}
 +		else if (output == NULL) {
 +			//first significant character, start filling the key
 +			output = key;
 +		}
 +
 +		if (current == '=' || current == ':' || current == '#' || current == '!') {
 +			if (precedingCharIsBackslash) {
 +				//escaped special character
 +				output[outputPos++] = current;
 +				updateBuffers(&key, &value, &output, outputPos, &key_len, &value_len);
 +				precedingCharIsBackslash = false;
 +			}
 +			else if (current == '#' || current == '!') {
 +				if (outputPos == 0) {
 +					isComment = true;
 +					break;
 +				}
 +				output[outputPos++] = current;
 +				updateBuffers(&key, &value, &output, outputPos, &key_len, &value_len);
 +			}
 +			else if (output == value) { //= or :, but already have a separator
 +				output[outputPos++] = current;
 +				updateBuffers(&key, &value, &output, outputPos, &key_len, &value_len);
 +			}
 +			else { //first = or :, terminate the key and switch to the value
 +				output[outputPos++] = '\0';
 +				updateBuffers(&key, &value, &output, outputPos, &key_len, &value_len);
 +				output = value;
 +				outputPos = 0;
 +			}
 +		}
 +		else if (current == '\\') {
 +			if (precedingCharIsBackslash) { //double backslash -> backslash
 +				output[outputPos++] = '\\';
 +				updateBuffers(&key, &value, &output, outputPos, &key_len, &value_len);
 +			}
 +			precedingCharIsBackslash = true;
 +		}
 +		else { //normal character
 +			precedingCharIsBackslash = false;
 +			output[outputPos++] = current;
 +			updateBuffers(&key, &value, &output, outputPos, &key_len, &value_len);
 +		}
 +		linePos += 1;
 +	}
 +	if (output != NULL) {
 +		output[outputPos] = '\0';
 +	}
 +
 +	//output is still NULL when the line held nothing but whitespace
 +	if (!isComment && output != NULL) {
 +		properties_set(props, utils_stringTrim(key), utils_stringTrim(value));
 +	}
 +	free(key);
 +	free(value);
 +}


Mime
View raw message