celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pnol...@apache.org
Subject [4/8] celix git commit: CELIX-417: Refactors cmake usage of pubsub and rsa. Started with installing exported targets
Date Wed, 24 Jan 2018 18:30:34 GMT
http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_discovery/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_discovery/CMakeLists.txt b/pubsub/pubsub_discovery/CMakeLists.txt
index 0e7d6c5..f92f81c 100644
--- a/pubsub/pubsub_discovery/CMakeLists.txt
+++ b/pubsub/pubsub_discovery/CMakeLists.txt
@@ -18,7 +18,7 @@
 find_package(CURL REQUIRED)
 find_package(Jansson REQUIRED)
 
-add_bundle(org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
+add_bundle(celix_pubsub_discovery_etcd
     BUNDLE_SYMBOLICNAME "apache_celix_pubsub_discovery_etcd"
     VERSION "1.0.0"
     SOURCES
@@ -27,16 +27,15 @@ add_bundle(org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
         src/etcd_common.c
         src/etcd_watcher.c
         src/etcd_writer.c
-		${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_endpoint.c
-		${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_utils.c
 )
 
-target_include_directories(org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery PRIVATE
+target_include_directories(celix_pubsub_discovery_etcd PRIVATE
     src
-    include
 	${CURL_INCLUDE_DIR}
 	${JANSSON_INCLUDE_DIR}
-        )
+)
+
+target_link_libraries(celix_pubsub_discovery_etcd PRIVATE Celix::pubsub_spi Celix::framework Celix::etcdlib_static ${CURL_LIBRARIES} ${JANSSON_LIBRARIES})
+install_bundle(celix_pubsub_discovery_etcd)
 
-target_link_libraries(org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery PRIVATE Celix::framework Celix::etcdlib_static ${CURL_LIBRARIES} ${JANSSON_LIBRARIES})
-install_bundle(org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery)
+add_library(Celix::pubsub_discovery_etcd ALIAS celix_pubsub_discovery_etcd)

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_serializer_json/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_serializer_json/CMakeLists.txt b/pubsub/pubsub_serializer_json/CMakeLists.txt
index b86f30e..1269cad 100644
--- a/pubsub/pubsub_serializer_json/CMakeLists.txt
+++ b/pubsub/pubsub_serializer_json/CMakeLists.txt
@@ -18,22 +18,23 @@
 find_package(Jansson REQUIRED)
 
 
-add_bundle(org.apache.celix.pubsub_serializer.PubSubSerializerJson
+add_bundle(celix_pubsub_serializer_json
     BUNDLE_SYMBOLICNAME "apache_celix_pubsub_serializer_json"
     VERSION "1.0.0"
     SOURCES
 		src/ps_activator.c
 		src/pubsub_serializer_impl.c
-    	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_utils.c
 )
 
-target_include_directories(org.apache.celix.pubsub_serializer.PubSubSerializerJson PRIVATE
+target_include_directories(celix_pubsub_serializer_json PRIVATE
 	src
 	${JANSSON_INCLUDE_DIR}
 )
 
-set_target_properties(org.apache.celix.pubsub_serializer.PubSubSerializerJson PROPERTIES INSTALL_RPATH "$ORIGIN")
-target_link_libraries(org.apache.celix.pubsub_serializer.PubSubSerializerJson PRIVATE Celix::framework Celix::dfi ${JANSSON_LIBRARIES} Celix::log_helper)
+set_target_properties(celix_pubsub_serializer_json PROPERTIES INSTALL_RPATH "$ORIGIN")
+target_link_libraries(celix_pubsub_serializer_json PRIVATE Celix::pubsub_spi Celix::framework Celix::dfi ${JANSSON_LIBRARIES} Celix::log_helper)
 
-install_bundle(org.apache.celix.pubsub_serializer.PubSubSerializerJson)
+install_bundle(celix_pubsub_serializer_json)
+
+add_library(Celix::pubsub_serializer_json ALIAS celix_pubsub_serializer_json)
 

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_spi/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_spi/CMakeLists.txt b/pubsub/pubsub_spi/CMakeLists.txt
new file mode 100644
index 0000000..118dd3a
--- /dev/null
+++ b/pubsub/pubsub_spi/CMakeLists.txt
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+add_library(celix_pubsub_spi STATIC
+		src/pubsub_admin_match.c
+        src/pubsub_endpoint.c
+        src/pubsub_utils.c
+)
+target_include_directories(celix_pubsub_spi PUBLIC
+		$<BUILD_INTERFACE:${CMAKE_CURRENT_LIST_DIR}/include>
+		$<INSTALL_INTERFACE:include/celix/pubsub_spi>
+)
+target_link_libraries(celix_pubsub_spi PUBLIC Celix::framework Celix::pubsub_api)
+
+set_target_properties(celix_pubsub_spi PROPERTIES TOPIC_INFO_DESCRIPTOR ${CMAKE_CURRENT_LIST_DIR}/include/pubsub_topic_info.descriptor)
+#TODO how to make this descriptor available for imported targets? $<INSTALL_INTERFACE:include/celix/pubsub_spi/pubsub_topic_info.descriptor>
+
+add_library(Celix::pubsub_spi ALIAS celix_pubsub_spi)
+
+install(TARGETS celix_pubsub_spi EXPORT celix DESTINATION ${CMAKE_INSTALL_LIBDIR} COMPONENT pubsub)
+install(DIRECTORY include/ DESTINATION include/celix/pubsub_spi COMPONENT pubsub)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_spi/include/publisher_endpoint_announce.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_spi/include/publisher_endpoint_announce.h b/pubsub/pubsub_spi/include/publisher_endpoint_announce.h
new file mode 100644
index 0000000..bd39fc0
--- /dev/null
+++ b/pubsub/pubsub_spi/include/publisher_endpoint_announce.h
@@ -0,0 +1,36 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#ifndef PUBLISHER_ENDPOINT_ANNOUNCE_H_
+#define PUBLISHER_ENDPOINT_ANNOUNCE_H_
+
+#include "pubsub_endpoint.h"
+
+struct publisher_endpoint_announce {
+	void *handle;
+	celix_status_t (*announcePublisher)(void *handle, pubsub_endpoint_pt pubEP);
+	celix_status_t (*removePublisher)(void *handle, pubsub_endpoint_pt pubEP);
+	celix_status_t (*interestedInTopic)(void* handle, const char *scope, const char *topic);
+	celix_status_t (*uninterestedInTopic)(void* handle, const char *scope, const char *topic);
+};
+
+typedef struct publisher_endpoint_announce *publisher_endpoint_announce_pt;
+
+
+#endif /* PUBLISHER_ENDPOINT_ANNOUNCE_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_spi/include/pubsub_admin.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_spi/include/pubsub_admin.h b/pubsub/pubsub_spi/include/pubsub_admin.h
new file mode 100644
index 0000000..f24d825
--- /dev/null
+++ b/pubsub/pubsub_spi/include/pubsub_admin.h
@@ -0,0 +1,72 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_admin.h
+ *
+ *  \date       Sep 30, 2011
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#ifndef PUBSUB_ADMIN_H_
+#define PUBSUB_ADMIN_H_
+
+#include "service_reference.h"
+
+#include "pubsub_common.h"
+#include "pubsub_endpoint.h"
+
+#define PSA_IP 	"PSA_IP"
+#define PSA_ITF	"PSA_INTERFACE"
+#define PSA_MULTICAST_IP_PREFIX "PSA_MC_PREFIX"
+
+#define PUBSUB_ADMIN_TYPE_KEY	"pubsub_admin.type"
+
+typedef struct pubsub_admin *pubsub_admin_pt;
+
+struct pubsub_admin_service {
+	pubsub_admin_pt admin;
+
+	celix_status_t (*addSubscription)(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
+	celix_status_t (*removeSubscription)(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
+
+	celix_status_t (*addPublication)(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
+	celix_status_t (*removePublication)(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
+
+	celix_status_t (*closeAllPublications)(pubsub_admin_pt admin,char* scope, char* topic);
+	celix_status_t (*closeAllSubscriptions)(pubsub_admin_pt admin,char* scope, char* topic);
+
+	/* Match principle:
+	 * - A full matching pubsub_admin gives 200 points
+	 * - A full matching serializer gives 100 points
+	 * - If QoS = sample
+	 * 		- fallback pubsub_admin order of selection is: udp_mc, zmq. Points allocation is 100,75.
+	 * 		- fallback serializers order of selection is: json, void. Points allocation is 30,20.
+	 * - If QoS = control
+	 * 		- fallback pubsub_admin order of selection is: zmq,udp_mc. Points allocation is 100,75.
+	 * 		- fallback serializers order of selection is: json, void. Points allocation is 30,20.
+	 * - If nothing is specified, QoS = sample is assumed, so the same score applies, just divided by two.
+	 *
+	 */
+	celix_status_t (*matchEndpoint)(pubsub_admin_pt admin, pubsub_endpoint_pt endpoint, double* score);
+};
+
+typedef struct pubsub_admin_service *pubsub_admin_service_pt;
+
+#endif /* PUBSUB_ADMIN_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_spi/include/pubsub_admin_match.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_spi/include/pubsub_admin_match.h b/pubsub/pubsub_spi/include/pubsub_admin_match.h
new file mode 100644
index 0000000..e95ca7d
--- /dev/null
+++ b/pubsub/pubsub_spi/include/pubsub_admin_match.h
@@ -0,0 +1,40 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+
+#ifndef PUBSUB_ADMIN_MATCH_H_
+#define PUBSUB_ADMIN_MATCH_H_
+
+#include "celix_errno.h"
+#include "properties.h"
+#include "array_list.h"
+
+#include "pubsub_serializer.h"
+
+#define QOS_ATTRIBUTE_KEY	"attribute.qos"
+#define QOS_TYPE_SAMPLE		"sample"	/* A.k.a. unreliable connection */
+#define QOS_TYPE_CONTROL	"control"	/* A.k.a. reliable connection */
+
+#define PUBSUB_ADMIN_FULL_MATCH_SCORE	200.0F
+#define SERIALIZER_FULL_MATCH_SCORE		100.0F
+
+celix_status_t pubsub_admin_match(properties_pt endpoint_props, const char *pubsub_admin_type, array_list_pt serializerList, double *score);
+celix_status_t pubsub_admin_get_best_serializer(properties_pt endpoint_props, array_list_pt serializerList, pubsub_serializer_service_t **serSvc);
+
+#endif /* PUBSUB_ADMIN_MATCH_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_spi/include/pubsub_common.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_spi/include/pubsub_common.h b/pubsub/pubsub_spi/include/pubsub_common.h
new file mode 100644
index 0000000..5dfd8fd
--- /dev/null
+++ b/pubsub/pubsub_spi/include/pubsub_common.h
@@ -0,0 +1,52 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_common.h
+ *
+ *  \date       Sep 17, 2015
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#ifndef PUBSUB_COMMON_H_
+#define PUBSUB_COMMON_H_
+
+#define PUBSUB_SERIALIZER_SERVICE 		"pubsub_serializer"
+#define PUBSUB_ADMIN_SERVICE 			"pubsub_admin"
+#define PUBSUB_DISCOVERY_SERVICE		"pubsub_discovery"
+#define PUBSUB_TM_ANNOUNCE_PUBLISHER_SERVICE    "pubsub_tm_announce_publisher"
+
+#define PUBSUB_ANY_SUB_TOPIC		        "any"
+
+#define	PUBSUB_BUNDLE_ID			"bundle.id"
+
+#define MAX_SCOPE_LEN                           1024
+#define MAX_TOPIC_LEN				1024
+
+struct pubsub_msg_header{
+	char topic[MAX_TOPIC_LEN];
+	unsigned int type;
+	unsigned char major;
+	unsigned char minor;
+};
+
+typedef struct pubsub_msg_header* pubsub_msg_header_pt;
+
+
+#endif /* PUBSUB_COMMON_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_spi/include/pubsub_endpoint.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_spi/include/pubsub_endpoint.h b/pubsub/pubsub_spi/include/pubsub_endpoint.h
new file mode 100644
index 0000000..4c39d2f
--- /dev/null
+++ b/pubsub/pubsub_spi/include/pubsub_endpoint.h
@@ -0,0 +1,58 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_endpoint.h
+ *
+ *  \date       Sep 21, 2015
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#ifndef PUBSUB_ENDPOINT_H_
+#define PUBSUB_ENDPOINT_H_
+
+#include "service_reference.h"
+#include "listener_hook_service.h"
+#include "properties.h"
+
+#include "pubsub/publisher.h"
+#include "pubsub/subscriber.h"
+
+struct pubsub_endpoint {
+    char *frameworkUUID;
+    char *scope;
+    char *topic;
+    long serviceID;
+    char* endpoint;
+    bool is_secure;
+    properties_pt topic_props;
+};
+
+typedef struct pubsub_endpoint *pubsub_endpoint_pt;
+
+celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, long serviceId,const char* endpoint,properties_pt topic_props,pubsub_endpoint_pt* psEp);
+celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt reference,pubsub_endpoint_pt* psEp, bool isPublisher);
+celix_status_t pubsubEndpoint_createFromListenerHookInfo(listener_hook_info_pt info,pubsub_endpoint_pt* psEp, bool isPublisher);
+celix_status_t pubsubEndpoint_clone(pubsub_endpoint_pt in, pubsub_endpoint_pt *out);
+celix_status_t pubsubEndpoint_destroy(pubsub_endpoint_pt psEp);
+bool pubsubEndpoint_equals(pubsub_endpoint_pt psEp1,pubsub_endpoint_pt psEp2);
+
+char *createScopeTopicKey(const char* scope, const char* topic);
+
+#endif /* PUBSUB_ENDPOINT_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_spi/include/pubsub_serializer.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_spi/include/pubsub_serializer.h b/pubsub/pubsub_spi/include/pubsub_serializer.h
new file mode 100644
index 0000000..4489fa4
--- /dev/null
+++ b/pubsub/pubsub_spi/include/pubsub_serializer.h
@@ -0,0 +1,66 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_serializer.h
+ *
+ *  \date       Mar 24, 2017
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#ifndef PUBSUB_SERIALIZER_SERVICE_H_
+#define PUBSUB_SERIALIZER_SERVICE_H_
+
+#include "service_reference.h"
+#include "hash_map.h"
+
+#include "pubsub_common.h"
+
+#define PUBSUB_SERIALIZER_TYPE_KEY	"pubsub_serializer.type"
+
+/**
+ * There should be a pubsub_serializer_t
+ * per msg type (msg id) per bundle
+ *
+ * The pubsub_serializer_service can create
+ * a serializer_map per bundle. Potentially using
+ * the extender pattern.
+ */
+
+typedef struct pubsub_msg_serializer {
+	void* handle;
+	unsigned int msgId;
+	const char* msgName;
+	version_pt msgVersion;
+
+	celix_status_t (*serialize)(void* handle, const void* input, void** out, size_t* outLen);
+	celix_status_t (*deserialize)(void* handle, const void* input, size_t inputLen, void** out); //note inputLen can be 0 if predefined size is not needed
+	void (*freeMsg)(void* handle, void* msg);
+
+} pubsub_msg_serializer_t;
+
+typedef struct pubsub_serializer_service {
+	void* handle;
+
+	celix_status_t (*createSerializerMap)(void* handle, bundle_pt bundle, hash_map_pt* serializerMap);
+	celix_status_t (*destroySerializerMap)(void* handle, hash_map_pt serializerMap);
+
+} pubsub_serializer_service_t;
+
+#endif /* PUBSUB_SERIALIZER_SERVICE_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_spi/include/pubsub_topic_info.descriptor
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_spi/include/pubsub_topic_info.descriptor b/pubsub/pubsub_spi/include/pubsub_topic_info.descriptor
new file mode 100644
index 0000000..c01a2fd
--- /dev/null
+++ b/pubsub/pubsub_spi/include/pubsub_topic_info.descriptor
@@ -0,0 +1,10 @@
+:header
+type=interface
+name=pubsub_topic_info
+version=1.0.0
+:annotations
+:types
+:methods
+getParticipantsNumber(t)i=getParticipantsNumber(#am=handle;Pt#am=pre;*i)N
+getSubscribersNumber(t)i=getSubscribersNumber(#am=handle;Pt#am=pre;*i)N
+getPublishersNumber(t)i=getPublishersNumber(#am=handle;Pt#am=pre;*i)N

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_spi/include/pubsub_utils.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_spi/include/pubsub_utils.h b/pubsub/pubsub_spi/include/pubsub_utils.h
new file mode 100644
index 0000000..aff5c72
--- /dev/null
+++ b/pubsub/pubsub_spi/include/pubsub_utils.h
@@ -0,0 +1,39 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_utils.h
+ *
+ *  \date       Sep 24, 2015
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#ifndef PUBSUB_UTILS_H_
+#define PUBSUB_UTILS_H_
+
+#include "bundle_context.h"
+#include "array_list.h"
+
+char* pubsub_getScopeFromFilter(char* bundle_filter);
+char* pubsub_getTopicFromFilter(char* bundle_filter);
+char* pubsub_getKeysBundleDir(bundle_context_pt ctx);
+array_list_pt pubsub_getTopicsFromString(char* string);
+
+
+#endif /* PUBSUB_UTILS_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_spi/src/pubsub_admin_match.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_spi/src/pubsub_admin_match.c b/pubsub/pubsub_spi/src/pubsub_admin_match.c
new file mode 100644
index 0000000..2a695c1
--- /dev/null
+++ b/pubsub/pubsub_spi/src/pubsub_admin_match.c
@@ -0,0 +1,320 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+
+#include <string.h>
+#include "service_reference.h"
+
+#include "pubsub_admin.h"
+
+#include "pubsub_admin_match.h"
+
+#define KNOWN_PUBSUB_ADMIN_NUM	2
+#define KNOWN_SERIALIZER_NUM	2
+
+static char* qos_sample_pubsub_admin_prio_list[KNOWN_PUBSUB_ADMIN_NUM] = {"udp_mc","zmq"};
+static char* qos_sample_serializer_prio_list[KNOWN_SERIALIZER_NUM] = {"json","void"};
+
+static char* qos_control_pubsub_admin_prio_list[KNOWN_PUBSUB_ADMIN_NUM] = {"zmq","udp_mc"};
+static char* qos_control_serializer_prio_list[KNOWN_SERIALIZER_NUM] = {"json","void"};
+
+static double qos_pubsub_admin_score[KNOWN_PUBSUB_ADMIN_NUM] = {100.0F,75.0F};
+static double qos_serializer_score[KNOWN_SERIALIZER_NUM] = {30.0F,20.0F};
+
+static void get_serializer_type(service_reference_pt svcRef, char **serializerType);
+static void manage_service_from_reference(service_reference_pt svcRef, void **svc, bool getService);
+
+celix_status_t pubsub_admin_match(properties_pt endpoint_props, const char *pubsub_admin_type, array_list_pt serializerList, double *score){
+
+	celix_status_t status = CELIX_SUCCESS;
+	double final_score = 0;
+	int i = 0, j = 0;
+
+	const char *requested_admin_type 		= NULL;
+	const char *requested_serializer_type 	= NULL;
+	const char *requested_qos_type			= NULL;
+
+	if(endpoint_props!=NULL){
+		requested_admin_type 		= properties_get(endpoint_props,PUBSUB_ADMIN_TYPE_KEY);
+		requested_serializer_type 	= properties_get(endpoint_props,PUBSUB_SERIALIZER_TYPE_KEY);
+		requested_qos_type			= properties_get(endpoint_props,QOS_ATTRIBUTE_KEY);
+	}
+
+	/* Analyze the pubsub_admin */
+	if(requested_admin_type != NULL){ /* We got precise specification on the pubsub_admin we want */
+		if(strncmp(requested_admin_type,pubsub_admin_type,strlen(pubsub_admin_type))==0){ //Full match
+			final_score += PUBSUB_ADMIN_FULL_MATCH_SCORE;
+		}
+	}
+	else if(requested_qos_type != NULL){ /* We got QoS specification that will determine the selected PSA */
+		if(strncmp(requested_qos_type,QOS_TYPE_SAMPLE,strlen(QOS_TYPE_SAMPLE))==0){
+			for(i=0;i<KNOWN_PUBSUB_ADMIN_NUM;i++){
+				if(strncmp(qos_sample_pubsub_admin_prio_list[i],pubsub_admin_type,strlen(pubsub_admin_type))==0){
+					final_score += qos_pubsub_admin_score[i];
+					break;
+				}
+			}
+		}
+		else if(strncmp(requested_qos_type,QOS_TYPE_CONTROL,strlen(QOS_TYPE_CONTROL))==0){
+			for(i=0;i<KNOWN_PUBSUB_ADMIN_NUM;i++){
+				if(strncmp(qos_control_pubsub_admin_prio_list[i],pubsub_admin_type,strlen(pubsub_admin_type))==0){
+					final_score += qos_pubsub_admin_score[i];
+					break;
+				}
+			}
+		}
+		else{
+			printf("Unknown QoS type '%s'\n",requested_qos_type);
+			status = CELIX_ILLEGAL_ARGUMENT;
+		}
+	}
+	else{ /* We got no specification: fallback to Qos=Sample, but count half the score */
+		for(i=0;i<KNOWN_PUBSUB_ADMIN_NUM;i++){
+			if(strncmp(qos_sample_pubsub_admin_prio_list[i],pubsub_admin_type,strlen(pubsub_admin_type))==0){
+				final_score += (qos_pubsub_admin_score[i]/2);
+				break;
+			}
+		}
+	}
+
+	char *serializer_type = NULL;
+	/* Analyze the serializers */
+	if(requested_serializer_type != NULL){ /* We got precise specification on the serializer we want */
+		for(i=0;i<arrayList_size(serializerList);i++){
+			service_reference_pt svcRef = (service_reference_pt)arrayList_get(serializerList,i);
+			get_serializer_type(svcRef, &serializer_type);
+			if(serializer_type != NULL){
+				if(strncmp(requested_serializer_type,serializer_type,strlen(serializer_type))==0){
+					final_score += SERIALIZER_FULL_MATCH_SCORE;
+					break;
+				}
+			}
+		}
+	}
+	else if(requested_qos_type != NULL){ /* We got QoS specification that will determine the selected serializer */
+		if(strncmp(requested_qos_type,QOS_TYPE_SAMPLE,strlen(QOS_TYPE_SAMPLE))==0){
+			bool ser_found = false;
+			for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){
+				for(j=0;j<arrayList_size(serializerList) && !ser_found;j++){
+					service_reference_pt svcRef = (service_reference_pt)arrayList_get(serializerList,j);
+					get_serializer_type(svcRef, &serializer_type);
+					if(serializer_type != NULL){
+						if(strncmp(qos_sample_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){
+							ser_found = true;
+						}
+					}
+				}
+				if(ser_found){
+					final_score += qos_serializer_score[i];
+				}
+			}
+		}
+		else if(strncmp(requested_qos_type,QOS_TYPE_CONTROL,strlen(QOS_TYPE_CONTROL))==0){
+			bool ser_found = false;
+			for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){
+				for(j=0;j<arrayList_size(serializerList) && !ser_found;j++){
+					service_reference_pt svcRef = (service_reference_pt)arrayList_get(serializerList,j);
+					get_serializer_type(svcRef, &serializer_type);
+					if(serializer_type != NULL){
+						if(strncmp(qos_control_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){
+							ser_found = true;
+						}
+					}
+				}
+				if(ser_found){
+					final_score += qos_serializer_score[i];
+				}
+			}
+		}
+		else{
+			printf("Unknown QoS type '%s'\n",requested_qos_type);
+			status = CELIX_ILLEGAL_ARGUMENT;
+		}
+	}
+	else{ /* We got no specification: fallback to Qos=Sample, but count half the score */
+		bool ser_found = false;
+		for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){
+			for(j=0;j<arrayList_size(serializerList) && !ser_found;j++){
+				service_reference_pt svcRef = (service_reference_pt)arrayList_get(serializerList,j);
+				get_serializer_type(svcRef, &serializer_type);
+				if(serializer_type != NULL){
+					if(strncmp(qos_sample_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){
+						ser_found = true;
+					}
+				}
+			}
+			if(ser_found){
+				final_score += (qos_serializer_score[i]/2);
+			}
+		}
+	}
+
+	*score = final_score;
+
+	printf("Score for pair <%s,%s> = %f\n",pubsub_admin_type,serializer_type,final_score);
+
+	return status;
+}
+
+celix_status_t pubsub_admin_get_best_serializer(properties_pt endpoint_props, array_list_pt serializerList, pubsub_serializer_service_t **serSvc){
+	celix_status_t status = CELIX_SUCCESS;
+
+	int i = 0, j = 0;
+
+	const char *requested_serializer_type = NULL;
+	const char *requested_qos_type = NULL;
+
+	if (endpoint_props != NULL){
+		requested_serializer_type = properties_get(endpoint_props,PUBSUB_SERIALIZER_TYPE_KEY);
+		requested_qos_type = properties_get(endpoint_props,QOS_ATTRIBUTE_KEY);
+	}
+
+	service_reference_pt svcRef = NULL;
+	void *svc = NULL;
+
+	/* Analyze the serializers */
+	if(requested_serializer_type != NULL){ /* We got precise specification on the serializer we want */
+		for(i=0;i<arrayList_size(serializerList);i++){
+			svcRef = (service_reference_pt)arrayList_get(serializerList,i);
+			char *serializer_type = NULL;
+			get_serializer_type(svcRef, &serializer_type);
+			if(serializer_type != NULL){
+				if(strncmp(requested_serializer_type,serializer_type,strlen(serializer_type))==0){
+					manage_service_from_reference(svcRef, &svc,true);
+					if(svc==NULL){
+						printf("Cannot get pubsub_serializer_service from serviceReference %p\n",svcRef);
+						status = CELIX_SERVICE_EXCEPTION;
+					}
+					*serSvc = svc;
+					break;
+				}
+			}
+		}
+	}
+	else if(requested_qos_type != NULL){ /* We got QoS specification that will determine the selected serializer */
+		if(strncmp(requested_qos_type,QOS_TYPE_SAMPLE,strlen(QOS_TYPE_SAMPLE))==0){
+			bool ser_found = false;
+			for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){
+				for(j=0;j<arrayList_size(serializerList) && !ser_found;j++){
+					svcRef = (service_reference_pt)arrayList_get(serializerList,j);
+					char *serializer_type = NULL;
+					get_serializer_type(svcRef, &serializer_type);
+					if(serializer_type != NULL){
+						if(strncmp(qos_sample_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){
+							manage_service_from_reference(svcRef, &svc,true);
+							if(svc==NULL){
+								printf("Cannot get pubsub_serializer_service from serviceReference %p\n",svcRef);
+								status = CELIX_SERVICE_EXCEPTION;
+							}
+							else{
+								*serSvc = svc;
+								ser_found = true;
+								printf("Selected %s serializer as best for QoS=%s\n",qos_sample_serializer_prio_list[i],QOS_TYPE_SAMPLE);
+							}
+						}
+					}
+				}
+			}
+		}
+		else if(strncmp(requested_qos_type,QOS_TYPE_CONTROL,strlen(QOS_TYPE_CONTROL))==0){
+			bool ser_found = false;
+			for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){
+				for(j=0;j<arrayList_size(serializerList) && !ser_found;j++){
+					svcRef = (service_reference_pt)arrayList_get(serializerList,j);
+					char *serializer_type = NULL;
+					get_serializer_type(svcRef, &serializer_type);
+					if(serializer_type != NULL){
+						if(strncmp(qos_control_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){
+							manage_service_from_reference(svcRef, &svc,true);
+							if(svc==NULL){
+								printf("Cannot get pubsub_serializer_service from serviceReference %p\n",svcRef);
+								status = CELIX_SERVICE_EXCEPTION;
+							}
+							else{
+								*serSvc = svc;
+								ser_found = true;
+								printf("Selected %s serializer as best for QoS=%s\n",qos_control_serializer_prio_list[i],QOS_TYPE_CONTROL);
+							}
+						}
+					}
+				}
+			}
+		}
+		else{
+			printf("Unknown QoS type '%s'\n",requested_qos_type);
+			status = CELIX_ILLEGAL_ARGUMENT;
+		}
+	}
+	else{ /* We got no specification: fallback to Qos=Sample, but count half the score */
+		bool ser_found = false;
+		for(i=0;i<KNOWN_SERIALIZER_NUM && !ser_found;i++){
+			for(j=0;j<arrayList_size(serializerList) && !ser_found;j++){
+				svcRef = (service_reference_pt)arrayList_get(serializerList,j);
+				char *serializer_type = NULL;
+				get_serializer_type(svcRef, &serializer_type);
+				if(serializer_type != NULL){
+					if(strncmp(qos_sample_serializer_prio_list[i],serializer_type,strlen(serializer_type))==0){
+						manage_service_from_reference(svcRef, &svc,true);
+						if(svc==NULL){
+							printf("Cannot get pubsub_serializer_service from serviceReference %p\n",svcRef);
+							status = CELIX_SERVICE_EXCEPTION;
+						}
+						else{
+							*serSvc = svc;
+							ser_found = true;
+							printf("Selected %s serializer as best without any specification\n",qos_sample_serializer_prio_list[i]);
+						}
+					}
+				}
+			}
+		}
+	}
+
+	if(svc!=NULL && svcRef!=NULL){
+		manage_service_from_reference(svcRef, svc, false);
+	}
+
+	return status;
+}
+
+static void get_serializer_type(service_reference_pt svcRef, char **serializerType){
+
+	const char *serType = NULL;
+	serviceReference_getProperty(svcRef, PUBSUB_SERIALIZER_TYPE_KEY,&serType);
+	if(serType != NULL){
+		*serializerType = (char*)serType;
+	}
+	else{
+		printf("Serializer serviceReference %p has no pubsub_serializer.type property specified\n",svcRef);
+		*serializerType = NULL;
+	}
+}
+
+static void manage_service_from_reference(service_reference_pt svcRef, void **svc, bool getService){
+	bundle_context_pt context = NULL;
+	bundle_pt bundle = NULL;
+	serviceReference_getBundle(svcRef, &bundle);
+	bundle_getContext(bundle, &context);
+	if(getService){
+		bundleContext_getService(context, svcRef, svc);
+	}
+	else{
+		bundleContext_ungetService(context, svcRef, NULL);
+	}
+}

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_spi/src/pubsub_endpoint.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_spi/src/pubsub_endpoint.c b/pubsub/pubsub_spi/src/pubsub_endpoint.c
new file mode 100644
index 0000000..c3fd293
--- /dev/null
+++ b/pubsub/pubsub_spi/src/pubsub_endpoint.c
@@ -0,0 +1,254 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * endpoint_description.c
+ *
+ *  \date       25 Jul 2014
+ *  \author     <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright  Apache License, Version 2.0
+ */
+
+#include <string.h>
+#include <stdlib.h>
+
+#include "celix_errno.h"
+#include "celix_log.h"
+
+#include "pubsub_common.h"
+#include "pubsub_endpoint.h"
+#include "constants.h"
+
+#include "pubsub_utils.h"
+
+
+static void pubsubEndpoint_setFields(pubsub_endpoint_pt psEp, const char* fwUUID, const char* scope, const char* topic, long serviceId,const char* endpoint,properties_pt topic_props, bool cloneProps);
+static properties_pt pubsubEndpoint_getTopicProperties(bundle_pt bundle, const char *topic, bool isPublisher);
+
+static void pubsubEndpoint_setFields(pubsub_endpoint_pt psEp, const char* fwUUID, const char* scope, const char* topic, long serviceId,const char* endpoint,properties_pt topic_props, bool cloneProps){
+
+	if (fwUUID != NULL) {
+		psEp->frameworkUUID = strdup(fwUUID);
+	}
+
+	if (scope != NULL) {
+		psEp->scope = strdup(scope);
+	}
+
+	if (topic != NULL) {
+		psEp->topic = strdup(topic);
+	}
+
+	psEp->serviceID = serviceId;
+
+	if(endpoint != NULL) {
+		psEp->endpoint = strdup(endpoint);
+	}
+
+	if(topic_props != NULL){
+		if(cloneProps){
+			properties_copy(topic_props, &(psEp->topic_props));
+		}
+		else{
+			psEp->topic_props = topic_props;
+		}
+	}
+}
+
+static properties_pt pubsubEndpoint_getTopicProperties(bundle_pt bundle, const char *topic, bool isPublisher){
+
+	properties_pt topic_props = NULL;
+
+	bool isSystemBundle = false;
+	bundle_isSystemBundle(bundle, &isSystemBundle);
+	long bundleId = -1;
+	bundle_isSystemBundle(bundle, &isSystemBundle);
+	bundle_getBundleId(bundle,&bundleId);
+
+	if(isSystemBundle == false) {
+
+		char *bundleRoot = NULL;
+		char* topicPropertiesPath = NULL;
+		bundle_getEntry(bundle, ".", &bundleRoot);
+
+		if(bundleRoot != NULL){
+
+			asprintf(&topicPropertiesPath, "%s/META-INF/topics/%s/%s.properties", bundleRoot, isPublisher?"pub":"sub", topic);
+			topic_props = properties_load(topicPropertiesPath);
+			if(topic_props==NULL){
+				printf("PSEP: Could not load properties for %s on topic %s, bundleId=%ld\n", isPublisher?"publication":"subscription", topic,bundleId);
+			}
+
+			free(topicPropertiesPath);
+			free(bundleRoot);
+		}
+	}
+
+	return topic_props;
+}
+
+celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, long serviceId,const char* endpoint,properties_pt topic_props,pubsub_endpoint_pt* psEp){
+	celix_status_t status = CELIX_SUCCESS;
+
+	*psEp = calloc(1, sizeof(**psEp));
+
+	pubsubEndpoint_setFields(*psEp, fwUUID, scope, topic, serviceId, endpoint, topic_props, true);
+
+	return status;
+
+}
+
+celix_status_t pubsubEndpoint_clone(pubsub_endpoint_pt in, pubsub_endpoint_pt *out){
+	celix_status_t status = CELIX_SUCCESS;
+
+	*out = calloc(1,sizeof(**out));
+
+	pubsubEndpoint_setFields(*out, in->frameworkUUID, in->scope, in->topic, in->serviceID, in->endpoint, in->topic_props, true);
+
+	return status;
+
+}
+
+celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt reference, pubsub_endpoint_pt* psEp, bool isPublisher){
+	celix_status_t status = CELIX_SUCCESS;
+
+	pubsub_endpoint_pt ep = calloc(1,sizeof(*ep));
+
+	bundle_pt bundle = NULL;
+	bundle_context_pt ctxt = NULL;
+	const char* fwUUID = NULL;
+	serviceReference_getBundle(reference,&bundle);
+	bundle_getContext(bundle,&ctxt);
+	bundleContext_getProperty(ctxt,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
+
+	const char* scope = NULL;
+	serviceReference_getProperty(reference, PUBSUB_SUBSCRIBER_SCOPE,&scope);
+
+	const char* topic = NULL;
+	serviceReference_getProperty(reference, PUBSUB_SUBSCRIBER_TOPIC,&topic);
+
+	const char* serviceId = NULL;
+	serviceReference_getProperty(reference,(char*)OSGI_FRAMEWORK_SERVICE_ID,&serviceId);
+
+	/* TODO: is topic_props==NULL a fatal error such that EP cannot be created? */
+	properties_pt topic_props = pubsubEndpoint_getTopicProperties(bundle, topic, isPublisher);
+
+	pubsubEndpoint_setFields(ep, fwUUID, scope!=NULL?scope:PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, topic, strtol(serviceId,NULL,10), NULL, topic_props, false);
+
+	if (!ep->frameworkUUID || !ep->serviceID || !ep->scope || !ep->topic) {
+		fw_log(logger, OSGI_FRAMEWORK_LOG_ERROR, "PUBSUB_ENDPOINT: incomplete description!.");
+		status = CELIX_BUNDLE_EXCEPTION;
+		pubsubEndpoint_destroy(ep);
+		*psEp = NULL;
+	}
+	else{
+		*psEp = ep;
+	}
+
+	return status;
+
+}
+
+celix_status_t pubsubEndpoint_createFromListenerHookInfo(listener_hook_info_pt info,pubsub_endpoint_pt* psEp, bool isPublisher){
+	celix_status_t status = CELIX_SUCCESS;
+
+	const char* fwUUID=NULL;
+	bundleContext_getProperty(info->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
+
+	if(fwUUID==NULL){
+		return CELIX_BUNDLE_EXCEPTION;
+	}
+
+	char* topic = pubsub_getTopicFromFilter(info->filter);
+	if(topic==NULL){
+		return CELIX_BUNDLE_EXCEPTION;
+	}
+
+	*psEp = calloc(1, sizeof(**psEp));
+
+	char* scope = pubsub_getScopeFromFilter(info->filter);
+	if(scope == NULL) {
+		scope = strdup(PUBSUB_PUBLISHER_SCOPE_DEFAULT);
+	}
+
+	bundle_pt bundle = NULL;
+	long bundleId = -1;
+	bundleContext_getBundle(info->context,&bundle);
+
+	bundle_getBundleId(bundle,&bundleId);
+
+	properties_pt topic_props = pubsubEndpoint_getTopicProperties(bundle, topic, isPublisher);
+
+	/* TODO: is topic_props==NULL a fatal error such that EP cannot be created? */
+	pubsubEndpoint_setFields(*psEp, fwUUID, scope!=NULL?scope:PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, topic, bundleId, NULL, topic_props, false);
+
+	free(topic);
+	free(scope);
+
+
+	return status;
+}
+
+celix_status_t pubsubEndpoint_destroy(pubsub_endpoint_pt psEp){
+
+	if(psEp->frameworkUUID!=NULL){
+		free(psEp->frameworkUUID);
+		psEp->frameworkUUID = NULL;
+	}
+
+	if(psEp->scope!=NULL){
+		free(psEp->scope);
+		psEp->scope = NULL;
+	}
+
+	if(psEp->topic!=NULL){
+		free(psEp->topic);
+		psEp->topic = NULL;
+	}
+
+	if(psEp->endpoint!=NULL){
+		free(psEp->endpoint);
+		psEp->endpoint = NULL;
+	}
+
+	if(psEp->topic_props != NULL){
+		properties_destroy(psEp->topic_props);
+	}
+
+	free(psEp);
+
+	return CELIX_SUCCESS;
+
+}
+
+bool pubsubEndpoint_equals(pubsub_endpoint_pt psEp1,pubsub_endpoint_pt psEp2){
+
+	return ((strcmp(psEp1->frameworkUUID,psEp2->frameworkUUID)==0) &&
+			(strcmp(psEp1->scope,psEp2->scope)==0) &&
+			(strcmp(psEp1->topic,psEp2->topic)==0) &&
+			(psEp1->serviceID == psEp2->serviceID) /*&&
+			((psEp1->endpoint==NULL && psEp2->endpoint==NULL)||(strcmp(psEp1->endpoint,psEp2->endpoint)==0))*/
+	);
+}
+
+char *createScopeTopicKey(const char* scope, const char* topic) {
+	char *result = NULL;
+	asprintf(&result, "%s:%s", scope, topic);
+
+	return result;
+}

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_spi/src/pubsub_utils.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_spi/src/pubsub_utils.c b/pubsub/pubsub_spi/src/pubsub_utils.c
new file mode 100644
index 0000000..19b2271
--- /dev/null
+++ b/pubsub/pubsub_spi/src/pubsub_utils.c
@@ -0,0 +1,170 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_utils.c
+ *
+ *  \date       Sep 24, 2015
+ *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#include <string.h>
+#include <stdlib.h>
+
+#include "constants.h"
+
+#include "pubsub_common.h"
+#include "pubsub/publisher.h"
+#include "pubsub_utils.h"
+
+#include "array_list.h"
+#include "bundle.h"
+
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+
+#define MAX_KEYBUNDLE_LENGTH 256
+
+char* pubsub_getScopeFromFilter(char* bundle_filter){
+
+	char* scope = NULL;
+
+	char* filter = strdup(bundle_filter);
+
+	char* oc = strstr(filter,OSGI_FRAMEWORK_OBJECTCLASS);
+	if(oc!=NULL){
+		oc+=strlen(OSGI_FRAMEWORK_OBJECTCLASS)+1;
+		if(strncmp(oc,PUBSUB_PUBLISHER_SERVICE_NAME,strlen(PUBSUB_PUBLISHER_SERVICE_NAME))==0){
+
+			char* scopes = strstr(filter,PUBSUB_PUBLISHER_SCOPE);
+			if(scopes!=NULL){
+
+				scopes+=strlen(PUBSUB_PUBLISHER_SCOPE)+1;
+				char* bottom=strchr(scopes,')');
+				*bottom='\0';
+
+				scope=strdup(scopes);
+			} else {
+			    scope=strdup(PUBSUB_PUBLISHER_SCOPE_DEFAULT);
+			}
+		}
+	}
+
+	free(filter);
+
+	return scope;
+}
+
+char* pubsub_getTopicFromFilter(char* bundle_filter){
+
+	char* topic = NULL;
+
+	char* filter = strdup(bundle_filter);
+
+	char* oc = strstr(filter,OSGI_FRAMEWORK_OBJECTCLASS);
+	if(oc!=NULL){
+		oc+=strlen(OSGI_FRAMEWORK_OBJECTCLASS)+1;
+		if(strncmp(oc,PUBSUB_PUBLISHER_SERVICE_NAME,strlen(PUBSUB_PUBLISHER_SERVICE_NAME))==0){
+
+			char* topics = strstr(filter,PUBSUB_PUBLISHER_TOPIC);
+			if(topics!=NULL){
+
+				topics+=strlen(PUBSUB_PUBLISHER_TOPIC)+1;
+				char* bottom=strchr(topics,')');
+				*bottom='\0';
+
+				topic=strdup(topics);
+
+			}
+		}
+	}
+
+	free(filter);
+
+	return topic;
+
+}
+
+array_list_pt pubsub_getTopicsFromString(char* string){
+
+	array_list_pt topic_list = NULL;
+	arrayList_create(&topic_list);
+
+	char* topics = strdup(string);
+
+	char* topic = strtok(topics,",;|# ");
+	arrayList_add(topic_list,strdup(topic));
+
+	while( (topic = strtok(NULL,",;|# ")) !=NULL){
+		arrayList_add(topic_list,strdup(topic));
+	}
+
+	free(topics);
+
+	return topic_list;
+
+}
+
+/**
+ * Loop through all bundles and look for the bundle with the keys inside.
+ * If no key bundle found, return NULL
+ *
+ * Caller is responsible for freeing the object
+ */
+char* pubsub_getKeysBundleDir(bundle_context_pt ctx)
+{
+	array_list_pt bundles = NULL;
+	bundleContext_getBundles(ctx, &bundles);
+	int nrOfBundles = arrayList_size(bundles);
+	long bundle_id = -1;
+	char* result = NULL;
+
+	for (int i = 0; i < nrOfBundles; i++){
+		bundle_pt b = arrayList_get(bundles, i);
+
+		/* Skip bundle 0 (framework bundle) since it has no path nor revisions */
+		bundle_getBundleId(b, &bundle_id);
+		if(bundle_id==0){
+			continue;
+		}
+
+		char* dir = NULL;
+		bundle_getEntry(b, ".", &dir);
+
+		char cert_dir[MAX_KEYBUNDLE_LENGTH];
+		snprintf(cert_dir, MAX_KEYBUNDLE_LENGTH, "%s/META-INF/keys", dir);
+
+		struct stat s;
+		int err = stat(cert_dir, &s);
+		if (err != -1){
+			if (S_ISDIR(s.st_mode)){
+				result = dir;
+				break;
+			}
+		}
+
+		free(dir);
+	}
+
+	arrayList_destroy(bundles);
+
+	return result;
+}
+

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_topology_manager/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_topology_manager/CMakeLists.txt b/pubsub/pubsub_topology_manager/CMakeLists.txt
index 784ca21..73b9ecb 100644
--- a/pubsub/pubsub_topology_manager/CMakeLists.txt
+++ b/pubsub/pubsub_topology_manager/CMakeLists.txt
@@ -15,22 +15,23 @@
 # specific language governing permissions and limitations
 # under the License.
 
-add_bundle(org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
+add_bundle(celix_pubsub_topology_manager
     BUNDLE_SYMBOLICNAME "apache_celix_pubsub_topology_manager"
     VERSION "1.0.0"
     SOURCES
-    	private/src/pstm_activator.c
-    	private/src/pubsub_topology_manager.c
-    	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_endpoint.c
-    	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_utils.c	
+    	src/pstm_activator.c
+    	src/pubsub_topology_manager.c
+    	src/pubsub_topology_manager.h
 )
+target_link_libraries(celix_pubsub_topology_manager PRIVATE Celix::framework Celix::log_helper Celix::pubsub_spi)
 
-bundle_files(org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
-   ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/include/pubsub_topic_info.descriptor
+get_target_property(DESC Celix::pubsub_spi TOPIC_INFO_DESCRIPTOR)
+bundle_files(celix_pubsub_topology_manager
+	${DESC}
     DESTINATION "META-INF/descriptors/services"
 )
-target_link_libraries(org.apache.celix.pubsub_topology_manager.PubSubTopologyManager PRIVATE Celix::framework Celix::log_helper)
 
+install_bundle(celix_pubsub_topology_manager)
 
-install_bundle(org.apache.celix.pubsub_topology_manager.PubSubTopologyManager)
+add_library(Celix::pubsub_topology_manager ALIAS celix_pubsub_topology_manager)
 

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_topology_manager/private/include/pubsub_topology_manager.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_topology_manager/private/include/pubsub_topology_manager.h b/pubsub/pubsub_topology_manager/private/include/pubsub_topology_manager.h
deleted file mode 100644
index 7614e0c..0000000
--- a/pubsub/pubsub_topology_manager/private/include/pubsub_topology_manager.h
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * pubsub_topology_manager.h
- *
- *  \date       Sep 29, 2011
- *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright	Apache License, Version 2.0
- */
-
-#ifndef PUBSUB_TOPOLOGY_MANAGER_H_
-#define PUBSUB_TOPOLOGY_MANAGER_H_
-
-#include "endpoint_listener.h"
-#include "service_reference.h"
-#include "bundle_context.h"
-#include "log_helper.h"
-
-#include "pubsub_common.h"
-#include "pubsub_endpoint.h"
-#include "publisher.h"
-#include "subscriber.h"
-
-
-struct pubsub_topology_manager {
-	bundle_context_pt context;
-
-	celix_thread_mutex_t psaListLock;
-	array_list_pt psaList;
-
-	celix_thread_mutex_t discoveryListLock;
-	hash_map_pt discoveryList; //<serviceReference,NULL>
-
-	celix_thread_mutex_t publicationsLock;
-	hash_map_pt publications; //<topic(string),list<pubsub_ep>>
-
-	celix_thread_mutex_t subscriptionsLock;
-	hash_map_pt subscriptions; //<topic(string),list<pubsub_ep>>
-
-	log_helper_pt loghelper;
-};
-
-typedef struct pubsub_topology_manager *pubsub_topology_manager_pt;
-
-celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_helper_pt logHelper, pubsub_topology_manager_pt *manager);
-celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_pt manager);
-celix_status_t pubsub_topologyManager_closeImports(pubsub_topology_manager_pt manager);
-
-celix_status_t pubsub_topologyManager_psaAdded(void *handle, service_reference_pt reference, void *service);
-celix_status_t pubsub_topologyManager_psaModified(void *handle, service_reference_pt reference, void *service);
-celix_status_t pubsub_topologyManager_psaRemoved(void *handle, service_reference_pt reference, void *service);
-
-celix_status_t pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, service_reference_pt reference, void* service);
-celix_status_t pubsub_topologyManager_pubsubDiscoveryModified(void * handle, service_reference_pt reference, void* service);
-celix_status_t pubsub_topologyManager_pubsubDiscoveryRemoved(void * handle, service_reference_pt reference, void* service);
-
-celix_status_t pubsub_topologyManager_subscriberAdded(void * handle, service_reference_pt reference, void * service);
-celix_status_t pubsub_topologyManager_subscriberModified(void * handle, service_reference_pt reference, void * service);
-celix_status_t pubsub_topologyManager_subscriberRemoved(void * handle, service_reference_pt reference, void * service);
-
-celix_status_t pubsub_topologyManager_publisherTrackerAdded(void *handle, array_list_pt listeners);
-celix_status_t pubsub_topologyManager_publisherTrackerRemoved(void *handle, array_list_pt listeners);
-
-celix_status_t pubsub_topologyManager_announcePublisher(void *handle, pubsub_endpoint_pt pubEP);
-celix_status_t pubsub_topologyManager_removePublisher(void *handle, pubsub_endpoint_pt pubEP);
-
-#endif /* PUBSUB_TOPOLOGY_MANAGER_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_topology_manager/private/src/pstm_activator.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_topology_manager/private/src/pstm_activator.c b/pubsub/pubsub_topology_manager/private/src/pstm_activator.c
deleted file mode 100644
index 4d6dd27..0000000
--- a/pubsub/pubsub_topology_manager/private/src/pstm_activator.c
+++ /dev/null
@@ -1,246 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * pstm_activator.c
- *
- *  \date       Sep 29, 2011
- *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright	Apache License, Version 2.0
- */
-
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-
-#include "constants.h"
-#include "bundle_activator.h"
-#include "service_tracker.h"
-#include "service_registration.h"
-
-#include "endpoint_listener.h"
-#include "remote_constants.h"
-#include "listener_hook_service.h"
-#include "log_service.h"
-#include "log_helper.h"
-
-
-#include "pubsub_topology_manager.h"
-#include "publisher_endpoint_announce.h"
-
-struct activator {
-	bundle_context_pt context;
-
-	pubsub_topology_manager_pt manager;
-
-	service_tracker_pt pubsubDiscoveryTracker;
-	service_tracker_pt pubsubAdminTracker;
-	service_tracker_pt pubsubSubscribersTracker;
-
-	listener_hook_service_pt hookService;
-	service_registration_pt hook;
-
-	publisher_endpoint_announce_pt publisherEPDiscover;
-	service_registration_pt publisherEPDiscoverService;
-
-	log_helper_pt loghelper;
-};
-
-
-static celix_status_t bundleActivator_createPSDTracker(struct activator *activator, service_tracker_pt *tracker);
-static celix_status_t bundleActivator_createPSATracker(struct activator *activator, service_tracker_pt *tracker);
-static celix_status_t bundleActivator_createPSSubTracker(struct activator *activator, service_tracker_pt *tracker);
-
-
-static celix_status_t bundleActivator_createPSDTracker(struct activator *activator, service_tracker_pt *tracker) {
-	celix_status_t status;
-
-	service_tracker_customizer_pt customizer = NULL;
-
-	status = serviceTrackerCustomizer_create(activator->manager,
-			NULL,
-			pubsub_topologyManager_pubsubDiscoveryAdded,
-			pubsub_topologyManager_pubsubDiscoveryModified,
-			pubsub_topologyManager_pubsubDiscoveryRemoved,
-			&customizer);
-
-	if (status == CELIX_SUCCESS) {
-		status = serviceTracker_create(activator->context, (char *) PUBSUB_DISCOVERY_SERVICE, customizer, tracker);
-	}
-
-	return status;
-}
-
-static celix_status_t bundleActivator_createPSATracker(struct activator *activator, service_tracker_pt *tracker) {
-	celix_status_t status = CELIX_SUCCESS;
-
-	service_tracker_customizer_pt customizer = NULL;
-
-	status = serviceTrackerCustomizer_create(activator->manager,
-			NULL,
-			pubsub_topologyManager_psaAdded,
-			pubsub_topologyManager_psaModified,
-			pubsub_topologyManager_psaRemoved,
-			&customizer);
-
-	if (status == CELIX_SUCCESS) {
-		status = serviceTracker_create(activator->context, PUBSUB_ADMIN_SERVICE, customizer, tracker);
-	}
-
-	return status;
-}
-
-static celix_status_t bundleActivator_createPSSubTracker(struct activator *activator, service_tracker_pt *tracker) {
-	celix_status_t status = CELIX_SUCCESS;
-
-	service_tracker_customizer_pt customizer = NULL;
-
-	status = serviceTrackerCustomizer_create(activator->manager,
-			NULL,
-			pubsub_topologyManager_subscriberAdded,
-			pubsub_topologyManager_subscriberModified,
-			pubsub_topologyManager_subscriberRemoved,
-			&customizer);
-
-	if (status == CELIX_SUCCESS) {
-		status = serviceTracker_create(activator->context, PUBSUB_SUBSCRIBER_SERVICE_NAME, customizer, tracker);
-	}
-
-	return status;
-}
-
-celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) {
-	celix_status_t status = CELIX_SUCCESS;
-	struct activator *activator = NULL;
-
-	activator = calloc(1,sizeof(struct activator));
-
-	if (!activator) {
-		return CELIX_ENOMEM;
-	}
-
-	activator->context = context;
-
-	logHelper_create(context, &activator->loghelper);
-	logHelper_start(activator->loghelper);
-
-	status = pubsub_topologyManager_create(context, activator->loghelper, &activator->manager);
-	if (status == CELIX_SUCCESS) {
-		status = bundleActivator_createPSDTracker(activator, &activator->pubsubDiscoveryTracker);
-		if (status == CELIX_SUCCESS) {
-			status = bundleActivator_createPSATracker(activator, &activator->pubsubAdminTracker);
-			if (status == CELIX_SUCCESS) {
-				status = bundleActivator_createPSSubTracker(activator, &activator->pubsubSubscribersTracker);
-				if (status == CELIX_SUCCESS) {
-					*userData = activator;
-				}
-			}
-		}
-	}
-
-	if(status != CELIX_SUCCESS){
-		bundleActivator_destroy(activator, context);
-	}
-
-	return status;
-}
-
-
-celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) {
-	celix_status_t status = CELIX_SUCCESS;
-	struct activator *activator = userData;
-
-	publisher_endpoint_announce_pt pubEPDiscover = calloc(1, sizeof(*pubEPDiscover));
-	pubEPDiscover->handle = activator->manager;
-	pubEPDiscover->announcePublisher = pubsub_topologyManager_announcePublisher;
-	pubEPDiscover->removePublisher = pubsub_topologyManager_removePublisher;
-	activator->publisherEPDiscover = pubEPDiscover;
-
-	status += bundleContext_registerService(context, (char *) PUBSUB_TM_ANNOUNCE_PUBLISHER_SERVICE, pubEPDiscover, NULL, &activator->publisherEPDiscoverService);
-
-
-	listener_hook_service_pt hookService = calloc(1,sizeof(*hookService));
-	hookService->handle = activator->manager;
-	hookService->added = pubsub_topologyManager_publisherTrackerAdded;
-	hookService->removed = pubsub_topologyManager_publisherTrackerRemoved;
-	activator->hookService = hookService;
-
-	status += bundleContext_registerService(context, (char *) OSGI_FRAMEWORK_LISTENER_HOOK_SERVICE_NAME, hookService, NULL, &activator->hook);
-
-	/* NOTE: Enable those line in order to remotely expose the topic_info service
-	properties_pt props = properties_create();
-	properties_set(props, (char *) OSGI_RSA_SERVICE_EXPORTED_INTERFACES, (char *) PUBSUB_TOPIC_INFO_SERVICE);
-	status += bundleContext_registerService(context, (char *) PUBSUB_TOPIC_INFO_SERVICE, activator->topicInfo, props, &activator->topicInfoService);
-	*/
-	status += serviceTracker_open(activator->pubsubAdminTracker);
-
-	status += serviceTracker_open(activator->pubsubDiscoveryTracker);
-
-	status += serviceTracker_open(activator->pubsubSubscribersTracker);
-
-
-	return status;
-}
-
-celix_status_t bundleActivator_stop(void * userData, bundle_context_pt context) {
-	celix_status_t status = CELIX_SUCCESS;
-	struct activator *activator = userData;
-
-	serviceTracker_close(activator->pubsubSubscribersTracker);
-	serviceTracker_close(activator->pubsubDiscoveryTracker);
-	serviceTracker_close(activator->pubsubAdminTracker);
-
-	serviceRegistration_unregister(activator->publisherEPDiscoverService);
-	free(activator->publisherEPDiscover);
-
-	serviceRegistration_unregister(activator->hook);
-	free(activator->hookService);
-
-	return status;
-}
-
-celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt context) {
-	celix_status_t status = CELIX_SUCCESS;
-
-	struct activator *activator = userData;
-	if (activator == NULL) {
-		status = CELIX_BUNDLE_EXCEPTION;
-	} else {
-
-		if(activator->pubsubSubscribersTracker!=NULL){
-			serviceTracker_destroy(activator->pubsubSubscribersTracker);
-		}
-		if(activator->pubsubDiscoveryTracker!=NULL){
-			serviceTracker_destroy(activator->pubsubDiscoveryTracker);
-		}
-		if(activator->pubsubAdminTracker!=NULL){
-			serviceTracker_destroy(activator->pubsubAdminTracker);
-		}
-
-		if(activator->manager!=NULL){
-			status = pubsub_topologyManager_destroy(activator->manager);
-		}
-
-		logHelper_stop(activator->loghelper);
-		logHelper_destroy(&activator->loghelper);
-
-		free(activator);
-	}
-
-	return status;
-}

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c b/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
deleted file mode 100644
index 987d864..0000000
--- a/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
+++ /dev/null
@@ -1,723 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * pubsub_topology_manager.c
- *
- *  \date       Sep 29, 2011
- *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright	Apache License, Version 2.0
- */
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <stdbool.h>
-
-#include "hash_map.h"
-#include "array_list.h"
-#include "bundle_context.h"
-#include "constants.h"
-#include "module.h"
-#include "bundle.h"
-#include "remote_service_admin.h"
-#include "remote_constants.h"
-#include "filter.h"
-#include "listener_hook_service.h"
-#include "utils.h"
-#include "service_reference.h"
-#include "service_registration.h"
-#include "log_service.h"
-#include "log_helper.h"
-
-#include "publisher_endpoint_announce.h"
-#include "pubsub_topology_manager.h"
-#include "pubsub_endpoint.h"
-#include "pubsub_admin.h"
-#include "pubsub_utils.h"
-
-
-celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_helper_pt logHelper, pubsub_topology_manager_pt *manager) {
-	celix_status_t status = CELIX_SUCCESS;
-
-	*manager = calloc(1, sizeof(**manager));
-	if (!*manager) {
-		return CELIX_ENOMEM;
-	}
-
-	(*manager)->context = context;
-
-	celix_thread_mutexattr_t psaAttr;
-	celixThreadMutexAttr_create(&psaAttr);
-	celixThreadMutexAttr_settype(&psaAttr, CELIX_THREAD_MUTEX_RECURSIVE);
-	status = celixThreadMutex_create(&(*manager)->psaListLock, &psaAttr);
-	celixThreadMutexAttr_destroy(&psaAttr);
-
-	status = celixThreadMutex_create(&(*manager)->publicationsLock, NULL);
-	status = celixThreadMutex_create(&(*manager)->subscriptionsLock, NULL);
-	status = celixThreadMutex_create(&(*manager)->discoveryListLock, NULL);
-
-	arrayList_create(&(*manager)->psaList);
-
-	(*manager)->discoveryList = hashMap_create(serviceReference_hashCode, NULL, serviceReference_equals2, NULL);
-	(*manager)->publications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
-	(*manager)->subscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
-
-	(*manager)->loghelper = logHelper;
-
-	return status;
-}
-
-celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_pt manager) {
-	celix_status_t status = CELIX_SUCCESS;
-
-	celixThreadMutex_lock(&manager->discoveryListLock);
-	hashMap_destroy(manager->discoveryList, false, false);
-	celixThreadMutex_unlock(&manager->discoveryListLock);
-	celixThreadMutex_destroy(&manager->discoveryListLock);
-
-	celixThreadMutex_lock(&manager->psaListLock);
-	arrayList_destroy(manager->psaList);
-	celixThreadMutex_unlock(&manager->psaListLock);
-	celixThreadMutex_destroy(&manager->psaListLock);
-
-	celixThreadMutex_lock(&manager->publicationsLock);
-	hash_map_iterator_pt pubit = hashMapIterator_create(manager->publications);
-	while(hashMapIterator_hasNext(pubit)){
-		array_list_pt l = (array_list_pt)hashMapIterator_nextValue(pubit);
-		int i;
-		for(i=0;i<arrayList_size(l);i++){
-			pubsubEndpoint_destroy((pubsub_endpoint_pt)arrayList_get(l,i));
-		}
-		arrayList_destroy(l);
-	}
-	hashMapIterator_destroy(pubit);
-	hashMap_destroy(manager->publications, true, false);
-	celixThreadMutex_unlock(&manager->publicationsLock);
-	celixThreadMutex_destroy(&manager->publicationsLock);
-
-	celixThreadMutex_lock(&manager->subscriptionsLock);
-	hash_map_iterator_pt subit = hashMapIterator_create(manager->subscriptions);
-	while(hashMapIterator_hasNext(subit)){
-		array_list_pt l = (array_list_pt)hashMapIterator_nextValue(subit);
-		int i;
-		for(i=0;i<arrayList_size(l);i++){
-			pubsubEndpoint_destroy((pubsub_endpoint_pt)arrayList_get(l,i));
-		}
-		arrayList_destroy(l);
-	}
-	hashMapIterator_destroy(subit);
-	hashMap_destroy(manager->subscriptions, true, false);
-	celixThreadMutex_unlock(&manager->subscriptionsLock);
-	celixThreadMutex_destroy(&manager->subscriptionsLock);
-
-	free(manager);
-
-	return status;
-}
-
-celix_status_t pubsub_topologyManager_psaAdded(void * handle, service_reference_pt reference, void * service) {
-	celix_status_t status = CELIX_SUCCESS;
-	pubsub_topology_manager_pt manager = handle;
-	int i;
-
-	pubsub_admin_service_pt psa = (pubsub_admin_service_pt) service;
-	logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "PSTM: Added PSA");
-
-	celixThreadMutex_lock(&manager->psaListLock);
-	arrayList_add(manager->psaList, psa);
-	celixThreadMutex_unlock(&manager->psaListLock);
-
-	// Add already detected subscriptions to new PSA
-	celixThreadMutex_lock(&manager->subscriptionsLock);
-	hash_map_iterator_pt subscriptionsIterator = hashMapIterator_create(manager->subscriptions);
-
-	while (hashMapIterator_hasNext(subscriptionsIterator)) {
-		array_list_pt sub_ep_list = hashMapIterator_nextValue(subscriptionsIterator);
-		for(i=0;i<arrayList_size(sub_ep_list);i++){
-			status += psa->addSubscription(psa->admin, (pubsub_endpoint_pt)arrayList_get(sub_ep_list,i));
-		}
-	}
-
-	hashMapIterator_destroy(subscriptionsIterator);
-
-	celixThreadMutex_unlock(&manager->subscriptionsLock);
-
-	// Add already detected publications to new PSA
-	status = celixThreadMutex_lock(&manager->publicationsLock);
-	hash_map_iterator_pt publicationsIterator = hashMapIterator_create(manager->publications);
-
-	while (hashMapIterator_hasNext(publicationsIterator)) {
-		array_list_pt pub_ep_list = hashMapIterator_nextValue(publicationsIterator);
-		for(i=0;i<arrayList_size(pub_ep_list);i++){
-			status += psa->addPublication(psa->admin, (pubsub_endpoint_pt)arrayList_get(pub_ep_list,i));
-		}
-	}
-
-	hashMapIterator_destroy(publicationsIterator);
-
-	celixThreadMutex_unlock(&manager->publicationsLock);
-
-	return status;
-}
-
-celix_status_t pubsub_topologyManager_psaModified(void * handle, service_reference_pt reference, void * service) {
-	celix_status_t status = CELIX_SUCCESS;
-
-	// Nop...
-
-	return status;
-}
-
-celix_status_t pubsub_topologyManager_psaRemoved(void * handle, service_reference_pt reference, void * service) {
-	celix_status_t status = CELIX_SUCCESS;
-	pubsub_topology_manager_pt manager = handle;
-
-	pubsub_admin_service_pt psa = (pubsub_admin_service_pt) service;
-
-	/* Deactivate all publications */
-	celixThreadMutex_lock(&manager->publicationsLock);
-
-	hash_map_iterator_pt pubit = hashMapIterator_create(manager->publications);
-	while(hashMapIterator_hasNext(pubit)){
-		hash_map_entry_pt pub_entry = hashMapIterator_nextEntry(pubit);
-		char* scope_topic_key = (char*)hashMapEntry_getKey(pub_entry);
-		// Extract scope/topic name from key
-		char scope[MAX_SCOPE_LEN];
-		char topic[MAX_TOPIC_LEN];
-		sscanf(scope_topic_key, "%[^:]:%s", scope, topic );
-		array_list_pt pubEP_list = (array_list_pt)hashMapEntry_getValue(pub_entry);
-
-		status = psa->closeAllPublications(psa->admin,scope,topic);
-
-		if(status==CELIX_SUCCESS){
-			celixThreadMutex_lock(&manager->discoveryListLock);
-			hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
-			while(hashMapIterator_hasNext(iter)){
-				service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter);
-				publisher_endpoint_announce_pt disc = NULL;
-				bundleContext_getService(manager->context, disc_sr, (void**) &disc);
-				const char* fwUUID = NULL;
-				bundleContext_getProperty(manager->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
-				int i;
-				for(i=0;i<arrayList_size(pubEP_list);i++){
-					pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubEP_list,i);
-					if(strcmp(pubEP->frameworkUUID,fwUUID)==0){
-						disc->removePublisher(disc->handle,pubEP);
-					}
-				}
-				bundleContext_ungetService(manager->context, disc_sr, NULL);
-			}
-			hashMapIterator_destroy(iter);
-			celixThreadMutex_unlock(&manager->discoveryListLock);
-		}
-	}
-	hashMapIterator_destroy(pubit);
-
-	celixThreadMutex_unlock(&manager->publicationsLock);
-
-	/* Deactivate all subscriptions */
-	celixThreadMutex_lock(&manager->subscriptionsLock);
-	hash_map_iterator_pt subit = hashMapIterator_create(manager->subscriptions);
-	while(hashMapIterator_hasNext(subit)){
-		// TODO do some error checking
-		char* scope_topic = (char*)hashMapIterator_nextKey(subit);
-		char scope[MAX_TOPIC_LEN];
-		char topic[MAX_TOPIC_LEN];
-		memset(scope, 0 , MAX_TOPIC_LEN*sizeof(char));
-		memset(topic, 0 , MAX_TOPIC_LEN*sizeof(char));
-		sscanf(scope_topic, "%[^:]:%s", scope, topic );
-		status += psa->closeAllSubscriptions(psa->admin,scope, topic);
-	}
-	hashMapIterator_destroy(subit);
-	celixThreadMutex_unlock(&manager->subscriptionsLock);
-
-	celixThreadMutex_lock(&manager->psaListLock);
-	arrayList_removeElement(manager->psaList, psa);
-	celixThreadMutex_unlock(&manager->psaListLock);
-
-	logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "PSTM: Removed PSA");
-
-	return status;
-}
-
-celix_status_t pubsub_topologyManager_subscriberAdded(void * handle, service_reference_pt reference, void * service) {
-	celix_status_t status = CELIX_SUCCESS;
-	pubsub_topology_manager_pt manager = handle;
-	//subscriber_service_pt subscriber = (subscriber_service_pt)service;
-
-	pubsub_endpoint_pt sub = NULL;
-	if(pubsubEndpoint_createFromServiceReference(reference,&sub,false) == CELIX_SUCCESS){
-		celixThreadMutex_lock(&manager->subscriptionsLock);
-		char *sub_key = createScopeTopicKey(sub->scope, sub->topic);
-
-		array_list_pt sub_list_by_topic = hashMap_get(manager->subscriptions,sub_key);
-		if(sub_list_by_topic==NULL){
-			arrayList_create(&sub_list_by_topic);
-			hashMap_put(manager->subscriptions,strdup(sub_key),sub_list_by_topic);
-		}
-		free(sub_key);
-		arrayList_add(sub_list_by_topic,sub);
-
-		celixThreadMutex_unlock(&manager->subscriptionsLock);
-
-		int j;
-		double score = 0;
-		double best_score = 0;
-		pubsub_admin_service_pt best_psa = NULL;
-		celixThreadMutex_lock(&manager->psaListLock);
-		for(j=0;j<arrayList_size(manager->psaList);j++){
-			pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j);
-			psa->matchEndpoint(psa->admin,sub,&score);
-			if(score>best_score){ /* We have a new winner! */
-				best_score = score;
-				best_psa = psa;
-			}
-		}
-
-		if(best_psa != NULL && best_score>0){
-			best_psa->addSubscription(best_psa->admin,sub);
-		}
-
-		// Inform discoveries for interest in the topic
-		celixThreadMutex_lock(&manager->discoveryListLock);
-		hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
-		while(hashMapIterator_hasNext(iter)){
-			service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter);
-			publisher_endpoint_announce_pt disc = NULL;
-			bundleContext_getService(manager->context, disc_sr, (void**) &disc);
-			disc->interestedInTopic(disc->handle, sub->scope, sub->topic);
-			bundleContext_ungetService(manager->context, disc_sr, NULL);
-		}
-		hashMapIterator_destroy(iter);
-		celixThreadMutex_unlock(&manager->discoveryListLock);
-
-		celixThreadMutex_unlock(&manager->psaListLock);
-	}
-	else{
-		status=CELIX_INVALID_BUNDLE_CONTEXT;
-	}
-
-	return status;
-}
-
-celix_status_t pubsub_topologyManager_subscriberModified(void * handle, service_reference_pt reference, void * service) {
-	celix_status_t status = CELIX_SUCCESS;
-
-	// Nop...
-
-	return status;
-}
-
-celix_status_t pubsub_topologyManager_subscriberRemoved(void * handle, service_reference_pt reference, void * service) {
-	celix_status_t status = CELIX_SUCCESS;
-	pubsub_topology_manager_pt manager = handle;
-
-	pubsub_endpoint_pt subcmp = NULL;
-	if(pubsubEndpoint_createFromServiceReference(reference,&subcmp,false) == CELIX_SUCCESS){
-
-		int j,k;
-
-		// Inform discoveries that we not interested in the topic any more
-		celixThreadMutex_lock(&manager->discoveryListLock);
-		hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
-		while(hashMapIterator_hasNext(iter)){
-			service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter);
-			publisher_endpoint_announce_pt disc = NULL;
-			bundleContext_getService(manager->context, disc_sr, (void**) &disc);
-			disc->uninterestedInTopic(disc->handle, subcmp->scope, subcmp->topic);
-			bundleContext_ungetService(manager->context, disc_sr, NULL);
-		}
-		hashMapIterator_destroy(iter);
-		celixThreadMutex_unlock(&manager->discoveryListLock);
-
-		celixThreadMutex_lock(&manager->subscriptionsLock);
-		celixThreadMutex_lock(&manager->psaListLock);
-
-		char *sub_key = createScopeTopicKey(subcmp->scope,subcmp->topic);
-		array_list_pt sub_list_by_topic = hashMap_get(manager->subscriptions,sub_key);
-		free(sub_key);
-		if(sub_list_by_topic!=NULL){
-			for(j=0;j<arrayList_size(sub_list_by_topic);j++){
-				pubsub_endpoint_pt sub = arrayList_get(sub_list_by_topic,j);
-				if(pubsubEndpoint_equals(sub,subcmp)){
-					for(k=0;k<arrayList_size(manager->psaList);k++){
-						/* No problem with invoking removal on all psa's, only the one that manage this topic will do something */
-						pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
-						psa->removeSubscription(psa->admin,sub);
-					}
-
-				}
-				arrayList_remove(sub_list_by_topic,j);
-
-				/* If it was the last subscriber for this topic, tell PSA to close the ZMQ socket */
-				if(arrayList_size(sub_list_by_topic)==0){
-					for(k=0;k<arrayList_size(manager->psaList);k++){
-						pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
-						psa->closeAllSubscriptions(psa->admin,sub->scope, sub->topic);
-					}
-				}
-
-				pubsubEndpoint_destroy(sub);
-
-			}
-		}
-
-		celixThreadMutex_unlock(&manager->psaListLock);
-		celixThreadMutex_unlock(&manager->subscriptionsLock);
-
-		pubsubEndpoint_destroy(subcmp);
-
-	}
-	else{
-		status=CELIX_INVALID_BUNDLE_CONTEXT;
-	}
-
-	return status;
-
-}
-
-celix_status_t pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, service_reference_pt reference, void* service) {
-	celix_status_t status = CELIX_SUCCESS;
-	pubsub_topology_manager_pt manager = (pubsub_topology_manager_pt)handle;
-	publisher_endpoint_announce_pt disc = (publisher_endpoint_announce_pt)service;
-
-	const char* fwUUID = NULL;
-
-	bundleContext_getProperty(manager->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
-	if(fwUUID==NULL){
-		printf("PSD: ERRROR: Cannot retrieve fwUUID.\n");
-		return CELIX_INVALID_BUNDLE_CONTEXT;
-	}
-
-	celixThreadMutex_lock(&manager->publicationsLock);
-
-	celixThreadMutex_lock(&manager->discoveryListLock);
-	hashMap_put(manager->discoveryList, reference, NULL);
-	celixThreadMutex_unlock(&manager->discoveryListLock);
-
-	hash_map_iterator_pt iter = hashMapIterator_create(manager->publications);
-	while(hashMapIterator_hasNext(iter)){
-		array_list_pt pubEP_list = (array_list_pt)hashMapIterator_nextValue(iter);
-		for(int i = 0; i < arrayList_size(pubEP_list); i++) {
-			pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubEP_list,i);
-			if( (strcmp(pubEP->frameworkUUID,fwUUID)==0) && (pubEP->endpoint!=NULL)){
-				status += disc->announcePublisher(disc->handle,pubEP);
-			}
-		}
-	}
-	hashMapIterator_destroy(iter);
-
-	celixThreadMutex_unlock(&manager->publicationsLock);
-
-	celixThreadMutex_lock(&manager->subscriptionsLock);
-	iter = hashMapIterator_create(manager->subscriptions);
-
-	while(hashMapIterator_hasNext(iter)) {
-		array_list_pt l = (array_list_pt)hashMapIterator_nextValue(iter);
-		int i;
-		for(i=0;i<arrayList_size(l);i++){
-			pubsub_endpoint_pt subEp = (pubsub_endpoint_pt)arrayList_get(l,i);
-
-			disc->interestedInTopic(disc->handle, subEp->scope, subEp->topic);
-		}
-	}
-	hashMapIterator_destroy(iter);
-	celixThreadMutex_unlock(&manager->subscriptionsLock);
-
-	return status;
-}
-
-celix_status_t pubsub_topologyManager_pubsubDiscoveryModified(void * handle, service_reference_pt reference, void * service) {
-	celix_status_t status = CELIX_SUCCESS;
-
-	status = pubsub_topologyManager_pubsubDiscoveryRemoved(handle, reference, service);
-	if (status == CELIX_SUCCESS) {
-		status = pubsub_topologyManager_pubsubDiscoveryAdded(handle, reference, service);
-	}
-
-	return status;
-}
-
-celix_status_t pubsub_topologyManager_pubsubDiscoveryRemoved(void * handle, service_reference_pt reference, void * service) {
-	celix_status_t status = CELIX_SUCCESS;
-
-	pubsub_topology_manager_pt manager = handle;
-
-	celixThreadMutex_lock(&manager->discoveryListLock);
-
-
-	if (hashMap_remove(manager->discoveryList, reference)) {
-		logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "EndpointListener Removed");
-	}
-
-	celixThreadMutex_unlock(&manager->discoveryListLock);
-
-	return status;
-}
-
-
-celix_status_t pubsub_topologyManager_publisherTrackerAdded(void *handle, array_list_pt listeners) {
-
-	celix_status_t status = CELIX_SUCCESS;
-	pubsub_topology_manager_pt manager = handle;
-
-	int l_index;
-
-	for (l_index = 0; l_index < arrayList_size(listeners); l_index++) {
-
-		listener_hook_info_pt info = arrayList_get(listeners, l_index);
-
-		pubsub_endpoint_pt pub = NULL;
-		if(pubsubEndpoint_createFromListenerHookInfo(info, &pub, true) == CELIX_SUCCESS){
-
-			celixThreadMutex_lock(&manager->publicationsLock);
-			char *pub_key = createScopeTopicKey(pub->scope, pub->topic);
-			array_list_pt pub_list_by_topic = hashMap_get(manager->publications, pub_key);
-			if(pub_list_by_topic==NULL){
-				arrayList_create(&pub_list_by_topic);
-				hashMap_put(manager->publications,strdup(pub_key),pub_list_by_topic);
-			}
-			free(pub_key);
-			arrayList_add(pub_list_by_topic,pub);
-
-			celixThreadMutex_unlock(&manager->publicationsLock);
-
-			int j;
-			double score = 0;
-			double best_score = 0;
-			pubsub_admin_service_pt best_psa = NULL;
-			celixThreadMutex_lock(&manager->psaListLock);
-
-			for(j=0;j<arrayList_size(manager->psaList);j++){
-				pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j);
-				psa->matchEndpoint(psa->admin,pub,&score);
-				if(score>best_score){ /* We have a new winner! */
-					best_score = score;
-					best_psa = psa;
-				}
-			}
-
-			if(best_psa != NULL && best_score>0){
-				status = best_psa->addPublication(best_psa->admin,pub);
-				if(status==CELIX_SUCCESS){
-					celixThreadMutex_lock(&manager->discoveryListLock);
-					hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
-					while(hashMapIterator_hasNext(iter)){
-						service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter);
-						publisher_endpoint_announce_pt disc = NULL;
-						bundleContext_getService(manager->context, disc_sr, (void**) &disc);
-						disc->announcePublisher(disc->handle,pub);
-						bundleContext_ungetService(manager->context, disc_sr, NULL);
-					}
-					hashMapIterator_destroy(iter);
-					celixThreadMutex_unlock(&manager->discoveryListLock);
-				}
-			}
-
-			celixThreadMutex_unlock(&manager->psaListLock);
-
-		}
-
-	}
-
-	return status;
-
-}
-
-
-celix_status_t pubsub_topologyManager_publisherTrackerRemoved(void *handle, array_list_pt listeners) {
-	celix_status_t status = CELIX_SUCCESS;
-	pubsub_topology_manager_pt manager = handle;
-
-	int l_index;
-
-	for (l_index = 0; l_index < arrayList_size(listeners); l_index++) {
-
-		listener_hook_info_pt info = arrayList_get(listeners, l_index);
-
-		pubsub_endpoint_pt pubcmp = NULL;
-		if(pubsubEndpoint_createFromListenerHookInfo(info,&pubcmp,true) == CELIX_SUCCESS){
-
-
-			int j,k;
-			celixThreadMutex_lock(&manager->psaListLock);
-			celixThreadMutex_lock(&manager->publicationsLock);
-
-			char *pub_key = createScopeTopicKey(pubcmp->scope, pubcmp->topic);
-			array_list_pt pub_list_by_topic = hashMap_get(manager->publications,pub_key);
-			if(pub_list_by_topic!=NULL){
-				for(j=0;j<arrayList_size(pub_list_by_topic);j++){
-					pubsub_endpoint_pt pub = arrayList_get(pub_list_by_topic,j);
-					if(pubsubEndpoint_equals(pub,pubcmp)){
-						for(k=0;k<arrayList_size(manager->psaList);k++){
-							pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
-							status = psa->removePublication(psa->admin,pub);
-							if(status==CELIX_SUCCESS){ /* We found the one that manages this endpoint */
-								celixThreadMutex_lock(&manager->discoveryListLock);
-								hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
-								while(hashMapIterator_hasNext(iter)){
-									service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter);
-									publisher_endpoint_announce_pt disc = NULL;
-									bundleContext_getService(manager->context, disc_sr, (void**) &disc);
-									disc->removePublisher(disc->handle,pub);
-									bundleContext_ungetService(manager->context, disc_sr, NULL);
-								}
-								hashMapIterator_destroy(iter);
-								celixThreadMutex_unlock(&manager->discoveryListLock);
-							}
-							else if(status ==  CELIX_ILLEGAL_ARGUMENT){ /* Not a real error, just saying this psa does not handle this endpoint */
-								status = CELIX_SUCCESS;
-							}
-						}
-						//}
-						arrayList_remove(pub_list_by_topic,j);
-
-						/* If it was the last publisher for this topic, tell PSA to close the ZMQ socket and then inform the discovery */
-						if(arrayList_size(pub_list_by_topic)==0){
-							for(k=0;k<arrayList_size(manager->psaList);k++){
-								pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
-								psa->closeAllPublications(psa->admin,pub->scope, pub->topic);
-							}
-						}
-
-						pubsubEndpoint_destroy(pub);
-					}
-
-				}
-			}
-
-			celixThreadMutex_unlock(&manager->publicationsLock);
-			celixThreadMutex_unlock(&manager->psaListLock);
-
-			free(pub_key);
-
-			pubsubEndpoint_destroy(pubcmp);
-
-		}
-
-	}
-
-	return status;
-}
-
-celix_status_t pubsub_topologyManager_announcePublisher(void *handle, pubsub_endpoint_pt pubEP){
-	celix_status_t status = CELIX_SUCCESS;
-	printf("PSTM: New publisher discovered for topic %s [fwUUID=%s, ep=%s]\n",pubEP->topic,pubEP->frameworkUUID,pubEP->endpoint);
-
-	pubsub_topology_manager_pt manager = handle;
-	celixThreadMutex_lock(&manager->psaListLock);
-	celixThreadMutex_lock(&manager->publicationsLock);
-
-	char *pub_key = createScopeTopicKey(pubEP->scope, pubEP->topic);
-
-	array_list_pt pub_list_by_topic = hashMap_get(manager->publications,pub_key);
-	if(pub_list_by_topic==NULL){
-		arrayList_create(&pub_list_by_topic);
-		hashMap_put(manager->publications,strdup(pub_key),pub_list_by_topic);
-	}
-	free(pub_key);
-
-	/* Shouldn't be any other duplicate, since it's filtered out by the discovery */
-	pubsub_endpoint_pt p = NULL;
-	pubsubEndpoint_clone(pubEP, &p);
-	arrayList_add(pub_list_by_topic,p);
-
-	int j;
-	double score = 0;
-	double best_score = 0;
-	pubsub_admin_service_pt best_psa = NULL;
-
-	for(j=0;j<arrayList_size(manager->psaList);j++){
-		pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j);
-		psa->matchEndpoint(psa->admin,p,&score);
-		if(score>best_score){ /* We have a new winner! */
-			best_score = score;
-			best_psa = psa;
-		}
-	}
-
-	if(best_psa != NULL && best_score>0){
-		best_psa->addPublication(best_psa->admin,p);
-	}
-	else{
-		status = CELIX_ILLEGAL_STATE;
-	}
-
-	celixThreadMutex_unlock(&manager->publicationsLock);
-	celixThreadMutex_unlock(&manager->psaListLock);
-
-	return status;
-}
-
-celix_status_t pubsub_topologyManager_removePublisher(void *handle, pubsub_endpoint_pt pubEP){
-	celix_status_t status = CELIX_SUCCESS;
-	printf("PSTM: Publisher removed for topic %s [fwUUID=%s, ep=%s]\n",pubEP->topic,pubEP->frameworkUUID,pubEP->endpoint);
-
-	pubsub_topology_manager_pt manager = handle;
-	celixThreadMutex_lock(&manager->psaListLock);
-	celixThreadMutex_lock(&manager->publicationsLock);
-	int i;
-
-	char *pub_key = createScopeTopicKey(pubEP->scope, pubEP->topic);
-	array_list_pt pub_list_by_topic = hashMap_get(manager->publications,pub_key);
-	if(pub_list_by_topic==NULL){
-		printf("PSTM: ERROR: Cannot find topic for known endpoint [%s,%s,%s]. Something is inconsistent.\n",pub_key,pubEP->frameworkUUID,pubEP->endpoint);
-		status = CELIX_ILLEGAL_STATE;
-	}
-	else{
-
-		pubsub_endpoint_pt p = NULL;
-		bool found = false;
-
-		for(i=0;!found && i<arrayList_size(pub_list_by_topic);i++){
-			p = (pubsub_endpoint_pt)arrayList_get(pub_list_by_topic,i);
-			found = pubsubEndpoint_equals(p,pubEP);
-		}
-
-		if(found && p !=NULL){
-
-			for(i=0;i<arrayList_size(manager->psaList);i++){
-				pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,i);
-				/* No problem with invoking removal on all psa's, only the one that manage this topic will do something */
-				psa->removePublication(psa->admin,p);
-			}
-
-			arrayList_removeElement(pub_list_by_topic,p);
-
-			/* If it was the last publisher for this topic, tell PSA to close the ZMQ socket */
-			if(arrayList_size(pub_list_by_topic)==0){
-
-				for(i=0;i<arrayList_size(manager->psaList);i++){
-					pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,i);
-					psa->closeAllPublications(psa->admin,p->scope, p->topic);
-				}
-			}
-
-			pubsubEndpoint_destroy(p);
-		}
-
-
-	}
-	free(pub_key);
-	celixThreadMutex_unlock(&manager->publicationsLock);
-	celixThreadMutex_unlock(&manager->psaListLock);
-
-
-	return status;
-}
-


Mime
View raw message