celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pnol...@apache.org
Subject [43/54] [abbrv] celix git commit: Merge branch 'release/2.1.0' into feature/CELIX-417-cmake-refactor
Date Tue, 30 Jan 2018 19:30:27 GMT
http://git-wip-us.apache.org/repos/asf/celix/blob/0ea8de64/pubsub/examples/mp_pubsub/subscriber/CMakeLists.txt
----------------------------------------------------------------------
diff --cc pubsub/examples/mp_pubsub/subscriber/CMakeLists.txt
index 858efb5,f2d2aaa..1185ec5
--- a/pubsub/examples/mp_pubsub/subscriber/CMakeLists.txt
+++ b/pubsub/examples/mp_pubsub/subscriber/CMakeLists.txt
@@@ -15,17 -15,22 +15,17 @@@
  # specific language governing permissions and limitations
  # under the License.
  
- add_bundle(org.apache.celix.pubsub_subscriber.MpSubscriber
 -include_directories("private/include")
 -include_directories("${PROJECT_SOURCE_DIR}/framework/public/include")
 -include_directories("${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/include")
 -include_directories("${PROJECT_SOURCE_DIR}/pubsub/api/pubsub")
 -include_directories("${PROJECT_SOURCE_DIR}/pubsub/examples/mp_pubsub/common/include")
 -
 -add_celix_bundle( org.apache.celix.pubsub_subscriber.MpSubscriber
++add_celix_bundle(org.apache.celix.pubsub_subscriber.MpSubscriber
      SYMBOLIC_NAME "apache_celix_pubsub_mp_subscriber"
      VERSION "1.0.0"
      SOURCES 
  		private/src/mp_sub_activator.c
  		private/src/mp_subscriber.c
 -		${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_utils.c
  )
 +target_link_libraries(org.apache.celix.pubsub_subscriber.MpSubscriber PRIVATE Celix::framework Celix::pubsub_api)
 +target_include_directories(org.apache.celix.pubsub_subscriber.MpSubscriber PRIVATE private/include)
  
- bundle_files( org.apache.celix.pubsub_subscriber.MpSubscriber
+ celix_bundle_files( org.apache.celix.pubsub_subscriber.MpSubscriber
      ${PROJECT_SOURCE_DIR}/pubsub/examples/mp_pubsub/msg_descriptors/msg_ew.descriptor
      ${PROJECT_SOURCE_DIR}/pubsub/examples/mp_pubsub/msg_descriptors/msg_ide.descriptor
      ${PROJECT_SOURCE_DIR}/pubsub/examples/mp_pubsub/msg_descriptors/msg_kinematics.descriptor
@@@ -37,7 -42,9 +37,7 @@@ celix_bundle_files(org.apache.celix.pub
      DESTINATION "META-INF/keys"
  )
  
- bundle_files(org.apache.celix.pubsub_subscriber.MpSubscriber
+ celix_bundle_files(org.apache.celix.pubsub_subscriber.MpSubscriber
 -		${PROJECT_SOURCE_DIR}/pubsub/examples/keys/publisher/public
 +	${PROJECT_SOURCE_DIR}/pubsub/examples/keys/publisher/public
      DESTINATION "META-INF/keys/publisher"
- )
+ )
 -
 -target_link_libraries(org.apache.celix.pubsub_subscriber.MpSubscriber celix_framework celix_utils)

http://git-wip-us.apache.org/repos/asf/celix/blob/0ea8de64/pubsub/examples/pubsub/publisher/CMakeLists.txt
----------------------------------------------------------------------
diff --cc pubsub/examples/pubsub/publisher/CMakeLists.txt
index 43eabd8,dec002d..176a57e
--- a/pubsub/examples/pubsub/publisher/CMakeLists.txt
+++ b/pubsub/examples/pubsub/publisher/CMakeLists.txt
@@@ -15,35 -15,38 +15,35 @@@
  # specific language governing permissions and limitations
  # under the License.
  
- add_bundle(celix_pubsub_poi_publisher
 -include_directories("private/include")
 -include_directories("${PROJECT_SOURCE_DIR}/framework/public/include")
 -include_directories("${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/include")
 -include_directories("${PROJECT_SOURCE_DIR}/pubsub/api/pubsub")
 -
 -add_celix_bundle(org.apache.celix.pubsub_publisher.PoiPublisher
++add_celix_bundle(celix_pubsub_poi_publisher
      SYMBOLIC_NAME "apache_celix_pubsub_poi_publisher"
      VERSION "1.0.0"
      SOURCES 
      	private/src/ps_pub_activator.c
      	private/src/pubsub_publisher.c
 -    	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_utils.c	
  )
  
 -celix_bundle_files(org.apache.celix.pubsub_publisher.PoiPublisher
 +target_link_libraries(celix_pubsub_poi_publisher PRIVATE Celix::framework Celix::pubsub_api)
 +target_include_directories(celix_pubsub_poi_publisher PRIVATE private/include)
 +
- bundle_files(celix_pubsub_poi_publisher
++celix_bundle_files(celix_pubsub_poi_publisher
  		${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/msg_descriptors/msg_poi1.descriptor
  		${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/msg_descriptors/msg_poi2.descriptor
      DESTINATION "META-INF/descriptors"
  )
  
- bundle_files(celix_pubsub_poi_publisher
 -celix_bundle_files(org.apache.celix.pubsub_publisher.PoiPublisher
++celix_bundle_files(celix_pubsub_poi_publisher
  		${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/msg_descriptors/poi1.properties
  		${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/msg_descriptors/poi2.properties
      DESTINATION "META-INF/topics/pub"
  )
  
- bundle_files(celix_pubsub_poi_publisher
 -celix_bundle_files(org.apache.celix.pubsub_publisher.PoiPublisher
++celix_bundle_files(celix_pubsub_poi_publisher
  		${PROJECT_SOURCE_DIR}/pubsub/examples/keys/publisher
      DESTINATION "META-INF/keys"
  )
  
- bundle_files(celix_pubsub_poi_publisher
 -celix_bundle_files(org.apache.celix.pubsub_publisher.PoiPublisher
++celix_bundle_files(celix_pubsub_poi_publisher
  		${PROJECT_SOURCE_DIR}/pubsub/examples/keys/subscriber/public
      DESTINATION "META-INF/keys/subscriber"
  )

http://git-wip-us.apache.org/repos/asf/celix/blob/0ea8de64/pubsub/examples/pubsub/publisher2/CMakeLists.txt
----------------------------------------------------------------------
diff --cc pubsub/examples/pubsub/publisher2/CMakeLists.txt
index ba007c2,1defeb0..30d86ee
--- a/pubsub/examples/pubsub/publisher2/CMakeLists.txt
+++ b/pubsub/examples/pubsub/publisher2/CMakeLists.txt
@@@ -15,36 -15,39 +15,36 @@@
  # specific language governing permissions and limitations
  # under the License.
  
- add_bundle(celix_pubsub_poi_publisher2
 -include_directories("../publisher/private/include")
 -include_directories("${PROJECT_SOURCE_DIR}/framework/public/include")
 -include_directories("${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/include")
 -include_directories("${PROJECT_SOURCE_DIR}/pubsub/api/pubsub")
 -
 -add_celix_bundle(org.apache.celix.pubsub_publisher.PoiPublisher2
++add_celix_bundle(celix_pubsub_poi_publisher2
      SYMBOLIC_NAME "apache_celix_pubsub_poi_publisher2"
      VERSION "1.0.0"
      SOURCES 
      	../publisher/private/src/ps_pub_activator.c
      	../publisher/private/src/pubsub_publisher.c
 -    	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_utils.c
  )
 +target_link_libraries(celix_pubsub_poi_publisher2 PRIVATE Celix::framework Celix::pubsub_api)
 +target_include_directories(celix_pubsub_poi_publisher2 PRIVATE ../publisher/private/include)
 +
  
- bundle_files(celix_pubsub_poi_publisher2
 -celix_bundle_files(org.apache.celix.pubsub_publisher.PoiPublisher2
++celix_bundle_files(celix_pubsub_poi_publisher2
  	${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/msg_descriptors/msg_poi1.descriptor
  	${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/msg_descriptors/msg_poi2.descriptor
      DESTINATION "META-INF/descriptors"
  )
  
- bundle_files(celix_pubsub_poi_publisher2
 -celix_bundle_files(org.apache.celix.pubsub_publisher.PoiPublisher2
++celix_bundle_files(celix_pubsub_poi_publisher2
  		${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/msg_descriptors/poi1.properties
  		${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/msg_descriptors/poi2.properties
      DESTINATION "META-INF/topics/pub"
  )
  
- bundle_files(celix_pubsub_poi_publisher2
 -celix_bundle_files(org.apache.celix.pubsub_publisher.PoiPublisher2
++celix_bundle_files(celix_pubsub_poi_publisher2
  		${PROJECT_SOURCE_DIR}/pubsub/examples/keys/publisher
      DESTINATION "META-INF/keys"
  )
  
- bundle_files(celix_pubsub_poi_publisher2
 -celix_bundle_files(org.apache.celix.pubsub_publisher.PoiPublisher2
 -		${PROJECT_SOURCE_DIR}/pubsub/examples/keys/subscriber/public
++celix_bundle_files(celix_pubsub_poi_publisher2
 +	${PROJECT_SOURCE_DIR}/pubsub/examples/keys/subscriber/public
      DESTINATION "META-INF/keys/subscriber"
  )
  

http://git-wip-us.apache.org/repos/asf/celix/blob/0ea8de64/pubsub/examples/pubsub/subscriber/CMakeLists.txt
----------------------------------------------------------------------
diff --cc pubsub/examples/pubsub/subscriber/CMakeLists.txt
index 1ffd39a,8bcec93..0eb13ff
--- a/pubsub/examples/pubsub/subscriber/CMakeLists.txt
+++ b/pubsub/examples/pubsub/subscriber/CMakeLists.txt
@@@ -15,36 -15,41 +15,36 @@@
  # specific language governing permissions and limitations
  # under the License.
  
- add_bundle(celix_pubsub_poi_subscriber
 -include_directories("private/include")
 -include_directories("${PROJECT_SOURCE_DIR}/framework/public/include")
 -include_directories("${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/include")
 -include_directories("${PROJECT_SOURCE_DIR}/pubsub/api/pubsub")
 -include_directories("${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/common/include")
 -
 -add_celix_bundle(org.apache.celix.pubsub_subscriber.PoiSubscriber
++add_celix_bundle(celix_pubsub_poi_subscriber
      SYMBOLIC_NAME "apache_celix_pubsub_poi_subscriber" 
      VERSION "1.0.0"
      SOURCES 
      	private/src/ps_sub_activator.c
 -    	private/src/pubsub_subscriber.c
 -    	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_utils.c
 +		private/src/pubsub_subscriber.c
  )
  
 -celix_bundle_files(org.apache.celix.pubsub_subscriber.PoiSubscriber
 +target_link_libraries(celix_pubsub_poi_subscriber PRIVATE Celix::framework Celix::pubsub_api)
 +target_include_directories(celix_pubsub_poi_subscriber PRIVATE private/include)
 +
 +
- bundle_files(celix_pubsub_poi_subscriber
++celix_bundle_files(celix_pubsub_poi_subscriber
  	    ${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/msg_descriptors/msg_poi1.descriptor
  	    ${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/msg_descriptors/msg_poi2.descriptor
      DESTINATION "META-INF/descriptors"
  )
  
- bundle_files(celix_pubsub_poi_subscriber
 -celix_bundle_files(org.apache.celix.pubsub_subscriber.PoiSubscriber
++celix_bundle_files(celix_pubsub_poi_subscriber
  		${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/msg_descriptors/poi1.properties
  		${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/msg_descriptors/poi2.properties
      DESTINATION "META-INF/topics/sub"
  )
  
- bundle_files(celix_pubsub_poi_subscriber
 -celix_bundle_files(org.apache.celix.pubsub_subscriber.PoiSubscriber
++celix_bundle_files(celix_pubsub_poi_subscriber
  		${PROJECT_SOURCE_DIR}/pubsub/examples/keys/subscriber
      DESTINATION "META-INF/keys"
  )
  
- bundle_files(celix_pubsub_poi_subscriber
 -celix_bundle_files(org.apache.celix.pubsub_subscriber.PoiSubscriber
++celix_bundle_files(celix_pubsub_poi_subscriber
  		${PROJECT_SOURCE_DIR}/pubsub/examples/keys/publisher/public
      DESTINATION "META-INF/keys/publisher"
- )
+ )
 -
 -target_link_libraries(org.apache.celix.pubsub_subscriber.PoiSubscriber celix_framework celix_utils)

http://git-wip-us.apache.org/repos/asf/celix/blob/0ea8de64/pubsub/mock/CMakeLists.txt
----------------------------------------------------------------------
diff --cc pubsub/mock/CMakeLists.txt
index ccc3d20,6313987..e68b1cb
--- a/pubsub/mock/CMakeLists.txt
+++ b/pubsub/mock/CMakeLists.txt
@@@ -18,13 -18,11 +18,13 @@@
  #only install if CppuTest is available
  find_package(CppUTest QUIET)
  if (CPPUTEST_FOUND)
 -    include_directories(
 -        ../api
 -        api
 +
-     add_library(celix_pubsub_mock STATIC
++    add_library(pubsub_mock STATIC
 +            src/publisher_mock.cc
      )
-     target_include_directories(celix_pubsub_mock PUBLIC api)
-     target_link_libraries(celix_pubsub_mock PRIVATE Celix::pubsub_spi ${CPPUTEST_LIBRARY})
-     target_include_directories(celix_pubsub_mock PRIVATE SYSTEM
 -    include_directories(SYSTEM
++    target_include_directories(pubsub_mock PUBLIC api)
++    target_link_libraries(pubsub_mock PRIVATE Celix::pubsub_spi ${CPPUTEST_LIBRARY})
++    target_include_directories(pubsub_mock PRIVATE SYSTEM
          ${CPPUTEST_INCLUDE_DIR}
      )
  

http://git-wip-us.apache.org/repos/asf/celix/blob/0ea8de64/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
----------------------------------------------------------------------
diff --cc pubsub/pubsub_admin_udp_mc/CMakeLists.txt
index 87858e3,fdf43ac..6981c87
--- a/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
+++ b/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
@@@ -17,28 -17,33 +17,28 @@@
  
  find_package(Jansson REQUIRED)
  
- add_bundle(celix_pubsub_admin_udp_multicast
 -include_directories("${PROJECT_SOURCE_DIR}/utils/public/include")
 -include_directories("${PROJECT_SOURCE_DIR}/log_service/public/include")
 -include_directories("${PROJECT_SOURCE_DIR}/dfi/public/include")
 -include_directories("${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/include")
 -include_directories("${PROJECT_SOURCE_DIR}/pubsub/api/pubsub")
 -include_directories("private/include")
 -include_directories("public/include")
 -include_directories("${JANSSON_INCLUDE_DIR}")
 -
 -add_celix_bundle(org.apache.celix.pubsub_admin.PubSubAdminUdpMc
++add_celix_bundle(celix_pubsub_admin_udp_multicast
  	BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_udp_multicast"
  	VERSION "1.0.0"
  	SOURCES
 -		private/src/psa_activator.c
 -		private/src/pubsub_admin_impl.c
 -		private/src/topic_subscription.c
 -		private/src/topic_publication.c
 -		private/src/large_udp.c
 -	   ${PROJECT_SOURCE_DIR}/log_service/public/src/log_helper.c
 -		${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_endpoint.c
 -		${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_admin_match.c
 -		${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_utils.c
 +        src/psa_activator.c
 +        src/pubsub_admin_impl.c
 +        src/topic_subscription.c
 +        src/topic_publication.c
 +        src/large_udp.c
  )
  
 -set_target_properties(org.apache.celix.pubsub_admin.PubSubAdminUdpMc PROPERTIES INSTALL_RPATH "$ORIGIN")
 -target_link_libraries(org.apache.celix.pubsub_admin.PubSubAdminUdpMc celix_framework celix_utils celix_dfi)
  
 -install_celix_bundle(org.apache.celix.pubsub_admin.PubSubAdminUdpMc)
 +target_include_directories(celix_pubsub_admin_udp_multicast PRIVATE
 +		src
 +		${JANSSON_INCLUDE_DIR}
 +)
 +
 +set_target_properties(celix_pubsub_admin_udp_multicast PROPERTIES INSTALL_RPATH "$ORIGIN")
 +target_link_libraries(celix_pubsub_admin_udp_multicast PRIVATE Celix::pubsub_spi Celix::framework Celix::dfi Celix::log_helper)
 +
- install_bundle(celix_pubsub_admin_udp_multicast)
++install_celix_bundle(celix_pubsub_admin_udp_multicast)
 +
 +add_library(Celix::pubsub_admin_udp_multicast ALIAS celix_pubsub_admin_udp_multicast)
  
  

http://git-wip-us.apache.org/repos/asf/celix/blob/0ea8de64/pubsub/pubsub_admin_zmq/CMakeLists.txt
----------------------------------------------------------------------
diff --cc pubsub/pubsub_admin_zmq/CMakeLists.txt
index 9d57756,962310e..65d2107
--- a/pubsub/pubsub_admin_zmq/CMakeLists.txt
+++ b/pubsub/pubsub_admin_zmq/CMakeLists.txt
@@@ -30,31 -41,23 +30,31 @@@ if (BUILD_PUBSUB_PSA_ZMQ
  		set (ZMQ_CRYPTO_C "private/src/zmq_crypto.c")
  	endif()
  
- 	add_bundle(celix_pubsub_admin_zmq
 -	add_celix_bundle(org.apache.celix.pubsub_admin.PubSubAdminZmq
 -	    BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_zmq"
 -	    VERSION "1.0.0"
 -	    SOURCES
 -	    	private/src/psa_activator.c
 -	    	private/src/pubsub_admin_impl.c
 -	    	private/src/topic_subscription.c
 -	    	private/src/topic_publication.c
 -	    	${ZMQ_CRYPTO_C}
 -	    	${PROJECT_SOURCE_DIR}/log_service/public/src/log_helper.c
 -	    	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_endpoint.c
 -	    	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_utils.c
 -    	   ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_admin_match.c
++	add_celix_bundle(celix_pubsub_admin_zmq
 +			BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_zmq"
 +			VERSION "1.0.0"
 +			SOURCES
 +			src/psa_activator.c
 +			src/pubsub_admin_impl.c
 +			src/topic_subscription.c
 +			src/topic_publication.c
 +			${ZMQ_CRYPTO_C}
 +	)
 +
 +	set_target_properties(celix_pubsub_admin_zmq PROPERTIES INSTALL_RPATH "$ORIGIN")
 +	target_link_libraries(celix_pubsub_admin_zmq PRIVATE
 +			Celix::pubsub_spi
 +			Celix::framework Celix::dfi Celix::log_helper
 +			${ZMQ_LIBRARIES} ${CZMQ_LIBRARIES} ${OPENSSL_CRYPTO_LIBRARY}
 +	)
 +	target_include_directories(celix_pubsub_admin_zmq PRIVATE
 +		${ZMQ_INCLUDE_DIR}
 +		${CZMQ_INCLUDE_DIR}
 +		${JANSSON_INCLUDE_DIR}
 +		src
  	)
  
- 	install_bundle(org.apache.celix.pubsub_admin.PubSubAdminZmq)
 -	set_target_properties(org.apache.celix.pubsub_admin.PubSubAdminZmq PROPERTIES INSTALL_RPATH "$ORIGIN")
 -	target_link_libraries(org.apache.celix.pubsub_admin.PubSubAdminZmq celix_framework celix_utils celix_dfi ${ZMQ_LIBRARIES} ${CZMQ_LIBRARIES} ${OPENSSL_CRYPTO_LIBRARY})
 -	install_celix_bundle(org.apache.celix.pubsub_admin.PubSubAdminZmq)
++	install_celix_bundle(celix_pubsub_admin_zmq)
  
 +	add_library(Celix::pubsub_admin_zmq ALIAS celix_pubsub_admin_zmq)
  endif()

http://git-wip-us.apache.org/repos/asf/celix/blob/0ea8de64/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --cc pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c
index 29ead0c,0000000..2dcec25
mode 100644,000000..100644
--- a/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c
+++ b/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c
@@@ -1,1040 -1,0 +1,1040 @@@
 +/**
 + *Licensed to the Apache Software Foundation (ASF) under one
 + *or more contributor license agreements.  See the NOTICE file
 + *distributed with this work for additional information
 + *regarding copyright ownership.  The ASF licenses this file
 + *to you under the Apache License, Version 2.0 (the
 + *"License"); you may not use this file except in compliance
 + *with the License.  You may obtain a copy of the License at
 + *
 + *  http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *Unless required by applicable law or agreed to in writing,
 + *software distributed under the License is distributed on an
 + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + *specific language governing permissions and limitations
 + *under the License.
 + */
 +/*
 + * pubsub_admin_impl.c
 + *
 + *  \date       Sep 30, 2011
 + *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
 + *  \copyright	Apache License, Version 2.0
 + */
 +
 +#include "pubsub_admin_impl.h"
 +#include <zmq.h>
 +
 +#include <stdio.h>
 +#include <stdlib.h>
 +
 +#include <arpa/inet.h>
 +#include <sys/socket.h>
 +#include <netdb.h>
 +
 +#ifndef ANDROID
 +#include <ifaddrs.h>
 +#endif
 +
 +#include <stdio.h>
 +#include <stdlib.h>
 +#include <unistd.h>
 +#include <string.h>
 +
 +#include "constants.h"
 +#include "utils.h"
 +#include "hash_map.h"
 +#include "array_list.h"
 +#include "bundle_context.h"
 +#include "bundle.h"
 +#include "service_reference.h"
 +#include "service_registration.h"
 +#include "log_helper.h"
 +#include "log_service.h"
 +#include "celix_threads.h"
 +#include "service_factory.h"
 +
 +#include "topic_subscription.h"
 +#include "topic_publication.h"
 +#include "pubsub_endpoint.h"
 +#include "pubsub_utils.h"
- #include "subscriber.h"
++#include "pubsub/subscriber.h"
 +
 +#define MAX_KEY_FOLDER_PATH_LENGTH 512
 +
 +static const char *DEFAULT_IP = "127.0.0.1";
 +
 +static celix_status_t pubsubAdmin_getIpAdress(const char* interface, char** ip);
 +static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
 +static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP);
 +
 +static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin,pubsub_endpoint_pt ep, pubsub_serializer_service_t **serSvc);
 +static void connectTopicPubSubToSerializer(pubsub_admin_pt admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool isPublication);
 +static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void *topicPubSub,bool isPublication);
 +
 +/**
 + * Allocates a PSA_ZMQ admin instance and stores it in *admin.
 + *
 + * The function creates the bookkeeping hash maps, array lists and mutexes,
 + * starts a log helper, selects the IP address used for service annunciation,
 + * reads the port range and the number of ZMQ I/O threads from the bundle
 + * context properties and, when built with BUILD_WITH_ZMQ_SECURITY, starts a
 + * zauth actor configured with the subscriber public keys.
 + *
 + * \param context  bundle context used for property lookups and the log helper
 + * \param admin    output; receives the newly allocated admin
 + * \return CELIX_SUCCESS on success, CELIX_ENOMEM when the allocation fails,
 + *         CELIX_SERVICE_EXCEPTION when a security build runs on a zeromq
 + *         without curve support (in that case *admin is not written)
 + */
 +celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *admin) {
 +	celix_status_t status = CELIX_SUCCESS;
 +
 +#ifdef BUILD_WITH_ZMQ_SECURITY
 +	if (!zsys_has_curve()){
 +		printf("PSA_ZMQ: zeromq curve unsupported\n");
 +		return CELIX_SERVICE_EXCEPTION;
 +	}
 +#endif
 +
 +	*admin = calloc(1, sizeof(**admin));
 +
 +	if (!*admin) {
 +		status = CELIX_ENOMEM;
 +	}
 +	else{
 +
 +		const char *ip = NULL;
 +		char *detectedIp = NULL;
 +		(*admin)->bundle_context= context;
 +
 +		/* Maps keyed by strings use the string hash/equals helpers; the
 +		 * per-serializer maps are created without custom hash/equals. */
 +		(*admin)->localPublications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 +		(*admin)->subscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 +		(*admin)->pendingSubscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 +		(*admin)->externalPublications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 +		(*admin)->topicSubscriptionsPerSerializer = hashMap_create(NULL, NULL, NULL, NULL);
 +		(*admin)->topicPublicationsPerSerializer  = hashMap_create(NULL, NULL, NULL, NULL);
 +		arrayList_create(&((*admin)->noSerializerSubscriptions));
 +		arrayList_create(&((*admin)->noSerializerPublications));
 +		arrayList_create(&((*admin)->serializerList));
 +
 +		celixThreadMutex_create(&(*admin)->localPublicationsLock, NULL);
 +		celixThreadMutex_create(&(*admin)->subscriptionsLock, NULL);
 +		celixThreadMutex_create(&(*admin)->externalPublicationsLock, NULL);
 +		celixThreadMutex_create(&(*admin)->serializerListLock, NULL);
 +		celixThreadMutex_create(&(*admin)->usedSerializersLock, NULL);
 +
 +		/* These two locks are created as recursive mutexes. */
 +		celixThreadMutexAttr_create(&(*admin)->noSerializerPendingsAttr);
 +		celixThreadMutexAttr_settype(&(*admin)->noSerializerPendingsAttr, CELIX_THREAD_MUTEX_RECURSIVE);
 +		celixThreadMutex_create(&(*admin)->noSerializerPendingsLock, &(*admin)->noSerializerPendingsAttr);
 +
 +		celixThreadMutexAttr_create(&(*admin)->pendingSubscriptionsAttr);
 +		celixThreadMutexAttr_settype(&(*admin)->pendingSubscriptionsAttr, CELIX_THREAD_MUTEX_RECURSIVE);
 +		celixThreadMutex_create(&(*admin)->pendingSubscriptionsLock, &(*admin)->pendingSubscriptionsAttr);
 +
 +		if (logHelper_create(context, &(*admin)->loghelper) == CELIX_SUCCESS) {
 +			logHelper_start((*admin)->loghelper);
 +		}
 +
 +		/* An explicitly configured IP wins; otherwise try to detect one. */
 +		bundleContext_getProperty(context,PSA_IP , &ip);
 +
 +#ifndef ANDROID
 +		if (ip == NULL) {
 +			const char *interface = NULL;
 +
 +			bundleContext_getProperty(context, PSA_ITF, &interface);
 +			if (pubsubAdmin_getIpAdress(interface, &detectedIp) != CELIX_SUCCESS) {
 +				logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA_ZMQ: Could not retrieve IP adress for interface %s", interface);
 +			}
 +
 +			ip = detectedIp;
 +		}
 +#endif
 +
 +		if (ip != NULL) {
 +			logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA_ZMQ: Using %s for service annunciation", ip);
 +			(*admin)->ipAddress = strdup(ip);
 +		}
 +		else {
 +			logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA_ZMQ: No IP address for service annunciation set. Using %s", DEFAULT_IP);
 +			(*admin)->ipAddress = strdup(DEFAULT_IP);
 +		}
 +
 +		/* The admin keeps its own copy; free(NULL) is a no-op. */
 +		free(detectedIp);
 +
 +		const char* basePortStr = NULL;
 +		const char* maxPortStr = NULL;
 +		char* endptrBase = NULL;
 +		char* endptrMax = NULL;
 +		/* NOTE(review): the default handed to the lookup is the macro *name* as
 +		 * a string, not its value. That text is not numeric, so it is the
 +		 * validation below that actually applies the numeric default. */
 +		bundleContext_getPropertyWithDefault(context, PSA_ZMQ_BASE_PORT, "PSA_ZMQ_DEFAULT_BASE_PORT", &basePortStr);
 +		bundleContext_getPropertyWithDefault(context, PSA_ZMQ_MAX_PORT, "PSA_ZMQ_DEFAULT_MAX_PORT", &maxPortStr);
 +		if (basePortStr != NULL) {
 +			(*admin)->basePort = strtol(basePortStr, &endptrBase, 10);
 +		}
 +		if (maxPortStr != NULL) {
 +			(*admin)->maxPort = strtol(maxPortStr, &endptrMax, 10);
 +		}
 +		/* Fall back to the default when the value is missing, empty (no digits
 +		 * consumed) or followed by trailing garbage. */
 +		if (basePortStr == NULL || endptrBase == basePortStr || *endptrBase != '\0') {
 +			(*admin)->basePort = PSA_ZMQ_DEFAULT_BASE_PORT;
 +		}
 +		if (maxPortStr == NULL || endptrMax == maxPortStr || *endptrMax != '\0') {
 +			(*admin)->maxPort = PSA_ZMQ_DEFAULT_MAX_PORT;
 +		}
 +
 +		printf("PSA Using base port %u to max port %u\n", (*admin)->basePort, (*admin)->maxPort);
 +
 +		// Disable Signal Handling by CZMQ
 +		setenv("ZSYS_SIGHANDLER", "false", true);
 +
 +		const char *nrZmqThreads = NULL;
 +		bundleContext_getProperty(context, "PSA_NR_ZMQ_THREADS", &nrZmqThreads);
 +
 +		if(nrZmqThreads != NULL) {
 +			char *endPtr = NULL;
 +			/* Range-check the full strtoul result before narrowing it, so a
 +			 * huge value cannot wrap into the accepted 1..49 range. */
 +			unsigned long parsedThreads = strtoul(nrZmqThreads, &endPtr, 10);
 +			if(endPtr != nrZmqThreads && parsedThreads > 0 && parsedThreads < 50) {
 +				unsigned int nrThreads = (unsigned int) parsedThreads;
 +				zsys_set_io_threads(nrThreads);
 +				logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA_ZMQ: Using %u threads for ZMQ", nrThreads);
 +				printf("PSA_ZMQ: Using %u threads for ZMQ\n", nrThreads);
 +			}
 +		}
 +
 +#ifdef BUILD_WITH_ZMQ_SECURITY
 +		// Setup authenticator
 +		zactor_t* auth = zactor_new (zauth, NULL);
 +		zstr_sendx(auth, "VERBOSE", NULL);
 +
 +		// Load all public keys of subscribers into the application
 +		// This step is done for authenticating subscribers
 +		char curve_folder_path[MAX_KEY_FOLDER_PATH_LENGTH];
 +		char* keys_bundle_dir = pubsub_getKeysBundleDir(context);
 +		snprintf(curve_folder_path, MAX_KEY_FOLDER_PATH_LENGTH, "%s/META-INF/keys/subscriber/public", keys_bundle_dir);
 +		zstr_sendx (auth, "CURVE", curve_folder_path, NULL);
 +		free(keys_bundle_dir);
 +
 +		(*admin)->zmq_auth = auth;
 +#endif
 +
 +	}
 +
 +	return status;
 +}
 +
 +
 +/**
 + * Releases everything owned by a PSA_ZMQ admin and frees the admin itself.
 + *
 + * Each container is torn down while its own lock is held; the locks and
 + * mutex attributes are destroyed afterwards, followed by the log helper and
 + * (security builds only) the zauth actor.
 + *
 + * \param admin  admin to destroy; must not be used after this call
 + * \return always CELIX_SUCCESS
 + */
 +celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin)
 +{
 +	hash_map_iterator_pt it = NULL;
 +
 +	free(admin->ipAddress);
 +
 +	/* Pending subscriptions: the keys and the list values are released by
 +	 * hand, so the map itself is destroyed without freeing either. */
 +	celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
 +	for (it = hashMapIterator_create(admin->pendingSubscriptions); hashMapIterator_hasNext(it); ) {
 +		hash_map_entry_pt pending = hashMapIterator_nextEntry(it);
 +		free((char*)hashMapEntry_getKey(pending));
 +		arrayList_destroy((array_list_pt)hashMapEntry_getValue(pending));
 +	}
 +	hashMapIterator_destroy(it);
 +	hashMap_destroy(admin->pendingSubscriptions, false, false);
 +	celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
 +
 +	/* Subscriptions: only the map structure is destroyed here. */
 +	celixThreadMutex_lock(&admin->subscriptionsLock);
 +	hashMap_destroy(admin->subscriptions, false, false);
 +	celixThreadMutex_unlock(&admin->subscriptionsLock);
 +
 +	/* Local publications: destroyed with the free-keys flag set. */
 +	celixThreadMutex_lock(&admin->localPublicationsLock);
 +	hashMap_destroy(admin->localPublications, true, false);
 +	celixThreadMutex_unlock(&admin->localPublicationsLock);
 +
 +	/* External publications: same manual key/list cleanup as above. */
 +	celixThreadMutex_lock(&admin->externalPublicationsLock);
 +	for (it = hashMapIterator_create(admin->externalPublications); hashMapIterator_hasNext(it); ) {
 +		hash_map_entry_pt external = hashMapIterator_nextEntry(it);
 +		free((char*)hashMapEntry_getKey(external));
 +		arrayList_destroy((array_list_pt)hashMapEntry_getValue(external));
 +	}
 +	hashMapIterator_destroy(it);
 +	hashMap_destroy(admin->externalPublications, false, false);
 +	celixThreadMutex_unlock(&admin->externalPublicationsLock);
 +
 +	celixThreadMutex_lock(&admin->serializerListLock);
 +	arrayList_destroy(admin->serializerList);
 +	celixThreadMutex_unlock(&admin->serializerListLock);
 +
 +	celixThreadMutex_lock(&admin->noSerializerPendingsLock);
 +	arrayList_destroy(admin->noSerializerSubscriptions);
 +	arrayList_destroy(admin->noSerializerPublications);
 +	celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
 +
 +	/* Per-serializer maps: every value is an array list that is destroyed
 +	 * before the map holding it. */
 +	celixThreadMutex_lock(&admin->usedSerializersLock);
 +
 +	for (it = hashMapIterator_create(admin->topicSubscriptionsPerSerializer); hashMapIterator_hasNext(it); ) {
 +		arrayList_destroy((array_list_pt)hashMapIterator_nextValue(it));
 +	}
 +	hashMapIterator_destroy(it);
 +	hashMap_destroy(admin->topicSubscriptionsPerSerializer, false, false);
 +
 +	for (it = hashMapIterator_create(admin->topicPublicationsPerSerializer); hashMapIterator_hasNext(it); ) {
 +		arrayList_destroy((array_list_pt)hashMapIterator_nextValue(it));
 +	}
 +	hashMapIterator_destroy(it);
 +	hashMap_destroy(admin->topicPublicationsPerSerializer, false, false);
 +
 +	celixThreadMutex_unlock(&admin->usedSerializersLock);
 +
 +	/* Synchronisation primitives, in the same order as before. */
 +	celixThreadMutex_destroy(&admin->usedSerializersLock);
 +	celixThreadMutex_destroy(&admin->serializerListLock);
 +	celixThreadMutex_destroy(&admin->pendingSubscriptionsLock);
 +
 +	celixThreadMutexAttr_destroy(&admin->noSerializerPendingsAttr);
 +	celixThreadMutex_destroy(&admin->noSerializerPendingsLock);
 +
 +	celixThreadMutexAttr_destroy(&admin->pendingSubscriptionsAttr);
 +	celixThreadMutex_destroy(&admin->subscriptionsLock);
 +
 +	celixThreadMutex_destroy(&admin->localPublicationsLock);
 +	celixThreadMutex_destroy(&admin->externalPublicationsLock);
 +
 +	logHelper_stop(admin->loghelper);
 +	logHelper_destroy(&admin->loghelper);
 +
 +#ifdef BUILD_WITH_ZMQ_SECURITY
 +	if (admin->zmq_auth != NULL){
 +		zactor_destroy(&(admin->zmq_auth));
 +	}
 +#endif
 +
 +	free(admin);
 +
 +	return CELIX_SUCCESS;
 +}
 +
 +/*
 + * Registers a subscriber for the "any" topic (PUBSUB_ANY_SUB_TOPIC).
 + *
 + * Does nothing when an "any" subscription is already stored. Otherwise a
 + * serializer is looked up and a topic subscription is created, connected to
 + * every known local and external publisher endpoint, handed the subscriber
 + * and started; only when all of that succeeded is it stored in the
 + * subscriptions map and linked to the serializer. If no serializer is found
 + * the endpoint is appended to noSerializerSubscriptions instead.
 + *
 + * The whole operation runs with subscriptionsLock held.
 + * Returns CELIX_SUCCESS or the accumulated status of the failing steps.
 + */
 +static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
 +	celix_status_t status = CELIX_SUCCESS;
 +	pubsub_serializer_service_t *serializer = NULL;
 +	topic_subscription_pt anySubscription = NULL;
 +	int idx;
 +
 +	celixThreadMutex_lock(&admin->subscriptionsLock);
 +
 +	anySubscription = hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC);
 +	if (anySubscription != NULL) {
 +		/* Already subscribed to the "any" topic: nothing to do. */
 +		celixThreadMutex_unlock(&admin->subscriptionsLock);
 +		return status;
 +	}
 +
 +	status = pubsubAdmin_getBestSerializer(admin, subEP, &serializer);
 +	if (status != CELIX_SUCCESS) {
 +		/* No serializer yet: remember the endpoint for later. */
 +		printf("PSA_ZMQ: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n",subEP->topic);
 +		celixThreadMutex_lock(&admin->noSerializerPendingsLock);
 +		arrayList_add(admin->noSerializerSubscriptions,subEP);
 +		celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
 +	}
 +	else {
 +		status = pubsub_topicSubscriptionCreate(admin->bundle_context, PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, PUBSUB_ANY_SUB_TOPIC, serializer, &anySubscription);
 +	}
 +
 +	if (status == CELIX_SUCCESS) {
 +		hash_map_iterator_pt localIter = NULL;
 +		hash_map_iterator_pt externalIter = NULL;
 +
 +		/* Connect all internal publishers */
 +		celixThreadMutex_lock(&admin->localPublicationsLock);
 +		localIter = hashMapIterator_create(admin->localPublications);
 +		while (hashMapIterator_hasNext(localIter)) {
 +			service_factory_pt pubFactory = (service_factory_pt)hashMapIterator_nextValue(localIter);
 +			topic_publication_pt publication = (topic_publication_pt)pubFactory->handle;
 +			array_list_pt publishers = pubsub_topicPublicationGetPublisherList(publication);
 +
 +			if (publishers == NULL) {
 +				continue;
 +			}
 +			for (idx = 0; idx < arrayList_size(publishers); idx++) {
 +				pubsub_endpoint_pt publisherEP = (pubsub_endpoint_pt)arrayList_get(publishers,idx);
 +				if (publisherEP->endpoint != NULL) {
 +					status = status + pubsub_topicSubscriptionConnectPublisher(anySubscription,publisherEP->endpoint);
 +				}
 +			}
 +			/* The publisher list is destroyed here once it has been walked. */
 +			arrayList_destroy(publishers);
 +		}
 +		hashMapIterator_destroy(localIter);
 +		celixThreadMutex_unlock(&admin->localPublicationsLock);
 +
 +		/* Connect also all external publishers */
 +		celixThreadMutex_lock(&admin->externalPublicationsLock);
 +		externalIter = hashMapIterator_create(admin->externalPublications);
 +		while (hashMapIterator_hasNext(externalIter)) {
 +			array_list_pt externalPublishers = (array_list_pt)hashMapIterator_nextValue(externalIter);
 +
 +			if (externalPublishers == NULL) {
 +				continue;
 +			}
 +			for (idx = 0; idx < arrayList_size(externalPublishers); idx++) {
 +				pubsub_endpoint_pt publisherEP = (pubsub_endpoint_pt)arrayList_get(externalPublishers,idx);
 +				if (publisherEP->endpoint != NULL) {
 +					status = status + pubsub_topicSubscriptionConnectPublisher(anySubscription,publisherEP->endpoint);
 +				}
 +			}
 +		}
 +		hashMapIterator_destroy(externalIter);
 +		celixThreadMutex_unlock(&admin->externalPublicationsLock);
 +
 +		pubsub_topicSubscriptionAddSubscriber(anySubscription,subEP);
 +
 +		status = status + pubsub_topicSubscriptionStart(anySubscription);
 +	}
 +
 +	/* Re-check: the connect/start steps above may have changed status. */
 +	if (status == CELIX_SUCCESS) {
 +		hashMap_put(admin->subscriptions,strdup(PUBSUB_ANY_SUB_TOPIC),anySubscription);
 +		connectTopicPubSubToSerializer(admin, serializer, anySubscription, false);
 +	}
 +
 +	celixThreadMutex_unlock(&admin->subscriptionsLock);
 +
 +	return status;
 +}
 +/* Registers the subscriber endpoint subEP with this admin.
 + * An endpoint whose topic equals PUBSUB_ANY_SUB_TOPIC is handed to pubsubAdmin_addAnySubscription().
 + * Otherwise, when neither a local nor an external publication is stored for the scope/topic key, the endpoint is
 + * parked in the pending-subscriptions map. Else the topic subscription for the key is looked up or created,
 + * connected to the publisher endpoints that are already known, started, stored in admin->subscriptions and
 + * its subscriber counter is increased.
 + * Returns CELIX_SUCCESS, or a non-zero status built up by summing the statuses of the failing steps. */
 +celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
 +	celix_status_t status = CELIX_SUCCESS;
 +
 +	printf("PSA_ZMQ: Received subscription [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n",subEP->frameworkUUID,subEP->serviceID,subEP->scope, subEP->topic);
 +	/* The "any" topic has its own code path and returns before any of the locks below is taken. */
 +	if(strcmp(subEP->topic,PUBSUB_ANY_SUB_TOPIC)==0){
 +		return pubsubAdmin_addAnySubscription(admin,subEP);
 +	}
 +
 +	/* Check if we already know some publisher about this topic, otherwise let's put the subscription in the pending hashmap.
 +	 * The four locks taken here are released in reverse order at the end of the function. */
 +	celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
 +	celixThreadMutex_lock(&admin->subscriptionsLock);
 +	celixThreadMutex_lock(&admin->localPublicationsLock);
 +	celixThreadMutex_lock(&admin->externalPublicationsLock);
 +	/* Lookup key for the publication and subscription maps; freed before returning. */
 +	char* scope_topic = createScopeTopicKey(subEP->scope, subEP->topic);
 +
 +	service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
 +	array_list_pt ext_pub_list = (array_list_pt)hashMap_get(admin->externalPublications,scope_topic);
 +
 +	if(factory==NULL && ext_pub_list==NULL){ //No (local or external) publishers yet for this topic
 +		pubsubAdmin_addSubscriptionToPendingList(admin,subEP);
 +	}
 +	else{
 +		int i;
 +		topic_subscription_pt subscription = hashMap_get(admin->subscriptions, scope_topic);
 +		/* Only the first subscriber for a key creates, connects and starts the topic subscription. */
 +		if(subscription == NULL) {
 +			pubsub_serializer_service_t *best_serializer = NULL;
 +			if( (status=pubsubAdmin_getBestSerializer(admin, subEP, &best_serializer)) == CELIX_SUCCESS){
 +				status += pubsub_topicSubscriptionCreate(admin->bundle_context,subEP->scope, subEP->topic, best_serializer, &subscription);
 +			}
 +			else{
 +				printf("PSA_ZMQ: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n",subEP->topic);
 +				celixThreadMutex_lock(&admin->noSerializerPendingsLock);
 +				arrayList_add(admin->noSerializerSubscriptions,subEP);
 +				celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
 +			}
 +			/* After the no-serializer branch status is still non-zero, so the setup below is skipped. */
 +			if (status==CELIX_SUCCESS){
 +
 +				/* Try to connect internal publishers */
 +				if(factory!=NULL){
 +					topic_publication_pt topic_pubs = (topic_publication_pt)factory->handle;
 +					array_list_pt topic_publishers = pubsub_topicPublicationGetPublisherList(topic_pubs);
 +
 +					if(topic_publishers!=NULL){
 +						for(i=0;i<arrayList_size(topic_publishers);i++){
 +							pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(topic_publishers,i);
 +							if(pubEP->endpoint !=NULL){
 +								status += pubsub_topicSubscriptionConnectPublisher(subscription,pubEP->endpoint);
 +							}
 +						}
 +						arrayList_destroy(topic_publishers);
 +					}
 +
 +				}
 +
 +				/* Look also for external publishers */
 +				if(ext_pub_list!=NULL){
 +					for(i=0;i<arrayList_size(ext_pub_list);i++){
 +						pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
 +						if(pubEP->endpoint !=NULL){
 +							status += pubsub_topicSubscriptionConnectPublisher(subscription,pubEP->endpoint);
 +						}
 +					}
 +				}
 +
 +				pubsub_topicSubscriptionAddSubscriber(subscription,subEP);
 +
 +				status += pubsub_topicSubscriptionStart(subscription);
 +
 +			}
 +			/* The new subscription is published in the map only when every step above succeeded. */
 +			if(status==CELIX_SUCCESS){
 +
 +				hashMap_put(admin->subscriptions,strdup(scope_topic),subscription);
 +
 +				connectTopicPubSubToSerializer(admin, best_serializer, subscription, false);
 +			}
 +		}
 +		/* Count this endpoint on the subscription, whether it was just created or already existed. */
 +		if (status == CELIX_SUCCESS){
 +			pubsub_topicIncreaseNrSubscribers(subscription);
 +		}
 +	}
 +
 +	free(scope_topic);
 +	celixThreadMutex_unlock(&admin->externalPublicationsLock);
 +	celixThreadMutex_unlock(&admin->localPublicationsLock);
 +	celixThreadMutex_unlock(&admin->subscriptionsLock);
 +	celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
 +
 +	return status;
 +
 +}
 +
 +/* Unregisters the subscriber endpoint subEP.
 + * When a topic subscription exists for the endpoint's scope/topic key its subscriber counter is decreased and,
 + * once the counter reaches zero, the endpoint is removed from that subscription.
 + * When no topic subscription exists the endpoint is expected in the noSerializer pending list; if it is not
 + * found there either, CELIX_ILLEGAL_STATE is returned. */
 +celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
 +	celix_status_t result = CELIX_SUCCESS;
 +
 +	printf("PSA_ZMQ: Removing subscription [FWUUID=%s bundleID=%ld topic=%s]\n",subEP->frameworkUUID,subEP->serviceID,subEP->topic);
 +
 +	char* key = createScopeTopicKey(subEP->scope, subEP->topic);
 +
 +	celixThreadMutex_lock(&admin->subscriptionsLock);
 +	topic_subscription_pt topicSub = (topic_subscription_pt)hashMap_get(admin->subscriptions,key);
 +	if(topicSub != NULL){
 +		pubsub_topicDecreaseNrSubscribers(topicSub);
 +		if(pubsub_topicGetNrSubscribers(topicSub) == 0) {
 +			/* Last subscriber for this topic is gone. */
 +			result = pubsub_topicSubscriptionRemoveSubscriber(topicSub,subEP);
 +		}
 +	}
 +	celixThreadMutex_unlock(&admin->subscriptionsLock);
 +
 +	if(topicSub == NULL){
 +		/* No topic subscription: the endpoint may still be waiting for a serializer. */
 +		celixThreadMutex_lock(&admin->noSerializerPendingsLock);
 +		bool wasPending = arrayList_removeElement(admin->noSerializerSubscriptions, subEP);
 +		celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
 +
 +		if(!wasPending){
 +			result = CELIX_ILLEGAL_STATE;
 +		}
 +	}
 +
 +	free(key);
 +
 +	return result;
 +
 +}
 +/* Registers the publisher endpoint pubEP.
 + * An endpoint carrying this framework's UUID and a NULL endpoint field is handled as a local publisher: a topic
 + * publication is created and started for its scope/topic key, or the endpoint is appended to the one already stored.
 + * Every other endpoint is appended to the external-publications list of that key.
 + * Afterwards the pending subscriptions stored for the key are re-submitted through pubsubAdmin_addSubscription(),
 + * and, if pubEP has an endpoint, it is queued for connection on the key's topic subscription and on the ANY one.
 + * Returns CELIX_INVALID_BUNDLE_CONTEXT when the framework UUID cannot be read; otherwise the status of the
 + * local-publisher handling (it stays CELIX_SUCCESS on the external path). */
 +celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP) {
 +	celix_status_t status = CELIX_SUCCESS;
 +
 +	printf("PSA_ZMQ: Received publication [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n", pubEP->frameworkUUID, pubEP->serviceID, pubEP->scope, pubEP->topic);
 +
 +	const char* fwUUID = NULL;
 +	/* The framework UUID is compared with the endpoint's UUID below to tell local from external publishers. */
 +	bundleContext_getProperty(admin->bundle_context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID);
 +	if (fwUUID == NULL) {
 +		printf("PSA_ZMQ: Cannot retrieve fwUUID.\n");
 +		return CELIX_INVALID_BUNDLE_CONTEXT;
 +	}
 +
 +	char *scope_topic = createScopeTopicKey(pubEP->scope, pubEP->topic);
 +	/* Local publisher: same framework UUID and no endpoint set. */
 +	if ((strcmp(pubEP->frameworkUUID, fwUUID) == 0) && (pubEP->endpoint == NULL)) {
 +
 +		celixThreadMutex_lock(&admin->localPublicationsLock);
 +
 +		service_factory_pt factory = (service_factory_pt) hashMap_get(admin->localPublications, scope_topic);
 +		/* First publisher for this key: a topic publication has to be created and started. */
 +		if (factory == NULL) {
 +			topic_publication_pt pub = NULL;
 +			pubsub_serializer_service_t *best_serializer = NULL;
 +			if( (status=pubsubAdmin_getBestSerializer(admin, pubEP, &best_serializer)) == CELIX_SUCCESS){
 +				status = pubsub_topicPublicationCreate(admin->bundle_context, pubEP, best_serializer, admin->ipAddress, admin->basePort, admin->maxPort, &pub);
 +			}
 +			else{
 +				printf("PSA_ZMQ: Cannot find a serializer for publishing topic %s. Adding it to pending list.\n", pubEP->topic);
 +				celixThreadMutex_lock(&admin->noSerializerPendingsLock);
 +				arrayList_add(admin->noSerializerPublications,pubEP);
 +				celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
 +			}
 +			/* status is non-zero after the no-serializer branch, so the else-branch below prints its message as well. */
 +			if (status == CELIX_SUCCESS) {
 +				status = pubsub_topicPublicationStart(admin->bundle_context, pub, &factory);
 +				if (status == CELIX_SUCCESS && factory != NULL) {
 +					hashMap_put(admin->localPublications, strdup(scope_topic), factory);
 +					connectTopicPubSubToSerializer(admin, best_serializer, pub, true);
 +				}
 +			} else {
 +				printf("PSA_ZMQ: Cannot create a topicPublication for scope=%s, topic=%s (bundle %ld).\n", pubEP->scope, pubEP->topic, pubEP->serviceID);
 +			}
 +		} else {
 +			//just add the new EP to the list
 +			topic_publication_pt pub = (topic_publication_pt) factory->handle;
 +			pubsub_topicPublicationAddPublisherEP(pub, pubEP);
 +		}
 +
 +		celixThreadMutex_unlock(&admin->localPublicationsLock);
 +	}
 +	else{
 +		/* Everything else is kept in the per-key list of external publications (created on first use). */
 +		celixThreadMutex_lock(&admin->externalPublicationsLock);
 +		array_list_pt ext_pub_list = (array_list_pt) hashMap_get(admin->externalPublications, scope_topic);
 +		if (ext_pub_list == NULL) {
 +			arrayList_create(&ext_pub_list);
 +			hashMap_put(admin->externalPublications, strdup(scope_topic), ext_pub_list);
 +		}
 +
 +		arrayList_add(ext_pub_list, pubEP);
 +
 +		celixThreadMutex_unlock(&admin->externalPublicationsLock);
 +	}
 +
 +	/* Re-evaluate the pending subscriptions.
 +	 * NOTE(review): pubsubAdmin_addSubscription() locks pendingSubscriptionsLock itself while it is held here -
 +	 * assumes that mutex is recursive; confirm at its creation site.
 +	 * NOTE(review): if neither publication map holds the key at this point (e.g. the local publication could not
 +	 * be created above), addSubscription appends subEP to the very list being iterated - TODO confirm this
 +	 * cannot make the loop below run without end. */
 +	celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
 +
 +	hash_map_entry_pt pendingSub = hashMap_getEntry(admin->pendingSubscriptions, scope_topic);
 +	if (pendingSub != NULL) { //There were pending subscription for the just published topic. Let's connect them.
 +		char* topic = (char*) hashMapEntry_getKey(pendingSub);
 +		array_list_pt pendingSubList = (array_list_pt) hashMapEntry_getValue(pendingSub);
 +		int i;
 +		for (i = 0; i < arrayList_size(pendingSubList); i++) {
 +			pubsub_endpoint_pt subEP = (pubsub_endpoint_pt) arrayList_get(pendingSubList, i);
 +			pubsubAdmin_addSubscription(admin, subEP);
 +		}
 +		hashMap_remove(admin->pendingSubscriptions, scope_topic);
 +		arrayList_clear(pendingSubList);
 +		arrayList_destroy(pendingSubList);
 +		free(topic);
 +	}
 +
 +	celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
 +
 +	/* Connect the new publisher to the subscription for his topic, if there is any */
 +	celixThreadMutex_lock(&admin->subscriptionsLock);
 +
 +	topic_subscription_pt sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, scope_topic);
 +	if (sub != NULL && pubEP->endpoint != NULL) {
 +		pubsub_topicSubscriptionAddConnectPublisherToPendingList(sub, pubEP->endpoint);
 +	}
 +
 +	/* And check also for ANY subscription */
 +	topic_subscription_pt any_sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, PUBSUB_ANY_SUB_TOPIC);
 +	if (any_sub != NULL && pubEP->endpoint != NULL) {
 +		pubsub_topicSubscriptionAddConnectPublisherToPendingList(any_sub, pubEP->endpoint);
 +	}
 +
 +	free(scope_topic);
 +
 +	celixThreadMutex_unlock(&admin->subscriptionsLock);
 +
 +	return status;
 +
 +}
 +
 +/* Unregisters the publisher endpoint pubEP.
 + * Endpoints carrying this framework's UUID are removed from the topic publication stored for their scope/topic
 + * key; when no such publication exists the endpoint is expected in the noSerializer pending list and
 + * CELIX_ILLEGAL_STATE is returned if it is not found there.
 + * Endpoints of other frameworks are removed from the external-publications list of the key; an emptied list is
 + * dropped from the map together with its key.
 + * Finally, unless another external publisher still uses the same endpoint, the endpoint is queued for
 + * disconnection on the key's topic subscription and on the ANY subscription.
 + * Returns CELIX_INVALID_BUNDLE_CONTEXT when the framework UUID cannot be read. */
 +celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP){
 +	celix_status_t status = CELIX_SUCCESS;
 +	int count = 0; /* number of remaining external publishers sharing pubEP->endpoint */
 +
 +	printf("PSA_ZMQ: Removing publication [FWUUID=%s bundleID=%ld topic=%s]\n",pubEP->frameworkUUID,pubEP->serviceID,pubEP->topic);
 +
 +	const char* fwUUID = NULL;
 +
 +	bundleContext_getProperty(admin->bundle_context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
 +	if(fwUUID==NULL){
 +		printf("PSA_ZMQ: Cannot retrieve fwUUID.\n");
 +		return CELIX_INVALID_BUNDLE_CONTEXT;
 +	}
 +	char *scope_topic = createScopeTopicKey(pubEP->scope, pubEP->topic);
 +
 +	if(strcmp(pubEP->frameworkUUID,fwUUID)==0){
 +
 +		celixThreadMutex_lock(&admin->localPublicationsLock);
 +		service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
 +		if(factory!=NULL){
 +			topic_publication_pt pub = (topic_publication_pt)factory->handle;
 +			pubsub_topicPublicationRemovePublisherEP(pub,pubEP);
 +		}
 +		celixThreadMutex_unlock(&admin->localPublicationsLock);
 +
 +		if(factory==NULL){
 +			/* Maybe the endpoint was pending */
 +			celixThreadMutex_lock(&admin->noSerializerPendingsLock);
 +			if(!arrayList_removeElement(admin->noSerializerPublications, pubEP)){
 +				status = CELIX_ILLEGAL_STATE;
 +			}
 +			celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
 +		}
 +	}
 +	else{
 +
 +		celixThreadMutex_lock(&admin->externalPublicationsLock);
 +		array_list_pt ext_pub_list = (array_list_pt)hashMap_get(admin->externalPublications,scope_topic);
 +		if(ext_pub_list!=NULL){
 +			int i;
 +			bool found = false;
 +			/* Remove the first entry that equals pubEP; the loop stops right after the removal. */
 +			for(i=0;!found && i<arrayList_size(ext_pub_list);i++){
 +				pubsub_endpoint_pt p  = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
 +				found = pubsubEndpoint_equals(pubEP,p);
 +				if (found){
 +					arrayList_remove(ext_pub_list,i);
 +				}
 +			}
 +			// Check if there are more publishers on the same endpoint (happens when 1 celix-instance with multiple bundles publish in same topic)
 +			// Either endpoint string may be NULL (this function itself tests pubEP->endpoint for NULL further down),
 +			// so both are checked before being handed to strcmp.
 +			for(i=0; i<arrayList_size(ext_pub_list);i++) {
 +				pubsub_endpoint_pt p  = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
 +				if (pubEP->endpoint != NULL && p->endpoint != NULL && strcmp(pubEP->endpoint,p->endpoint) == 0) {
 +					count++;
 +				}
 +			}
 +
 +			if(arrayList_size(ext_pub_list)==0){
 +				/* Fetch the stored key through the entry so that it can be freed after the removal. */
 +				hash_map_entry_pt entry = hashMap_getEntry(admin->externalPublications,scope_topic);
 +				char* topic = (char*)hashMapEntry_getKey(entry);
 +				array_list_pt list = (array_list_pt)hashMapEntry_getValue(entry);
 +				hashMap_remove(admin->externalPublications,topic);
 +				arrayList_destroy(list);
 +				free(topic);
 +			}
 +		}
 +
 +		celixThreadMutex_unlock(&admin->externalPublicationsLock);
 +	}
 +
 +	/* Check if this publisher was connected to one of our subscribers*/
 +	celixThreadMutex_lock(&admin->subscriptionsLock);
 +
 +	topic_subscription_pt sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic);
 +	if(sub!=NULL && pubEP->endpoint!=NULL && count == 0){
 +		pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub,pubEP->endpoint);
 +	}
 +
 +	/* And check also for ANY subscription */
 +	topic_subscription_pt any_sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC);
 +	if(any_sub!=NULL && pubEP->endpoint!=NULL && count == 0){
 +		pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(any_sub,pubEP->endpoint);
 +	}
 +
 +	free(scope_topic);
 +	celixThreadMutex_unlock(&admin->subscriptionsLock);
 +
 +	return status;
 +
 +}
 +
 +/* Stops and destroys the topic publication stored for the given scope/topic, if there is one, and removes it
 + * from admin->localPublications (freeing the stored key and the service factory).
 + * Returns the sum of the stop and destroy statuses; CELIX_SUCCESS when nothing is stored for the key. */
 +celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin, char *scope, char* topic){
 +	celix_status_t result = CELIX_SUCCESS;
 +
 +	printf("PSA_ZMQ: Closing all publications\n");
 +
 +	celixThreadMutex_lock(&admin->localPublicationsLock);
 +
 +	char *lookupKey = createScopeTopicKey(scope, topic);
 +	hash_map_entry_pt entry = (hash_map_entry_pt)hashMap_getEntry(admin->localPublications,lookupKey);
 +
 +	if(entry != NULL){
 +		/* Take key and factory from the entry before it is removed; both are freed below. */
 +		char* storedKey = (char*)hashMapEntry_getKey(entry);
 +		service_factory_pt pubFactory = (service_factory_pt)hashMapEntry_getValue(entry);
 +		topic_publication_pt publication = (topic_publication_pt)pubFactory->handle;
 +
 +		result += pubsub_topicPublicationStop(publication);
 +		disconnectTopicPubSubFromSerializer(admin, publication, true);
 +		result += pubsub_topicPublicationDestroy(publication);
 +
 +		hashMap_remove(admin->localPublications,lookupKey);
 +		free(storedKey);
 +		free(pubFactory);
 +	}
 +
 +	free(lookupKey);
 +	celixThreadMutex_unlock(&admin->localPublicationsLock);
 +
 +	return result;
 +
 +}
 +
 +/* Stops and destroys the topic subscription stored for the given scope/topic, if there is one, and removes it
 + * from admin->subscriptions (freeing the stored key).
 + * Returns the sum of the stop and destroy statuses; CELIX_SUCCESS when nothing is stored for the key. */
 +celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* scope,char* topic){
 +	celix_status_t result = CELIX_SUCCESS;
 +
 +	printf("PSA_ZMQ: Closing all subscriptions\n");
 +
 +	celixThreadMutex_lock(&admin->subscriptionsLock);
 +
 +	char *lookupKey = createScopeTopicKey(scope, topic);
 +	hash_map_entry_pt entry = (hash_map_entry_pt)hashMap_getEntry(admin->subscriptions,lookupKey);
 +
 +	if(entry != NULL){
 +		/* The key owned by the map is fetched through the entry so it can be freed after the removal. */
 +		char* storedKey = (char*)hashMapEntry_getKey(entry);
 +		topic_subscription_pt subscription = (topic_subscription_pt)hashMapEntry_getValue(entry);
 +
 +		result += pubsub_topicSubscriptionStop(subscription);
 +		disconnectTopicPubSubFromSerializer(admin, subscription, false);
 +		result += pubsub_topicSubscriptionDestroy(subscription);
 +
 +		hashMap_remove(admin->subscriptions,lookupKey);
 +		free(storedKey);
 +	}
 +
 +	free(lookupKey);
 +	celixThreadMutex_unlock(&admin->subscriptionsLock);
 +
 +	return result;
 +
 +}
 +
 +
 +#ifndef ANDROID
 +/* Looks up the numeric IPv4 address of a network interface.
 + * interface: name of the interface to use, or NULL to take the first IPv4 address that is found.
 + * ip:        receives a strdup'ed string on success; the caller has to free it. Left untouched on failure.
 + * Returns CELIX_SUCCESS when an address was stored in *ip, CELIX_BUNDLE_EXCEPTION otherwise. */
 +static celix_status_t pubsubAdmin_getIpAdress(const char* interface, char** ip) {
 +	celix_status_t status = CELIX_BUNDLE_EXCEPTION;
 +
 +	struct ifaddrs *ifaddr, *ifa;
 +	char host[NI_MAXHOST];
 +
 +	if (getifaddrs(&ifaddr) != -1)
 +	{
 +		for (ifa = ifaddr; ifa != NULL && status != CELIX_SUCCESS; ifa = ifa->ifa_next)
 +		{
 +			/* Only IPv4 entries are of interest. The family is checked before getnameinfo() is called,
 +			 * because the length passed to it below is that of a struct sockaddr_in. */
 +			if (ifa->ifa_addr == NULL || ifa->ifa_addr->sa_family != AF_INET)
 +				continue;
 +
 +			/* When a specific interface was requested, skip all others. */
 +			if (interface != NULL && strcmp(ifa->ifa_name, interface) != 0)
 +				continue;
 +
 +			if (getnameinfo(ifa->ifa_addr,sizeof(struct sockaddr_in), host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST) == 0) {
 +				char *copy = strdup(host);
 +				if (copy != NULL) { /* do not report success with a NULL result when out of memory */
 +					*ip = copy;
 +					status = CELIX_SUCCESS;
 +				}
 +			}
 +		}
 +
 +		freeifaddrs(ifaddr);
 +	}
 +
 +	return status;
 +}
 +#endif
 +
 +/* Appends subEP to the list of pending subscriptions stored for its scope/topic key, creating the list
 + * (and a map entry with its own copy of the key) on first use. Always returns CELIX_SUCCESS.
 + * No lock is taken in this function. */
 +static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
 +	char* key = createScopeTopicKey(subEP->scope, subEP->topic);
 +	array_list_pt waiting = hashMap_get(admin->pendingSubscriptions,key);
 +
 +	if(waiting == NULL){
 +		arrayList_create(&waiting);
 +		hashMap_put(admin->pendingSubscriptions,strdup(key),waiting);
 +	}
 +	free(key);
 +
 +	arrayList_add(waiting,subEP);
 +
 +	return CELIX_SUCCESS;
 +}
 +/* Handles the arrival of a serializer service: stores its reference in admin->serializerList and retries the
 + * subscriber and publisher endpoints that were parked because no serializer could be found for them.
 + * handle is the pubsub_admin_pt; the service argument is not used in this function.
 + * Returns CELIX_SERVICE_EXCEPTION when the reference has no PUBSUB_SERIALIZER_TYPE_KEY property, else CELIX_SUCCESS. */
 +celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt reference, void * service){
 +	/* Assumption: serializers are all available at startup.
 +	 * If a new (possibly better) serializer is installed and started, already created topic_publications/subscriptions will not be destroyed and recreated */
 +
 +	celix_status_t status = CELIX_SUCCESS;
 +	int i=0;
 +
 +	const char *serType = NULL;
 +	serviceReference_getProperty(reference, PUBSUB_SERIALIZER_TYPE_KEY,&serType);
 +	if(serType == NULL){
 +		printf("Serializer serviceReference %p has no pubsub_serializer.type property specified\n",reference);
 +		return CELIX_SERVICE_EXCEPTION;
 +	}
 +
 +	pubsub_admin_pt admin = (pubsub_admin_pt)handle;
 +	celixThreadMutex_lock(&admin->serializerListLock);
 +	arrayList_add(admin->serializerList, reference);
 +	celixThreadMutex_unlock(&admin->serializerListLock);
 +
 +	/* Now let's re-evaluate the pending.
 +	 * NOTE(review): retried endpoints are not removed from the noSerializer lists in this function, so they would
 +	 * be submitted again on the next serializer arrival - confirm whether that is intended.
 +	 * NOTE(review): pubsubAdmin_addSubscription()/pubsubAdmin_addPublication() can lock noSerializerPendingsLock
 +	 * themselves while it is held here - assumes that mutex is recursive; confirm at its creation site. */
 +	celixThreadMutex_lock(&admin->noSerializerPendingsLock);
 +
 +	for(i=0;i<arrayList_size(admin->noSerializerSubscriptions);i++){
 +		pubsub_endpoint_pt ep = (pubsub_endpoint_pt)arrayList_get(admin->noSerializerSubscriptions,i);
 +		pubsub_serializer_service_t *best_serializer = NULL;
 +		pubsubAdmin_getBestSerializer(admin, ep, &best_serializer);
 +		if(best_serializer != NULL){ /* Finally we have a valid serializer! */
 +			pubsubAdmin_addSubscription(admin, ep);
 +		}
 +	}
 +
 +	for(i=0;i<arrayList_size(admin->noSerializerPublications);i++){
 +		pubsub_endpoint_pt ep = (pubsub_endpoint_pt)arrayList_get(admin->noSerializerPublications,i);
 +		pubsub_serializer_service_t *best_serializer = NULL;
 +		pubsubAdmin_getBestSerializer(admin, ep, &best_serializer);
 +		if(best_serializer != NULL){ /* Finally we have a valid serializer! */
 +			pubsubAdmin_addPublication(admin, ep);
 +		}
 +	}
 +
 +	celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
 +
 +	printf("PSA_ZMQ: %s serializer added\n",serType);
 +
 +	return status;
 +}
 +/* Handles the removal of a serializer service (handle is the pubsub_admin_pt, service the serializer instance).
 + * Removes the reference from admin->serializerList and detaches the lists of topic publications and topic
 + * subscriptions registered for this serializer instance. Each of those is stopped, every endpoint it carries is
 + * removed through pubsubAdmin_removePublication()/pubsubAdmin_removeSubscription() and parked in the matching
 + * noSerializer pending list, its entry is dropped from localPublications/subscriptions, and it is destroyed.
 + * Returns CELIX_SERVICE_EXCEPTION when the reference has no PUBSUB_SERIALIZER_TYPE_KEY property, else CELIX_SUCCESS. */
 +celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt reference, void * service){
 +
 +	pubsub_admin_pt admin = (pubsub_admin_pt)handle;
 +	int i=0, j=0;
 +	const char *serType = NULL;
 +
 +	serviceReference_getProperty(reference, PUBSUB_SERIALIZER_TYPE_KEY,&serType);
 +	if(serType == NULL){
 +		printf("Serializer serviceReference %p has no pubsub_serializer.type property specified\n",reference);
 +		return CELIX_SERVICE_EXCEPTION;
 +	}
 +
 +	celixThreadMutex_lock(&admin->serializerListLock);
 +	/* Remove the serializer from the list */
 +	arrayList_removeElement(admin->serializerList, reference);
 +	celixThreadMutex_unlock(&admin->serializerListLock);
 +
 +	/* Detach the per-serializer lists; from here on this function owns them and destroys them at the end. */
 +	celixThreadMutex_lock(&admin->usedSerializersLock);
 +	array_list_pt topicPubList = (array_list_pt)hashMap_remove(admin->topicPublicationsPerSerializer, service);
 +	array_list_pt topicSubList = (array_list_pt)hashMap_remove(admin->topicSubscriptionsPerSerializer, service);
 +	celixThreadMutex_unlock(&admin->usedSerializersLock);
 +
 +	/* Now destroy the topicPublications, but first put back the pubsub_endpoints back to the noSerializer pending list */
 +	if(topicPubList!=NULL){
 +		for(i=0;i<arrayList_size(topicPubList);i++){
 +			topic_publication_pt topicPub = (topic_publication_pt)arrayList_get(topicPubList,i);
 +			/* Stop the topic publication */
 +			pubsub_topicPublicationStop(topicPub);
 +			/* Get the endpoints that are going to be orphan.
 +			 * NOTE(review): pubList is used without a NULL check here, while other callers in this file test the
 +			 * result of pubsub_topicPublicationGetPublisherList() for NULL - confirm it cannot be NULL. */
 +			array_list_pt pubList = pubsub_topicPublicationGetPublisherList(topicPub);
 +			for(j=0;j<arrayList_size(pubList);j++){
 +				pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubList,j);
 +				/* Remove the publication */
 +				pubsubAdmin_removePublication(admin, pubEP);
 +				/* Reset the endpoint field, so that will be recreated from scratch when a new serializer will be found */
 +				if(pubEP->endpoint!=NULL){
 +					free(pubEP->endpoint);
 +					pubEP->endpoint = NULL;
 +				}
 +				/* Add the orphan endpoint to the noSerializer pending list */
 +				celixThreadMutex_lock(&admin->noSerializerPendingsLock);
 +				arrayList_add(admin->noSerializerPublications,pubEP);
 +				celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
 +			}
 +			arrayList_destroy(pubList);
 +
 +			/* Cleanup also the localPublications hashmap: find the entry whose factory handle is this
 +			 * topic publication, then remove it and free its key and factory. */
 +			celixThreadMutex_lock(&admin->localPublicationsLock);
 +			hash_map_iterator_pt iter = hashMapIterator_create(admin->localPublications);
 +			char *key = NULL;
 +			service_factory_pt factory = NULL;
 +			while(hashMapIterator_hasNext(iter)){
 +				hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
 +				factory = (service_factory_pt)hashMapEntry_getValue(entry);
 +				topic_publication_pt pub = (topic_publication_pt)factory->handle;
 +				if(pub==topicPub){
 +					key = (char*)hashMapEntry_getKey(entry);
 +					break;
 +				}
 +			}
 +			hashMapIterator_destroy(iter);
 +			if(key!=NULL){
 +				hashMap_remove(admin->localPublications, key);
 +				free(factory);
 +				free(key);
 +			}
 +			celixThreadMutex_unlock(&admin->localPublicationsLock);
 +
 +			/* Finally destroy the topicPublication */
 +			pubsub_topicPublicationDestroy(topicPub);
 +		}
 +		arrayList_destroy(topicPubList);
 +	}
 +
 +	/* Now destroy the topicSubscriptions, but first put back the pubsub_endpoints back to the noSerializer pending list */
 +	if(topicSubList!=NULL){
 +		for(i=0;i<arrayList_size(topicSubList);i++){
 +			topic_subscription_pt topicSub = (topic_subscription_pt)arrayList_get(topicSubList,i);
 +			/* Stop the topic subscription */
 +			pubsub_topicSubscriptionStop(topicSub);
 +			/* Get the endpoints that are going to be orphan.
 +			 * NOTE(review): unlike pubList above, subList is neither NULL-checked nor destroyed here - presumably
 +			 * it is the subscription's own list rather than a copy; confirm in topic_subscription.c. */
 +			array_list_pt subList = pubsub_topicSubscriptionGetSubscribersList(topicSub);
 +			for(j=0;j<arrayList_size(subList);j++){
 +				pubsub_endpoint_pt subEP = (pubsub_endpoint_pt)arrayList_get(subList,j);
 +				/* Remove the subscription */
 +				pubsubAdmin_removeSubscription(admin, subEP);
 +				/* Reset the endpoint field, so that will be recreated from scratch when a new serializer will be found */
 +				if(subEP->endpoint!=NULL){
 +					free(subEP->endpoint);
 +					subEP->endpoint = NULL;
 +				}
 +				/* Add the orphan endpoint to the noSerializer pending list */
 +				celixThreadMutex_lock(&admin->noSerializerPendingsLock);
 +				arrayList_add(admin->noSerializerSubscriptions,subEP);
 +				celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
 +			}
 +
 +			/* Cleanup also the subscriptions hashmap*/
 +			celixThreadMutex_lock(&admin->subscriptionsLock);
 +			hash_map_iterator_pt iter = hashMapIterator_create(admin->subscriptions);
 +			char *key = NULL;
 +			while(hashMapIterator_hasNext(iter)){
 +				hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
 +				topic_subscription_pt sub = (topic_subscription_pt)hashMapEntry_getValue(entry);
 +				if(sub==topicSub){
 +					key = (char*)hashMapEntry_getKey(entry);
 +					break;
 +				}
 +			}
 +			hashMapIterator_destroy(iter);
 +			if(key!=NULL){
 +				hashMap_remove(admin->subscriptions, key);
 +				free(key);
 +			}
 +			celixThreadMutex_unlock(&admin->subscriptionsLock);
 +
 +			/* Finally destroy the topicSubscription */
 +			pubsub_topicSubscriptionDestroy(topicSub);
 +		}
 +		arrayList_destroy(topicSubList);
 +	}
 +
 +
 +
 +	printf("PSA_ZMQ: %s serializer removed\n",serType);
 +
 +
 +	return CELIX_SUCCESS;
 +}
 +
 +/* Computes, via pubsub_admin_match(), the score of this admin (type PUBSUB_ADMIN_TYPE) for the given endpoint's
 + * topic properties, using the currently known serializers. The serializer list is locked during the call.
 + * The score is written to *score; the status of pubsub_admin_match() is returned. */
 +celix_status_t pubsubAdmin_matchEndpoint(pubsub_admin_pt admin, pubsub_endpoint_pt endpoint, double* score){
 +	celix_status_t matchStatus;
 +
 +	celixThreadMutex_lock(&admin->serializerListLock);
 +	matchStatus = pubsub_admin_match(endpoint->topic_props,PUBSUB_ADMIN_TYPE,admin->serializerList,score);
 +	celixThreadMutex_unlock(&admin->serializerListLock);
 +
 +	return matchStatus;
 +}
 +
 +/* Selects, via pubsub_admin_get_best_serializer(), a serializer for the endpoint's topic properties from the
 + * currently known serializers (this recalls the same logic as in the match function).
 + * The serializer list is locked during the call. The result is written to *serSvc and the status of
 + * pubsub_admin_get_best_serializer() is returned. */
 +static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin,pubsub_endpoint_pt ep, pubsub_serializer_service_t **serSvc){
 +	celix_status_t lookupStatus;
 +
 +	celixThreadMutex_lock(&admin->serializerListLock);
 +	lookupStatus = pubsub_admin_get_best_serializer(ep->topic_props, admin->serializerList, serSvc);
 +	celixThreadMutex_unlock(&admin->serializerListLock);
 +
 +	return lookupStatus;
 +
 +}
 +
 +/* Records that topicPubSub (a topic publication when isPublication is true, otherwise a topic subscription)
 + * uses the given serializer, by appending it to the serializer's list in the matching per-serializer map.
 + * The list is created on first use. Runs under usedSerializersLock. */
 +static void connectTopicPubSubToSerializer(pubsub_admin_pt admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool isPublication){
 +	hash_map_pt perSerializer;
 +	array_list_pt users;
 +
 +	celixThreadMutex_lock(&admin->usedSerializersLock);
 +
 +	if(isPublication){
 +		perSerializer = admin->topicPublicationsPerSerializer;
 +	}
 +	else{
 +		perSerializer = admin->topicSubscriptionsPerSerializer;
 +	}
 +
 +	users = (array_list_pt)hashMap_get(perSerializer,serializer);
 +	if(users == NULL){
 +		arrayList_create(&users);
 +		hashMap_put(perSerializer,serializer,users);
 +	}
 +	arrayList_add(users,topicPubSub);
 +
 +	celixThreadMutex_unlock(&admin->usedSerializersLock);
 +
 +}
 +
 +/* Removes topicPubSub (a topic publication when isPublication is true, otherwise a topic subscription) from
 + * the first per-serializer list that contains it. Runs under usedSerializersLock. */
 +static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void *topicPubSub,bool isPublication){
 +	hash_map_pt perSerializer;
 +	bool removed = false;
 +
 +	celixThreadMutex_lock(&admin->usedSerializersLock);
 +
 +	if(isPublication){
 +		perSerializer = admin->topicPublicationsPerSerializer;
 +	}
 +	else{
 +		perSerializer = admin->topicSubscriptionsPerSerializer;
 +	}
 +
 +	hash_map_iterator_pt it = hashMapIterator_create(perSerializer);
 +	/* Stop at the first list the element could be removed from. */
 +	while(!removed && hashMapIterator_hasNext(it)){
 +		array_list_pt users = (array_list_pt)hashMapIterator_nextValue(it);
 +		if(arrayList_removeElement(users, topicPubSub)){
 +			removed = true;
 +		}
 +	}
 +	hashMapIterator_destroy(it);
 +
 +	celixThreadMutex_unlock(&admin->usedSerializersLock);
 +
 +}

http://git-wip-us.apache.org/repos/asf/celix/blob/0ea8de64/pubsub/pubsub_admin_zmq/src/topic_publication.c
----------------------------------------------------------------------
diff --cc pubsub/pubsub_admin_zmq/src/topic_publication.c
index e405866,0000000..b612605
mode 100644,000000..100644
--- a/pubsub/pubsub_admin_zmq/src/topic_publication.c
+++ b/pubsub/pubsub_admin_zmq/src/topic_publication.c
@@@ -1,630 -1,0 +1,630 @@@
 +/**
 + *Licensed to the Apache Software Foundation (ASF) under one
 + *or more contributor license agreements.  See the NOTICE file
 + *distributed with this work for additional information
 + *regarding copyright ownership.  The ASF licenses this file
 + *to you under the Apache License, Version 2.0 (the
 + *"License"); you may not use this file except in compliance
 + *with the License.  You may obtain a copy of the License at
 + *
 + *  http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *Unless required by applicable law or agreed to in writing,
 + *software distributed under the License is distributed on an
 + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + *specific language governing permissions and limitations
 + *under the License.
 + */
 +
 +#include <czmq.h>
 +/* The following undefs prevent the collision between:
 + * - sys/syslog.h (which is included within czmq)
 + * - celix/dfi/dfi_log_util.h
 + */
 +#undef LOG_DEBUG
 +#undef LOG_WARNING
 +#undef LOG_INFO
 +#undef LOG_WARNING
 +
 +#include <stdlib.h>
 +#include <string.h>
 +#include <unistd.h>
 +
 +#include "array_list.h"
 +#include "celixbool.h"
 +#include "service_registration.h"
 +#include "utils.h"
 +#include "service_factory.h"
 +#include "version.h"
 +
 +#include "pubsub_common.h"
 +#include "pubsub_utils.h"
- #include "publisher.h"
++#include "pubsub/publisher.h"
 +
 +#include "topic_publication.h"
 +
 +#include "pubsub_serializer.h"
 +
 +#ifdef BUILD_WITH_ZMQ_SECURITY
 +	#include "zmq_crypto.h"
 +
 +	#define MAX_CERT_PATH_LENGTH 512
 +#endif
 +
 +#define EP_ADDRESS_LEN		32
 +#define ZMQ_BIND_MAX_RETRY	5
 +
 +#define FIRST_SEND_DELAY	2
 +
 +/* State of one topic publication: the ZMQ publisher socket plus the endpoints and per-bundle services attached to it. */
 +struct topic_publication {
 +	/* Publisher socket created with zsock_new(ZMQ_PUB) in pubsub_topicPublicationCreate */
 +	zsock_t* zmq_socket;
 +	celix_thread_mutex_t socket_lock; //Protects zmq_socket access
 +	/* Only assigned in pubsub_topicPublicationCreate when BUILD_WITH_ZMQ_SECURITY is defined and the endpoint is secure */
 +	zcert_t * zmq_cert;
 +	/* Heap-allocated "tcp://<bindIP>:<port>" string; copied into each endpoint added to this publication */
 +	char* endpoint;
 +	/* Registration of the publisher service factory, filled in by pubsub_topicPublicationStart */
 +	service_registration_pt svcFactoryReg;
 +	array_list_pt pub_ep_list; //List<pubsub_endpoint>
 +	hash_map_pt boundServices; //<bundle_pt,bound_service>
 +	/* Serializer service passed to pubsub_topicPublicationCreate; used to build the per-bundle message serializer maps */
 +	pubsub_serializer_service_t *serializer;
 +	/* Guards pub_ep_list and boundServices; also held for the duration of a send */
 +	celix_thread_mutex_t tp_lock;
 +};
 +
 +/* Publisher service instance handed out to a single requesting bundle by the service factory. */
 +typedef struct publish_bundle_bound_service {
 +	/* Topic publication this bound service sends through */
 +	topic_publication_pt parent;
 +	/* Service struct returned to the bundle; its handle points back to this bound service */
 +	pubsub_publisher_t service;
 +	bundle_pt bundle;
 +	/* strdup'ed copy of the topic of the publication's first endpoint */
 +	char *topic;
 +	/* Filled by the serializer's createSerializerMap; looked up by message type id when sending */
 +	hash_map_pt msgTypes;
 +	/* Starts at 1 on creation, incremented per additional getService and decremented per ungetService */
 +	unsigned short getCount;
 +	celix_thread_mutex_t mp_lock; //Protects publish_bundle_bound_service data structure
 +	/* True between the FIRST and LAST part of a multipart message */
 +	bool mp_send_in_progress;
 +	/* Parts (pubsub_msg_pt) queued until the LAST part of a multipart message arrives */
 +	array_list_pt mp_parts;
 +}* publish_bundle_bound_service_pt;
 +
 +/* Note: correct locking order is
 + * 1. tp_lock
 + * 2. mp_lock
 + * 3. socket_lock
 + *
 + * tp_lock and socket_lock are independent.
 + */
 +
 +/* One serialized message: header and payload are heap allocated and released by send_pubsub_msg after sending. */
 +typedef struct pubsub_msg{
 +	pubsub_msg_header_pt header;
 +	/* Serializer output; sent as the frame following the header */
 +	char* payload;
 +	/* Size of payload in bytes, as reported by the serializer */
 +	int payloadSize;
 +}* pubsub_msg_pt;
 +
 +/* Picks a pseudo-random value based on the given range; used to choose the port to bind to */
 +static unsigned int rand_range(unsigned int min, unsigned int max);
 +
 +/* Service factory callbacks: hand out / take back the per-bundle publisher service */
 +static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service);
 +static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service);
 +
 +/* Lifecycle of the per-bundle publisher service; both expect the caller to hold the publication's tp_lock */
 +static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle);
 +static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc);
 +
 +/* Implementations of the pubsub_publisher_t service functions; handle is the bound service */
 +static int pubsub_topicPublicationSend(void* handle,unsigned int msgTypeId, const void *msg);
 +static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTypeId, const void *inMsg, int flags);
 +static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* msgTypeId);
 +
 +/* Sleeps FIRST_SEND_DELAY seconds before the very first send of this process */
 +static void delay_first_send_for_late_joiners(void);
 +
 +/* Releases what a failing pubsub_topicPublicationCreate() acquired so far; NULL arguments are skipped. */
 +static void pubsub_topicPublicationCreateCleanup(zsock_t* sock, zcert_t* cert, char* ep){
 +	free(ep);
 +	if(sock != NULL){
 +		zsock_destroy(&sock);
 +	}
 +#ifdef BUILD_WITH_ZMQ_SECURITY
 +	if(cert != NULL){
 +		zcert_destroy(&cert);
 +	}
 +#else
 +	(void)cert; /* Certificates are only loaded in security-enabled builds */
 +#endif
 +}
 +
 +/**
 + * Creates a topic publication for pubEP: opens a ZMQ PUB socket, binds it to a
 + * randomly chosen port derived from basePort/maxPort (at most ZMQ_BIND_MAX_RETRY
 + * attempts) and registers pubEP as the first publisher endpoint.
 + *
 + * The socket is bound on 0.0.0.0, while the advertised endpoint string uses bindIP.
 + * With BUILD_WITH_ZMQ_SECURITY the topic is secured when it is listed in the
 + * SECURE_TOPICS property and its key can be loaded; otherwise it falls back to
 + * an unsecured socket.
 + *
 + * On success *out receives the new publication and CELIX_SUCCESS is returned.
 + * On failure *out is left untouched, every resource acquired here is released and
 + * CELIX_SERVICE_EXCEPTION (ZMQ/key problems) or CELIX_ENOMEM (allocation failure)
 + * is returned.
 + */
 +celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out){
 +	celix_status_t status = CELIX_SUCCESS;
 +	zcert_t* pub_cert = NULL; /* Stays NULL unless a key is loaded in a security-enabled build */
 +
 +#ifdef BUILD_WITH_ZMQ_SECURITY
 +	char* secure_topics = NULL;
 +	bundleContext_getProperty(bundle_context, "SECURE_TOPICS", (const char **) &secure_topics);
 +
 +	if (secure_topics){
 +		array_list_pt secure_topics_list = pubsub_getTopicsFromString(secure_topics);
 +
 +		int i;
 +		int secure_topics_size = arrayList_size(secure_topics_list);
 +		for (i = 0; i < secure_topics_size; i++){
 +			char* top = arrayList_get(secure_topics_list, i);
 +			if (strcmp(pubEP->topic, top) == 0){
 +				printf("PSA_ZMQ_TP: Secure topic: '%s'\n", top);
 +				pubEP->is_secure = true;
 +			}
 +			free(top);
 +			top = NULL;
 +		}
 +
 +		arrayList_destroy(secure_topics_list);
 +	}
 +
 +	if (pubEP->is_secure){
 +		char* keys_bundle_dir = pubsub_getKeysBundleDir(bundle_context);
 +		if (keys_bundle_dir == NULL){
 +			return CELIX_SERVICE_EXCEPTION;
 +		}
 +
 +		const char* keys_file_path = NULL;
 +		const char* keys_file_name = NULL;
 +		bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_PATH, &keys_file_path);
 +		bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_NAME, &keys_file_name);
 +
 +		char cert_path[MAX_CERT_PATH_LENGTH];
 +
 +		//certificate path ".cache/bundle{id}/version0.0/./META-INF/keys/publisher/private/pub_{topic}.key"
 +		snprintf(cert_path, MAX_CERT_PATH_LENGTH, "%s/META-INF/keys/publisher/private/pub_%s.key.enc", keys_bundle_dir, pubEP->topic);
 +		free(keys_bundle_dir);
 +		printf("PSA_ZMQ_TP: Loading key '%s'\n", cert_path);
 +
 +		pub_cert = get_zcert_from_encoded_file((char *) keys_file_path, (char *) keys_file_name, cert_path);
 +		if (pub_cert == NULL){
 +			/* Degrade to an unsecured topic instead of failing the whole publication */
 +			printf("PSA_ZMQ_TP: Cannot load key '%s'\n", cert_path);
 +			printf("PSA_ZMQ_TP: Topic '%s' NOT SECURED !\n", pubEP->topic);
 +			pubEP->is_secure = false;
 +		}
 +	}
 +#endif
 +
 +	zsock_t* socket = zsock_new (ZMQ_PUB);
 +	if(socket==NULL){
 +		perror("Error for zmq_socket");
 +		pubsub_topicPublicationCreateCleanup(NULL, pub_cert, NULL);
 +		return CELIX_SERVICE_EXCEPTION;
 +	}
 +#ifdef BUILD_WITH_ZMQ_SECURITY
 +	if (pubEP->is_secure){
 +		zcert_apply (pub_cert, socket); // apply certificate to socket
 +		zsock_set_curve_server (socket, true); // setup the publisher's socket to use the curve functions
 +	}
 +#endif
 +
 +	int rv = -1, retry=0;
 +	char bindAddress[EP_ADDRESS_LEN];
 +	char* ep = malloc(EP_ADDRESS_LEN);
 +	if(ep == NULL){
 +		pubsub_topicPublicationCreateCleanup(socket, pub_cert, NULL);
 +		return CELIX_ENOMEM;
 +	}
 +
 +	while(rv==-1 && retry<ZMQ_BIND_MAX_RETRY){
 +		/* Randomized part due to same bundle publishing on different topics */
 +		unsigned int port = rand_range(basePort,maxPort);
 +		memset(ep,0,EP_ADDRESS_LEN);
 +		memset(bindAddress, 0, EP_ADDRESS_LEN);
 +
 +		snprintf(ep,EP_ADDRESS_LEN,"tcp://%s:%u",bindIP,port);
 +		snprintf(bindAddress, EP_ADDRESS_LEN, "tcp://0.0.0.0:%u", port); //NOTE using a different bind addres than endpoint address
 +		rv = zsock_bind (socket, "%s", bindAddress);
 +		if (rv == -1) {
 +			perror("Error for zmq_bind");
 +		}
 +		retry++;
 +	}
 +
 +	if(rv == -1){
 +		/* No port could be bound: release the socket and certificate as well, not only the endpoint string */
 +		pubsub_topicPublicationCreateCleanup(socket, pub_cert, ep);
 +		return CELIX_SERVICE_EXCEPTION;
 +	}
 +
 +	/* ZMQ stuffs are all fine at this point. Let's create and initialize the structure */
 +
 +	topic_publication_pt pub = calloc(1,sizeof(*pub));
 +	if(pub == NULL){
 +		pubsub_topicPublicationCreateCleanup(socket, pub_cert, ep);
 +		return CELIX_ENOMEM;
 +	}
 +
 +	arrayList_create(&(pub->pub_ep_list));
 +	pub->boundServices = hashMap_create(NULL,NULL,NULL,NULL);
 +	celixThreadMutex_create(&(pub->tp_lock),NULL);
 +
 +	pub->endpoint = ep;
 +	pub->zmq_socket = socket;
 +	pub->serializer = best_serializer;
 +
 +	celixThreadMutex_create(&(pub->socket_lock),NULL);
 +
 +#ifdef BUILD_WITH_ZMQ_SECURITY
 +	if (pubEP->is_secure){
 +		pub->zmq_cert = pub_cert;
 +	}
 +#endif
 +
 +	pubsub_topicPublicationAddPublisherEP(pub,pubEP);
 +
 +	*out = pub;
 +
 +	return status;
 +}
 +
 +/**
 + * Releases a topic publication and everything it holds: the endpoint string,
 + * the endpoint list (the endpoints themselves are not freed here), all bound
 + * services, the certificate (security builds only), the ZMQ socket, both
 + * mutexes and finally the structure itself. Always returns CELIX_SUCCESS.
 + */
 +celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub){
 +	hash_map_iterator_pt boundIter = NULL;
 +
 +	/* Phase 1: everything guarded by tp_lock */
 +	celixThreadMutex_lock(&(pub->tp_lock));
 +
 +	free(pub->endpoint);
 +	arrayList_destroy(pub->pub_ep_list);
 +
 +	boundIter = hashMapIterator_create(pub->boundServices);
 +	while(hashMapIterator_hasNext(boundIter)){
 +		pubsub_destroyPublishBundleBoundService((publish_bundle_bound_service_pt)hashMapIterator_nextValue(boundIter));
 +	}
 +	hashMapIterator_destroy(boundIter);
 +	/* The values were destroyed above, so the map is destroyed with both free flags off */
 +	hashMap_destroy(pub->boundServices,false,false);
 +
 +	pub->svcFactoryReg = NULL;
 +	pub->serializer = NULL;
 +#ifdef BUILD_WITH_ZMQ_SECURITY
 +	zcert_destroy(&(pub->zmq_cert));
 +#endif
 +
 +	celixThreadMutex_unlock(&(pub->tp_lock));
 +	celixThreadMutex_destroy(&(pub->tp_lock));
 +
 +	/* Phase 2: the socket, guarded by its own lock */
 +	celixThreadMutex_lock(&(pub->socket_lock));
 +	zsock_destroy(&(pub->zmq_socket));
 +	celixThreadMutex_unlock(&(pub->socket_lock));
 +	celixThreadMutex_destroy(&(pub->socket_lock));
 +
 +	free(pub);
 +
 +	return CELIX_SUCCESS;
 +}
 +
 +/**
 + * Registers a publisher service factory for the publication's first endpoint.
 + *
 + * The factory is registered under PUBSUB_PUBLISHER_SERVICE_NAME with the
 + * endpoint's topic and scope as service properties. On success the registration
 + * is stored in pub->svcFactoryReg and the factory is handed to the caller
 + * through *svcFactory. On failure *svcFactory is left untouched and the
 + * factory and its properties are released here.
 + *
 + * Returns CELIX_SUCCESS, CELIX_ENOMEM when the factory cannot be allocated,
 + * CELIX_SERVICE_EXCEPTION when the publication has no endpoint, or the status
 + * reported by bundleContext_registerServiceFactory.
 + */
 +celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory){
 +	celix_status_t status = CELIX_SUCCESS;
 +
 +	/* Let's register the new service */
 +
 +	pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pub->pub_ep_list,0);
 +
 +	if(pubEP!=NULL){
 +		service_factory_pt factory = calloc(1, sizeof(*factory));
 +		if(factory == NULL){
 +			return CELIX_ENOMEM;
 +		}
 +		factory->handle = pub;
 +		factory->getService = pubsub_topicPublicationGetService;
 +		factory->ungetService = pubsub_topicPublicationUngetService;
 +
 +		properties_pt props = properties_create();
 +		properties_set(props,PUBSUB_PUBLISHER_TOPIC,pubEP->topic);
 +		properties_set(props,PUBSUB_PUBLISHER_SCOPE,pubEP->scope);
 +		properties_set(props,"service.version", PUBSUB_PUBLISHER_SERVICE_VERSION);
 +
 +		status = bundleContext_registerServiceFactory(bundle_context,PUBSUB_PUBLISHER_SERVICE_NAME,factory,props,&(pub->svcFactoryReg));
 +
 +		if(status != CELIX_SUCCESS){
 +			properties_destroy(props);
 +			/* Nobody else holds the factory when registration failed, so release it here */
 +			free(factory);
 +			printf("PSA_ZMQ_PSA_ZMQ_TP: Cannot register ServiceFactory for topic %s (bundle %ld).\n",pubEP->topic,pubEP->serviceID);
 +		}
 +		else{
 +			*svcFactory = factory;
 +		}
 +	}
 +	else{
 +		printf("PSA_ZMQ_PSA_ZMQ_TP: Cannot find pubsub_endpoint after adding it...Should never happen!\n");
 +		status = CELIX_SERVICE_EXCEPTION;
 +	}
 +
 +	return status;
 +}
 +
 +/* Unregisters the publisher service factory of this publication and returns the status of that call. */
 +celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub){
 +	celix_status_t status = serviceRegistration_unregister(pub->svcFactoryReg);
 +
 +	return status;
 +}
 +
 +/**
 + * Attaches a publisher endpoint to the publication. The endpoint receives its
 + * own copy of the publication's endpoint address and is appended to the
 + * endpoint list, both under tp_lock. Always returns CELIX_SUCCESS.
 + */
 +celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep){
 +	celix_thread_mutex_t *lock = &(pub->tp_lock);
 +
 +	celixThreadMutex_lock(lock);
 +
 +	char *address = strdup(pub->endpoint);
 +	ep->endpoint = address;
 +	arrayList_add(pub->pub_ep_list,ep);
 +
 +	celixThreadMutex_unlock(lock);
 +
 +	return CELIX_SUCCESS;
 +}
 +
 +/**
 + * Detaches the first endpoint that pubsubEndpoint_equals() reports as equal to
 + * ep from the publication's endpoint list. The matching entry is removed by its
 + * index, so the removal also works when the stored endpoint is an equal but
 + * distinct object. The endpoint itself is not freed. Always returns
 + * CELIX_SUCCESS, also when no matching endpoint is found.
 + */
 +celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep){
 +
 +	celixThreadMutex_lock(&(pub->tp_lock));
 +	unsigned int size = arrayList_size(pub->pub_ep_list);
 +	for (unsigned int i = 0; i < size; i++) {
 +		pubsub_endpoint_pt e = arrayList_get(pub->pub_ep_list, i);
 +		if(pubsubEndpoint_equals(ep, e)) {
 +			/* Remove exactly the entry that matched instead of searching again by element */
 +			arrayList_remove(pub->pub_ep_list, i);
 +			break;
 +		}
 +	}
 +	celixThreadMutex_unlock(&(pub->tp_lock));
 +
 +	return CELIX_SUCCESS;
 +}
 +
 +/* Returns a clone of the publication's endpoint list, taken while holding tp_lock. */
 +array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt pub){
 +	array_list_pt snapshot;
 +
 +	celixThreadMutex_lock(&(pub->tp_lock));
 +	snapshot = arrayList_clone(pub->pub_ep_list);
 +	celixThreadMutex_unlock(&(pub->tp_lock));
 +
 +	return snapshot;
 +}
 +
 +
 +/**
 + * Service factory callback: returns the publisher service bound to the
 + * requesting bundle, creating it on the first request and incrementing its
 + * usage count on subsequent ones.
 + *
 + * handle is the topic publication. On success *service points to the bound
 + * publisher service. When the bound service cannot be created, *service is set
 + * to NULL and CELIX_SERVICE_EXCEPTION is returned instead of handing out a
 + * pointer derived from NULL.
 + */
 +static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service) {
 +	celix_status_t  status = CELIX_SUCCESS;
 +
 +	topic_publication_pt publish = (topic_publication_pt)handle;
 +
 +	celixThreadMutex_lock(&(publish->tp_lock));
 +
 +	publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle);
 +	if(bound==NULL){
 +		bound = pubsub_createPublishBundleBoundService(publish,bundle);
 +		if(bound!=NULL){
 +			hashMap_put(publish->boundServices,bundle,bound);
 +		}
 +	}
 +	else{
 +		bound->getCount++;
 +	}
 +
 +	if(bound!=NULL){
 +		*service = &bound->service;
 +	}
 +	else{
 +		/* Creation failed: report it rather than returning an invalid service pointer */
 +		*service = NULL;
 +		status = CELIX_SERVICE_EXCEPTION;
 +	}
 +
 +	celixThreadMutex_unlock(&(publish->tp_lock));
 +
 +	return status;
 +}
 +
 +/**
 + * Service factory callback: gives back the publisher service of a bundle.
 + * The usage count of the bundle's bound service is decremented; when it reaches
 + * zero the bound service is destroyed and removed from the publication. An
 + * unget for a bundle without bound service is only logged. *service is always
 + * reset to NULL and CELIX_SUCCESS is always returned.
 + */
 +static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service)  {
 +	topic_publication_pt tp = (topic_publication_pt)handle;
 +	publish_bundle_bound_service_pt boundSvc = NULL;
 +
 +	celixThreadMutex_lock(&(tp->tp_lock));
 +
 +	boundSvc = (publish_bundle_bound_service_pt)hashMap_get(tp->boundServices,bundle);
 +	if(boundSvc==NULL){
 +		long bundleId = -1;
 +		bundle_getBundleId(bundle,&bundleId);
 +		printf("PSA_ZMQ_TP: Unexpected ungetService call for bundle %ld.\n", bundleId);
 +	}
 +	else{
 +		boundSvc->getCount--;
 +		if(boundSvc->getCount==0){
 +			/* Last user gone: tear the bound service down and forget it */
 +			pubsub_destroyPublishBundleBoundService(boundSvc);
 +			hashMap_remove(tp->boundServices,bundle);
 +		}
 +	}
 +
 +	/* service should be never used for unget, so let's set the pointer to NULL */
 +	*service = NULL;
 +
 +	celixThreadMutex_unlock(&(tp->tp_lock));
 +
 +	return CELIX_SUCCESS;
 +}
 +
 +/**
 + * Sends one message as two ZMQ frames (header, then payload) and releases it.
 + *
 + * The header frame is always sent with ZFRAME_MORE; the payload frame is sent
 + * with ZFRAME_MORE unless 'last' is set. Nothing is sent when either frame
 + * cannot be created, and the payload is not sent when the header send failed.
 + *
 + * msg, its header and its payload are freed in every case, so the caller must
 + * not touch msg afterwards. Returns true only when both frames were sent.
 + */
 +static bool send_pubsub_msg(zsock_t* zmq_socket, pubsub_msg_pt msg, bool last){
 +
 +	bool ret = false;
 +
 +	zframe_t* headerMsg = zframe_new(msg->header, sizeof(struct pubsub_msg_header));
 +	zframe_t* payloadMsg = zframe_new(msg->payload, msg->payloadSize);
 +
 +	/* zframe_send must not be handed a NULL frame, so only send when both frames exist */
 +	if(headerMsg != NULL && payloadMsg != NULL){
 +
 +		delay_first_send_for_late_joiners();
 +
 +		if( zframe_send(&headerMsg,zmq_socket, ZFRAME_MORE) != -1){
 +			int payloadFlags = last ? 0 : ZFRAME_MORE;
 +			ret = (zframe_send(&payloadMsg,zmq_socket, payloadFlags) != -1);
 +		}
 +	}
 +
 +	/* Frames that were sent have been released by zframe_send; destroy whatever is left */
 +	zframe_destroy(&headerMsg);
 +	zframe_destroy(&payloadMsg);
 +
 +	free(msg->header);
 +	free(msg->payload);
 +	free(msg);
 +
 +	return ret;
 +
 +}
 +
 +/**
 + * Sends all queued parts of a multipart message in order and empties the list.
 + *
 + * Sending stops at the first part that fails; the remaining parts are not sent
 + * but are still freed, so no part is leaked when the list is cleared.
 + * Returns true only when every part was sent.
 + */
 +static bool send_pubsub_mp_msg(zsock_t* zmq_socket, array_list_pt mp_msg_parts){
 +
 +	bool ret = true;
 +
 +	unsigned int i = 0;
 +	unsigned int mp_num = arrayList_size(mp_msg_parts);
 +	for(;i<mp_num;i++){
 +		pubsub_msg_pt part = (pubsub_msg_pt)arrayList_get(mp_msg_parts,i);
 +		if(ret){
 +			/* send_pubsub_msg releases the part, whether or not the send succeeds */
 +			ret = send_pubsub_msg(zmq_socket, part, (i==mp_num-1));
 +		}
 +		else{
 +			/* An earlier part failed: drop this one without sending, but do not leak it */
 +			free(part->header);
 +			free(part->payload);
 +			free(part);
 +		}
 +	}
 +	arrayList_clear(mp_msg_parts);
 +
 +	return ret;
 +
 +}
 +
 +/* Sends a single (non-multipart) message: forwards to the multipart send with both FIRST and LAST flags set. */
 +static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *msg) {
 +	const int singleMsgFlags = PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG;
 +
 +	return pubsub_topicPublicationSendMultipart(handle, msgTypeId, msg, singleMsgFlags);
 +}
 +
 +/**
 + * Serializes inMsg and, depending on flags, sends it directly or handles it as
 + * a part of a multipart message.
 + *
 + * handle is the bundle bound service. flags must be exactly one of:
 + *  - FIRST_MSG: starts a multipart message, the part is queued;
 + *  - PART_MSG: queues another part (requires a started multipart message);
 + *  - LAST_MSG: queues the final part and sends all queued parts;
 + *  - FIRST_MSG | LAST_MSG: sends the message on its own.
 + *
 + * Returns 0 when the message was accepted, -1 when no serializer is available
 + * for msgTypeId, -3 when a multipart message is started while another one is in
 + * progress, and -4 for a part without a started multipart message or an invalid
 + * flag combination. A failed socket send is only logged; it does not change the
 + * return value.
 + *
 + * Runs with the publication's tp_lock and the bound service's mp_lock held.
 + */
 +static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTypeId, const void *inMsg, int flags){
 +
 +	int status = 0;
 +
 +	publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt) handle;
 +
 +	celixThreadMutex_lock(&(bound->parent->tp_lock));
 +	celixThreadMutex_lock(&(bound->mp_lock));
 +	if( (flags & PUBSUB_PUBLISHER_FIRST_MSG) && !(flags & PUBSUB_PUBLISHER_LAST_MSG) && bound->mp_send_in_progress){ //means a real mp_msg
 +		printf("PSA_ZMQ_TP: Multipart send already in progress. Cannot process a new one.\n");
 +		celixThreadMutex_unlock(&(bound->mp_lock));
 +		celixThreadMutex_unlock(&(bound->parent->tp_lock));
 +		return -3;
 +	}
 +
 +	pubsub_msg_serializer_t* msgSer = (pubsub_msg_serializer_t*)hashMap_get(bound->msgTypes, (void*)(uintptr_t)msgTypeId);
 +
 +	if (msgSer!= NULL) {
 +		int major=0, minor=0;
 +
 +		/* Header is zero-initialized, so copying at most MAX_TOPIC_LEN-1 bytes keeps the topic terminated */
 +		pubsub_msg_header_pt msg_hdr = calloc(1,sizeof(struct pubsub_msg_header));
 +		strncpy(msg_hdr->topic,bound->topic,MAX_TOPIC_LEN-1);
 +		msg_hdr->type = msgTypeId;
 +
 +		if (msgSer->msgVersion != NULL){
 +			version_getMajor(msgSer->msgVersion, &major);
 +			version_getMinor(msgSer->msgVersion, &minor);
 +			msg_hdr->major = major;
 +			msg_hdr->minor = minor;
 +		}
 +
 +		void *serializedOutput = NULL;
 +		size_t serializedOutputLen = 0;
 +		msgSer->serialize(msgSer,inMsg,&serializedOutput, &serializedOutputLen);
 +
 +		pubsub_msg_pt msg = calloc(1,sizeof(struct pubsub_msg));
 +		msg->header = msg_hdr;
 +		msg->payload = (char*)serializedOutput;
 +		msg->payloadSize = serializedOutputLen;
 +		bool snd = true;
 +
 +		switch(flags){
 +		case PUBSUB_PUBLISHER_FIRST_MSG:
 +			bound->mp_send_in_progress = true;
 +			arrayList_add(bound->mp_parts,msg);
 +			break;
 +		case PUBSUB_PUBLISHER_PART_MSG:
 +			if(!bound->mp_send_in_progress){
 +				printf("PSA_ZMQ_TP: ERROR: received msg part without the first part.\n");
 +				status = -4;
 +			}
 +			else{
 +				arrayList_add(bound->mp_parts,msg);
 +			}
 +			break;
 +		case PUBSUB_PUBLISHER_LAST_MSG:
 +			if(!bound->mp_send_in_progress){
 +				printf("PSA_ZMQ_TP: ERROR: received end msg without the first part.\n");
 +				status = -4;
 +			}
 +			else{
 +				arrayList_add(bound->mp_parts,msg);
 +				snd = send_pubsub_mp_msg(bound->parent->zmq_socket,bound->mp_parts);
 +				bound->mp_send_in_progress = false;
 +			}
 +			break;
 +		case PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG:	//Normal send case
 +			snd = send_pubsub_msg(bound->parent->zmq_socket,msg,true);
 +			break;
 +		default:
 +			printf("PSA_ZMQ_TP: ERROR: Invalid MP flags combination\n");
 +			status = -4;
 +			break;
 +		}
 +
 +		if(status==-4){
 +			/* The message was neither sent nor queued: release header and payload along with it */
 +			free(msg->header);
 +			free(msg->payload);
 +			free(msg);
 +		}
 +
 +		if(!snd){
 +			printf("PSA_ZMQ_TP: Failed to send %s message %u.\n",flags == (PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG) ? "single" : "multipart", msgTypeId);
 +		}
 +
 +	} else {
 +		printf("PSA_ZMQ_TP: No msg serializer available for msg type id %d\n", msgTypeId);
 +		status=-1;
 +	}
 +
 +	celixThreadMutex_unlock(&(bound->mp_lock));
 +	celixThreadMutex_unlock(&(bound->parent->tp_lock));
 +
 +	return status;
 +
 +}
 +
 +/* Stores the utils_stringHash() value of msgType in *msgTypeId; handle is not used. Always returns 0. */
 +static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* msgTypeId){
 +	unsigned int typeHash = utils_stringHash(msgType);
 +
 +	*msgTypeId = typeHash;
 +
 +	return 0;
 +}
 +
 +
 +/**
 + * Returns a pseudo-random value in the inclusive range [min, max].
 + *
 + * random() yields values in [0, 2^31 - 1]; dividing by 2^31 maps them onto
 + * [0, 1), so the scaled offset never reaches (max - min + 1) and the result
 + * cannot exceed max. Expects min <= max.
 + */
 +static unsigned int rand_range(unsigned int min, unsigned int max){
 +
 +	double scaled = ((double)random()) / 2147483648.0; /* 2^31, one above random()'s maximum */
 +	unsigned int offset = (unsigned int)((max-min+1)*scaled);
 +
 +	return offset + min;
 +
 +}
 +
 +/**
 + * Creates the publisher service instance bound to one bundle.
 + *
 + * The bound service starts with a usage count of 1, gets its message serializer
 + * map from the publication's serializer (when one is set), copies the topic of
 + * the publication's first endpoint and wires the publisher service functions
 + * with itself as handle.
 + *
 + * Returns NULL when the publication has no endpoint to take the topic from or
 + * when the allocation fails.
 + */
 +static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle){
 +
 +	//PRECOND lock on tp->lock
 +
 +	/* Without an endpoint there is no topic to publish on, so refuse before allocating anything */
 +	pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(tp->pub_ep_list,0);
 +	if(pubEP == NULL){
 +		printf("PSA_ZMQ_TP: Cannot create a bound service without a publisher endpoint.\n");
 +		return NULL;
 +	}
 +
 +	publish_bundle_bound_service_pt bound = calloc(1, sizeof(*bound));
 +
 +	if (bound != NULL) {
 +
 +		bound->parent = tp;
 +		bound->bundle = bundle;
 +		bound->getCount = 1;
 +		bound->mp_send_in_progress = false;
 +		celixThreadMutex_create(&bound->mp_lock,NULL);
 +
 +		if(tp->serializer != NULL){
 +			tp->serializer->createSerializerMap(tp->serializer->handle,bundle,&bound->msgTypes);
 +		}
 +
 +		arrayList_create(&bound->mp_parts);
 +
 +		bound->topic=strdup(pubEP->topic);
 +
 +		bound->service.handle = bound;
 +		bound->service.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForUUID;
 +		bound->service.send = pubsub_topicPublicationSend;
 +		bound->service.sendMultipart = pubsub_topicPublicationSendMultipart;
 +
 +	}
 +
 +	return bound;
 +}
 +
 +/**
 + * Destroys a bundle bound publisher service: hands the serializer map back to
 + * the serializer, frees any multipart parts that were queued but never sent,
 + * and releases the part list, the topic copy, the mutex and the structure.
 + */
 +static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc){
 +
 +	//PRECOND lock on tp->lock
 +
 +	celixThreadMutex_lock(&boundSvc->mp_lock);
 +
 +
 +	if(boundSvc->parent->serializer != NULL && boundSvc->msgTypes != NULL){
 +		boundSvc->parent->serializer->destroySerializerMap(boundSvc->parent->serializer->handle, boundSvc->msgTypes);
 +	}
 +
 +	if(boundSvc->mp_parts!=NULL){
 +		/* Parts of an unfinished multipart message are still owned by the list: free them first */
 +		unsigned int i = 0;
 +		unsigned int pending = arrayList_size(boundSvc->mp_parts);
 +		for(;i<pending;i++){
 +			pubsub_msg_pt part = (pubsub_msg_pt)arrayList_get(boundSvc->mp_parts,i);
 +			if(part!=NULL){
 +				free(part->header);
 +				free(part->payload);
 +				free(part);
 +			}
 +		}
 +		arrayList_destroy(boundSvc->mp_parts);
 +	}
 +
 +	if(boundSvc->topic!=NULL){
 +		free(boundSvc->topic);
 +	}
 +
 +	celixThreadMutex_unlock(&boundSvc->mp_lock);
 +	celixThreadMutex_destroy(&boundSvc->mp_lock);
 +
 +	free(boundSvc);
 +
 +}
 +
 +/* Sleeps FIRST_SEND_DELAY seconds on the first call only; every later call returns immediately. */
 +static void delay_first_send_for_late_joiners(){
 +
 +	static bool firstSend = true;
 +
 +	if(!firstSend){
 +		return;
 +	}
 +
 +	printf("PSA_ZMQ_TP: Delaying first send for late joiners...\n");
 +	sleep(FIRST_SEND_DELAY);
 +	firstSend = false;
 +}

http://git-wip-us.apache.org/repos/asf/celix/blob/0ea8de64/pubsub/pubsub_admin_zmq/src/topic_publication.h
----------------------------------------------------------------------
diff --cc pubsub/pubsub_admin_zmq/src/topic_publication.h
index 3457263,0000000..65df0e3
mode 100644,000000..100644
--- a/pubsub/pubsub_admin_zmq/src/topic_publication.h
+++ b/pubsub/pubsub_admin_zmq/src/topic_publication.h
@@@ -1,49 -1,0 +1,49 @@@
 +/**
 + *Licensed to the Apache Software Foundation (ASF) under one
 + *or more contributor license agreements.  See the NOTICE file
 + *distributed with this work for additional information
 + *regarding copyright ownership.  The ASF licenses this file
 + *to you under the Apache License, Version 2.0 (the
 + *"License"); you may not use this file except in compliance
 + *with the License.  You may obtain a copy of the License at
 + *
 + *  http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *Unless required by applicable law or agreed to in writing,
 + *software distributed under the License is distributed on an
 + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + *specific language governing permissions and limitations
 + *under the License.
 + */
 +/*
 + * topic_publication.h
 + *
 + *  \date       Sep 24, 2015
 + *  \author    	<a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
 + *  \copyright	Apache License, Version 2.0
 + */
 +
 +#ifndef TOPIC_PUBLICATION_H_
 +#define TOPIC_PUBLICATION_H_
 +
- #include "publisher.h"
++#include "pubsub/publisher.h"
 +#include "pubsub_endpoint.h"
 +#include "pubsub_common.h"
 +
 +#include "pubsub_serializer.h"
 +
 +/* Opaque handle to a topic publication; the structure is defined in topic_publication.c */
 +typedef struct topic_publication *topic_publication_pt;
 +
 +/* Creates a publication bound to a random port derived from basePort/maxPort and adds pubEP as its first endpoint; the result is returned through out */
 +celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context,pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out);
 +/* Releases the publication, its socket and its bound services */
 +celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub);
 +
 +/* Add / remove a publisher endpoint to / from the publication's endpoint list */
 +celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);
 +celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep);
 +
 +/* Start registers the publisher service factory (returned through svcFactory); Stop unregisters it */
 +celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory);
 +celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub);
 +
 +/* Returns a clone of the publication's endpoint list */
 +array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt pub);
 +
 +#endif /* TOPIC_PUBLICATION_H_ */


Mime
View raw message