celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From griccia...@apache.org
Subject celix git commit: Fixed some Coverity issues
Date Mon, 02 Oct 2017 12:59:31 GMT
Repository: celix
Updated Branches:
  refs/heads/develop 76882849a -> 6818c4f57


Fixed some Coverity issues


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/6818c4f5
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/6818c4f5
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/6818c4f5

Branch: refs/heads/develop
Commit: 6818c4f57ebd95b1b9eba478254da2ec559a91c4
Parents: 7688284
Author: gricciardi <gricciardi@apache.org>
Authored: Mon Oct 2 14:59:14 2017 +0200
Committer: gricciardi <gricciardi@apache.org>
Committed: Mon Oct 2 14:59:14 2017 +0200

----------------------------------------------------------------------
 .gitignore                                      |   2 +
 pubsub/pubsub_admin_udp_mc/CMakeLists.txt       |   2 +-
 .../private/include/pubsub_admin_impl.h         |   1 +
 .../private/src/pubsub_admin_impl.c             | 182 +++++++++-------
 .../private/src/topic_subscription.c            |  30 ++-
 pubsub/pubsub_admin_zmq/CMakeLists.txt          |   2 +-
 .../private/include/pubsub_admin_impl.h         |   1 +
 .../private/src/pubsub_admin_impl.c             |  14 +-
 .../private/src/topic_publication.c             |   7 +-
 .../private/src/topic_subscription.c            |   8 +-
 pubsub/pubsub_common/public/src/log_helper.c    | 209 -------------------
 pubsub/pubsub_serializer_json/CMakeLists.txt    |   2 +-
 pubsub/pubsub_topology_manager/CMakeLists.txt   |   2 +-
 .../private/src/pubsub_topology_manager.c       |   4 +-
 14 files changed, 159 insertions(+), 307 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/6818c4f5/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index cb42137..0c6fae6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -25,3 +25,5 @@ cmake-build*
 .idea
 nbproject
 *.pyc
+*.enc
+*.pub

http://git-wip-us.apache.org/repos/asf/celix/blob/6818c4f5/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/CMakeLists.txt b/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
index 7be1ee9..9e3f063 100644
--- a/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
+++ b/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
@@ -35,8 +35,8 @@ add_bundle(org.apache.celix.pubsub_admin.PubSubAdminUdpMc
     	private/src/topic_subscription.c
     	private/src/topic_publication.c
     	private/src/large_udp.c
+	   ${PROJECT_SOURCE_DIR}/log_service/public/src/log_helper.c
     	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_endpoint.c
-    	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/log_helper.c
     	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_admin_match.c
     	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_utils.c
 )

http://git-wip-us.apache.org/repos/asf/celix/blob/6818c4f5/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h b/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h
index 731b037..7696722 100644
--- a/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h
+++ b/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h
@@ -51,6 +51,7 @@ struct pubsub_admin {
 	hash_map_pt subscriptions; //<topic(string),topic_subscription>
 
 	celix_thread_mutex_t pendingSubscriptionsLock;
+	celix_thread_mutexattr_t pendingSubscriptionsAttr;
 	hash_map_pt pendingSubscriptions; //<topic(string),List<pubsub_ep>>
 
 	/* Those are used to keep track of valid subscriptions/publications that still have no valid serializer */

http://git-wip-us.apache.org/repos/asf/celix/blob/6818c4f5/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c b/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
index 5a651da..c3136f9 100644
--- a/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
+++ b/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
@@ -79,103 +79,128 @@ celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *ad
 	*admin = calloc(1, sizeof(**admin));
 
 	if (!*admin) {
-		status = CELIX_ENOMEM;
+		return CELIX_ENOMEM;
+	}
+
+	char *mc_ip = NULL;
+	char *if_ip = NULL;
+	int sendSocket = -1;
+
+	if (logHelper_create(context, &(*admin)->loghelper) == CELIX_SUCCESS) {
+		logHelper_start((*admin)->loghelper);
+	}
+	const char *mc_ip_prop = NULL;
+	bundleContext_getProperty(context,PSA_IP , &mc_ip_prop);
+	if(mc_ip_prop) {
+		mc_ip = strdup(mc_ip_prop);
 	}
-	else{
 
-		char *mc_ip = NULL;
-		char *if_ip = NULL;
-		(*admin)->bundle_context= context;
-		(*admin)->localPublications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
-		(*admin)->subscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
-		(*admin)->pendingSubscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
-		(*admin)->externalPublications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
-		(*admin)->topicSubscriptionsPerSerializer = hashMap_create(NULL, NULL, NULL, NULL);
-		(*admin)->topicPublicationsPerSerializer  = hashMap_create(NULL, NULL, NULL, NULL);
-		arrayList_create(&((*admin)->noSerializerSubscriptions));
-		arrayList_create(&((*admin)->noSerializerPublications));
-		arrayList_create(&((*admin)->serializerList));
-
-		celixThreadMutex_create(&(*admin)->localPublicationsLock, NULL);
-		celixThreadMutex_create(&(*admin)->subscriptionsLock, NULL);
-		celixThreadMutex_create(&(*admin)->pendingSubscriptionsLock, NULL);
-		celixThreadMutex_create(&(*admin)->externalPublicationsLock, NULL);
-		celixThreadMutex_create(&(*admin)->noSerializerPendingsLock, NULL);
-		celixThreadMutex_create(&(*admin)->serializerListLock, NULL);
-		celixThreadMutex_create(&(*admin)->usedSerializersLock, NULL);
-
-		if (logHelper_create(context, &(*admin)->loghelper) == CELIX_SUCCESS) {
-			logHelper_start((*admin)->loghelper);
+#ifndef ANDROID
+	if (mc_ip == NULL) {
+		const char *mc_prefix = NULL;
+		const char *interface = NULL;
+		int b0 = 0, b1 = 0, b2 = 0, b3 = 0;
+		bundleContext_getProperty(context,PSA_MULTICAST_IP_PREFIX , &mc_prefix);
+		if(mc_prefix == NULL) {
+			mc_prefix = DEFAULT_MC_PREFIX;
 		}
-		const char *mc_ip_prop = NULL;
-		bundleContext_getProperty(context,PSA_IP , &mc_ip_prop);
-		if(mc_ip_prop) {
-			mc_ip = strdup(mc_ip_prop);
+
+		bundleContext_getProperty(context, PSA_ITF, &interface);
+		if (pubsubAdmin_getIpAddress(interface, &if_ip) != CELIX_SUCCESS) {
+			logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA_UDP_MC: Could not retrieve IP address for interface %s", interface);
 		}
-#ifndef ANDROID
-		if (mc_ip == NULL) {
-			const char *mc_prefix = NULL;
-			const char *interface = NULL;
-			int b0 = 0, b1 = 0, b2 = 0, b3 = 0;
-			bundleContext_getProperty(context,PSA_MULTICAST_IP_PREFIX , &mc_prefix);
-			if(mc_prefix == NULL) {
-				mc_prefix = DEFAULT_MC_PREFIX;
-			}
 
-			bundleContext_getProperty(context, PSA_ITF, &interface);
-			if (pubsubAdmin_getIpAddress(interface, &if_ip) != CELIX_SUCCESS) {
-				logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA_UDP_MC: Could not retrieve IP address for interface %s", interface);
-			}
+		printf("IP Detected : %s\n", if_ip);
+		if(if_ip && sscanf(if_ip, "%i.%i.%i.%i", &b0, &b1, &b2, &b3) != 4) {
+			logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA_UDP_MC: Could not parse IP address %s", if_ip);
+			b2 = 1;
+			b3 = 1;
+		}
 
-			printf("IP Detected : %s\n", if_ip);
-			if(if_ip && sscanf(if_ip, "%i.%i.%i.%i", &b0, &b1, &b2, &b3) != 4) {
-				logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA_UDP_MC: Could not parse IP address %s", if_ip);
-				b2 = 1;
-				b3 = 1;
-			}
+		asprintf(&mc_ip, "%s.%d.%d",mc_prefix, b2, b3);
 
-			asprintf(&mc_ip, "%s.%d.%d",mc_prefix, b2, b3);
+		sendSocket = socket(AF_INET, SOCK_DGRAM, 0);
+		if(sendSocket == -1) {
+			perror("pubsubAdmin_create:socket");
+			status = CELIX_SERVICE_EXCEPTION;
+		}
 
-			int sendSocket = socket(AF_INET, SOCK_DGRAM, 0);
-			if(sendSocket == -1) {
-				perror("pubsubAdmin_create:socket");
-				return CELIX_SERVICE_EXCEPTION;
-			}
+		if(status == CELIX_SUCCESS){
 			char loop = 1;
 			if(setsockopt(sendSocket, IPPROTO_IP, IP_MULTICAST_LOOP, &loop, sizeof(loop)) != 0) {
 				perror("pubsubAdmin_create:setsockopt(IP_MULTICAST_LOOP)");
-				close(sendSocket);
-				return CELIX_SERVICE_EXCEPTION;
+				status = CELIX_SERVICE_EXCEPTION;
 			}
+		}
 
+		if(status == CELIX_SUCCESS){
 			struct in_addr multicast_interface;
 			inet_aton(if_ip, &multicast_interface);
 			if(setsockopt(sendSocket,  IPPROTO_IP, IP_MULTICAST_IF, &multicast_interface, sizeof(multicast_interface)) != 0) {
 				perror("pubsubAdmin_create:setsockopt(IP_MULTICAST_IF)");
-				close(sendSocket);
-				return CELIX_SERVICE_EXCEPTION;
+				status = CELIX_SERVICE_EXCEPTION;
 			}
+		}
 
-			(*admin)->sendSocket = sendSocket;
+	}
 
-		}
-#endif
-		if (if_ip != NULL) {
-			logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA_UDP_MC: Using %s as interface for multicast communication", if_ip);
-			(*admin)->ifIpAddress = if_ip;
-		} else {
-			(*admin)->ifIpAddress = strdup("127.0.0.1");
-		}
 
-		if (mc_ip != NULL) {
-			logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA_UDP_MC: Using %s for service annunciation", mc_ip);
-			(*admin)->mcIpAddress = mc_ip;
+	if(status != CELIX_SUCCESS){
+		logHelper_stop((*admin)->loghelper);
+		logHelper_destroy(&((*admin)->loghelper));
+		if(sendSocket >=0){
+			close(sendSocket);
+		}
+		if(if_ip != NULL){
+			free(if_ip);
 		}
-		else {
-			logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA_UDP_MC: No IP address for service annunciation set. Using %s", DEFAULT_MC_IP);
-			(*admin)->mcIpAddress = strdup(DEFAULT_MC_IP);
+		if(mc_ip != NULL){
+			free(mc_ip);
 		}
+		return status;
+	}
+	else{
+		(*admin)->sendSocket = sendSocket;
+	}
+
+#endif
+
+	(*admin)->bundle_context= context;
+	(*admin)->localPublications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+	(*admin)->subscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+	(*admin)->pendingSubscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+	(*admin)->externalPublications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+	(*admin)->topicSubscriptionsPerSerializer = hashMap_create(NULL, NULL, NULL, NULL);
+	(*admin)->topicPublicationsPerSerializer  = hashMap_create(NULL, NULL, NULL, NULL);
+	arrayList_create(&((*admin)->noSerializerSubscriptions));
+	arrayList_create(&((*admin)->noSerializerPublications));
+	arrayList_create(&((*admin)->serializerList));
+
+	celixThreadMutex_create(&(*admin)->localPublicationsLock, NULL);
+	celixThreadMutex_create(&(*admin)->subscriptionsLock, NULL);
+	celixThreadMutex_create(&(*admin)->externalPublicationsLock, NULL);
+	celixThreadMutex_create(&(*admin)->noSerializerPendingsLock, NULL);
+	celixThreadMutex_create(&(*admin)->serializerListLock, NULL);
+	celixThreadMutex_create(&(*admin)->usedSerializersLock, NULL);
+
+	celixThreadMutexAttr_create(&(*admin)->pendingSubscriptionsAttr);
+	celixThreadMutexAttr_settype(&(*admin)->pendingSubscriptionsAttr, CELIX_THREAD_MUTEX_RECURSIVE);
+	celixThreadMutex_create(&(*admin)->pendingSubscriptionsLock, &(*admin)->pendingSubscriptionsAttr);
+
+	if (if_ip != NULL) {
+		logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA_UDP_MC: Using %s as interface for multicast communication", if_ip);
+		(*admin)->ifIpAddress = if_ip;
+	} else {
+		(*admin)->ifIpAddress = strdup("127.0.0.1");
+	}
 
+	if (mc_ip != NULL) {
+		logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA_UDP_MC: Using %s for service annunciation", mc_ip);
+		(*admin)->mcIpAddress = mc_ip;
+	}
+	else {
+		logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA_UDP_MC: No IP address for service annunciation set. Using %s", DEFAULT_MC_IP);
+		(*admin)->mcIpAddress = strdup(DEFAULT_MC_IP);
 	}
 
 	return status;
@@ -249,7 +274,10 @@ celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin)
 	celixThreadMutex_destroy(&admin->usedSerializersLock);
 	celixThreadMutex_destroy(&admin->noSerializerPendingsLock);
 	celixThreadMutex_destroy(&admin->serializerListLock);
+
 	celixThreadMutex_destroy(&admin->pendingSubscriptionsLock);
+	celixThreadMutexAttr_destroy(&admin->pendingSubscriptionsAttr);
+
 	celixThreadMutex_destroy(&admin->subscriptionsLock);
 	celixThreadMutex_destroy(&admin->localPublicationsLock);
 	celixThreadMutex_destroy(&admin->externalPublicationsLock);
@@ -353,6 +381,7 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
 	}
 
 	/* Check if we already know some publisher about this topic, otherwise let's put the subscription in the pending hashmap */
+	celixThreadMutex_lock(&admin->subscriptionsLock);
 	celixThreadMutex_lock(&admin->localPublicationsLock);
 	celixThreadMutex_lock(&admin->externalPublicationsLock);
 
@@ -418,9 +447,9 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
 			}
 
 			if(status==CELIX_SUCCESS){
-				celixThreadMutex_lock(&admin->subscriptionsLock);
+
 				hashMap_put(admin->subscriptions,strdup(scope_topic),subscription);
-				celixThreadMutex_unlock(&admin->subscriptionsLock);
+
 				connectTopicPubSubToSerializer(admin, best_serializer, subscription, false);
 			}
 		}
@@ -433,6 +462,7 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
 	free(scope_topic);
 	celixThreadMutex_unlock(&admin->externalPublicationsLock);
 	celixThreadMutex_unlock(&admin->localPublicationsLock);
+	celixThreadMutex_unlock(&admin->subscriptionsLock);
 
 	return status;
 
@@ -658,7 +688,7 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi
 	/* And check also for ANY subscription */
 	topic_subscription_pt any_sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC);
 	if(any_sub!=NULL && pubEP->endpoint!=NULL && count == 0){
-		pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub,pubEP->endpoint);
+		pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(any_sub,pubEP->endpoint);
 	}
 
 	free(scope_topic);

http://git-wip-us.apache.org/repos/asf/celix/blob/6818c4f5/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c b/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
index 9bf0f80..d8e6f45 100644
--- a/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
+++ b/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
@@ -67,6 +67,7 @@ struct topic_subscription{
 	int topicEpollFd; // EPOLL filedescriptor where the sockets are registered.
 	hash_map_pt servicesMap; // key = service, value = msg types map
 	hash_map_pt socketMap; // key = URL, value = listen-socket
+	celix_thread_mutex_t socketMap_lock;
 
 	celix_thread_mutex_t pendingConnections_lock;
 	array_list_pt pendingConnections;
@@ -122,6 +123,7 @@ celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context,
 	arrayList_create(&ts->pendingDisconnections);
 	celixThreadMutex_create(&ts->pendingConnections_lock, NULL);
 	celixThreadMutex_create(&ts->pendingDisconnections_lock, NULL);
+	celixThreadMutex_create(&ts->socketMap_lock, NULL);
 
 	ts->largeUdpHandle = largeUdp_create(MAX_UDP_SESSIONS);
 
@@ -170,7 +172,10 @@ celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){
 	arrayList_destroy(ts->sub_ep_list);
 	hashMap_destroy(ts->servicesMap,false,false);
 
+	celixThreadMutex_lock(&ts->socketMap_lock);
 	hashMap_destroy(ts->socketMap,true,true);
+	celixThreadMutex_unlock(&ts->socketMap_lock);
+	celixThreadMutex_destroy(&ts->socketMap_lock);
 
 	celixThreadMutex_lock(&ts->pendingConnections_lock);
 	arrayList_destroy(ts->pendingConnections);
@@ -214,6 +219,8 @@ celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts){
 
 celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts){
 	celix_status_t status = CELIX_SUCCESS;
+	struct epoll_event ev;
+	memset(&ev, 0, sizeof(ev));
 
 	ts->running = false;
 
@@ -223,14 +230,25 @@ celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts){
 
 	status = serviceTracker_close(ts->tracker);
 
+	celixThreadMutex_lock(&ts->socketMap_lock);
 	hash_map_iterator_pt it = hashMapIterator_create(ts->socketMap);
 	while(hashMapIterator_hasNext(it)) {
 		hash_map_entry_pt entry = hashMapIterator_nextEntry(it);
 		char *url = hashMapEntry_getKey(entry);
-		pubsub_topicSubscriptionDisconnectPublisher(ts, url);
+		int *s = hashMapEntry_getValue(entry);
+		memset(&ev, 0, sizeof(ev));
+		if(epoll_ctl(ts->topicEpollFd, EPOLL_CTL_DEL, *s, &ev) == -1) {
+			printf("in if error()\n");
+			perror("epoll_ctl() EPOLL_CTL_DEL");
+			status += CELIX_SERVICE_EXCEPTION;
+		}
+		free(s);
 		free(url);
+		//hashMapIterator_remove(it);
 	}
 	hashMapIterator_destroy(it);
+	hashMap_clear(ts->socketMap, false, false);
+	celixThreadMutex_unlock(&ts->socketMap_lock);
 
 
 	return status;
@@ -241,7 +259,8 @@ celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts
 	printf("pubsub_topicSubscriptionConnectPublisher : pubURL = %s\n", pubURL);
 
 	celix_status_t status = CELIX_SUCCESS;
-	celixThreadMutex_lock(&ts->ts_lock);
+
+	celixThreadMutex_lock(&ts->socketMap_lock);
 
 	if(!hashMap_containsKey(ts->socketMap, pubURL)){
 
@@ -319,7 +338,8 @@ celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts
 			free(recvSocket);
 		}
 	}
-	celixThreadMutex_unlock(&ts->ts_lock);
+
+	celixThreadMutex_unlock(&ts->socketMap_lock);
 
 	return status;
 }
@@ -348,7 +368,7 @@ celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt
 	struct epoll_event ev;
 	memset(&ev, 0, sizeof(ev));
 
-	celixThreadMutex_lock(&ts->ts_lock);
+	celixThreadMutex_lock(&ts->socketMap_lock);
 
 	if (hashMap_containsKey(ts->socketMap, pubURL)){
 
@@ -366,7 +386,7 @@ celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt
 
 	}
 
-	celixThreadMutex_unlock(&ts->ts_lock);
+	celixThreadMutex_unlock(&ts->socketMap_lock);
 
 	return status;
 }

http://git-wip-us.apache.org/repos/asf/celix/blob/6818c4f5/pubsub/pubsub_admin_zmq/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/CMakeLists.txt b/pubsub/pubsub_admin_zmq/CMakeLists.txt
index 8c3c727..ab250f9 100644
--- a/pubsub/pubsub_admin_zmq/CMakeLists.txt
+++ b/pubsub/pubsub_admin_zmq/CMakeLists.txt
@@ -50,8 +50,8 @@ if (BUILD_PUBSUB_PSA_ZMQ)
 	    	private/src/topic_subscription.c
 	    	private/src/topic_publication.c
 	    	${ZMQ_CRYPTO_C}
+	    	${PROJECT_SOURCE_DIR}/log_service/public/src/log_helper.c
 	    	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_endpoint.c
-	    	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/log_helper.c
 	    	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_utils.c
     	   ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_admin_match.c
 	)

http://git-wip-us.apache.org/repos/asf/celix/blob/6818c4f5/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h b/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h
index 3a39a93..a18e9cf 100644
--- a/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h
+++ b/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h
@@ -68,6 +68,7 @@ struct pubsub_admin {
 	hash_map_pt subscriptions; //<topic(string),topic_subscription>
 
 	celix_thread_mutex_t pendingSubscriptionsLock;
+	celix_thread_mutexattr_t pendingSubscriptionsAttr;
 	hash_map_pt pendingSubscriptions; //<topic(string),List<pubsub_ep>>
 
 	/* Those are used to keep track of valid subscriptions/publications that still have no valid serializer */

http://git-wip-us.apache.org/repos/asf/celix/blob/6818c4f5/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c b/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
index 522b2a5..efba13a 100644
--- a/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
+++ b/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
@@ -106,12 +106,15 @@ celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *ad
 
 		celixThreadMutex_create(&(*admin)->localPublicationsLock, NULL);
 		celixThreadMutex_create(&(*admin)->subscriptionsLock, NULL);
-		celixThreadMutex_create(&(*admin)->pendingSubscriptionsLock, NULL);
 		celixThreadMutex_create(&(*admin)->externalPublicationsLock, NULL);
 		celixThreadMutex_create(&(*admin)->noSerializerPendingsLock, NULL);
 		celixThreadMutex_create(&(*admin)->serializerListLock, NULL);
 		celixThreadMutex_create(&(*admin)->usedSerializersLock, NULL);
 
+		celixThreadMutexAttr_create(&(*admin)->pendingSubscriptionsAttr);
+		celixThreadMutexAttr_settype(&(*admin)->pendingSubscriptionsAttr, CELIX_THREAD_MUTEX_RECURSIVE);
+		celixThreadMutex_create(&(*admin)->pendingSubscriptionsLock, &(*admin)->pendingSubscriptionsAttr);
+
 		if (logHelper_create(context, &(*admin)->loghelper) == CELIX_SUCCESS) {
 			logHelper_start((*admin)->loghelper);
 		}
@@ -267,7 +270,10 @@ celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin)
 	celixThreadMutex_destroy(&admin->noSerializerPendingsLock);
 	celixThreadMutex_destroy(&admin->serializerListLock);
 	celixThreadMutex_destroy(&admin->pendingSubscriptionsLock);
+
+	celixThreadMutexAttr_destroy(&admin->pendingSubscriptionsAttr);
 	celixThreadMutex_destroy(&admin->subscriptionsLock);
+
 	celixThreadMutex_destroy(&admin->localPublicationsLock);
 	celixThreadMutex_destroy(&admin->externalPublicationsLock);
 
@@ -376,6 +382,7 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
 	}
 
 	/* Check if we already know some publisher about this topic, otherwise let's put the subscription in the pending hashmap */
+	celixThreadMutex_lock(&admin->subscriptionsLock);
 	celixThreadMutex_lock(&admin->localPublicationsLock);
 	celixThreadMutex_lock(&admin->externalPublicationsLock);
 
@@ -441,9 +448,9 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
 			}
 
 			if(status==CELIX_SUCCESS){
-				celixThreadMutex_lock(&admin->subscriptionsLock);
+
 				hashMap_put(admin->subscriptions,strdup(scope_topic),subscription);
-				celixThreadMutex_unlock(&admin->subscriptionsLock);
+
 				connectTopicPubSubToSerializer(admin, best_serializer, subscription, false);
 			}
 		}
@@ -456,6 +463,7 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint
 	free(scope_topic);
 	celixThreadMutex_unlock(&admin->externalPublicationsLock);
 	celixThreadMutex_unlock(&admin->localPublicationsLock);
+	celixThreadMutex_unlock(&admin->subscriptionsLock);
 
 	return status;
 

http://git-wip-us.apache.org/repos/asf/celix/blob/6818c4f5/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/src/topic_publication.c b/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
index b741771..2adb1af 100644
--- a/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
+++ b/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
@@ -464,10 +464,12 @@ static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTy
 
 	publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt) handle;
 
+	celixThreadMutex_lock(&(bound->parent->tp_lock));
 	celixThreadMutex_lock(&(bound->mp_lock));
 	if( (flags & PUBSUB_PUBLISHER_FIRST_MSG) && !(flags & PUBSUB_PUBLISHER_LAST_MSG) && bound->mp_send_in_progress){ //means a real mp_msg
 		printf("PSA_ZMQ_TP: Multipart send already in progress. Cannot process a new one.\n");
 		celixThreadMutex_unlock(&(bound->mp_lock));
+		celixThreadMutex_unlock(&(bound->parent->tp_lock));
 		return -3;
 	}
 
@@ -518,16 +520,12 @@ static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTy
 			}
 			else{
 				arrayList_add(bound->mp_parts,msg);
-				celixThreadMutex_lock(&(bound->parent->tp_lock));
 				snd = send_pubsub_mp_msg(bound->parent->zmq_socket,bound->mp_parts);
 				bound->mp_send_in_progress = false;
-				celixThreadMutex_unlock(&(bound->parent->tp_lock));
 			}
 			break;
 		case PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG:	//Normal send case
-			celixThreadMutex_lock(&(bound->parent->tp_lock));
 			snd = send_pubsub_msg(bound->parent->zmq_socket,msg,true);
-			celixThreadMutex_unlock(&(bound->parent->tp_lock));
 			break;
 		default:
 			printf("PSA_ZMQ_TP: ERROR: Invalid MP flags combination\n");
@@ -549,6 +547,7 @@ static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTy
 	}
 
 	celixThreadMutex_unlock(&(bound->mp_lock));
+	celixThreadMutex_unlock(&(bound->parent->tp_lock));
 
 	return status;
 

http://git-wip-us.apache.org/repos/asf/celix/blob/6818c4f5/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c b/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
index cf51ed9..3c4e2a0 100644
--- a/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
+++ b/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
@@ -248,6 +248,8 @@ celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){
 	celixThreadMutex_unlock(&ts->pendingDisconnections_lock);
 	celixThreadMutex_destroy(&ts->pendingDisconnections_lock);
 
+	celixThreadMutex_unlock(&ts->ts_lock);
+
 	celixThreadMutex_lock(&ts->socket_lock);
 	zsock_destroy(&(ts->zmq_socket));
 #ifdef BUILD_WITH_ZMQ_SECURITY
@@ -257,8 +259,7 @@ celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){
 	celixThreadMutex_unlock(&ts->socket_lock);
 	celixThreadMutex_destroy(&ts->socket_lock);
 
-	celixThreadMutex_unlock(&ts->ts_lock);
-
+	celixThreadMutex_destroy(&ts->ts_lock);
 
 	free(ts);
 
@@ -515,7 +516,8 @@ static void* zmq_recv_thread_func(void * arg) {
 			} else {
 				perror("PSA_ZMQ_TS: header_recv thread");
 			}
-		} else {
+		}
+		else {
 
 			pubsub_msg_header_pt hdr = (pubsub_msg_header_pt) zframe_data(headerMsg);
 

http://git-wip-us.apache.org/repos/asf/celix/blob/6818c4f5/pubsub/pubsub_common/public/src/log_helper.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/src/log_helper.c b/pubsub/pubsub_common/public/src/log_helper.c
deleted file mode 100644
index 7a63363..0000000
--- a/pubsub/pubsub_common/public/src/log_helper.c
+++ /dev/null
@@ -1,209 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * log_helper.c
- *
- *  \date       Nov 10, 2014
- *  \author     <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
- *  \copyright  Apache License, Version 2.0
- */
-
-#include <stdlib.h>
-#include <stdarg.h>
-
-#include "bundle_context.h"
-#include "service_tracker.h"
-#include "celix_threads.h"
-#include "array_list.h"
-
-#include "celix_errno.h"
-#include "log_service.h"
-
-#include "log_helper.h"
-
-#define LOGHELPER_ENABLE_STDOUT_FALLBACK_PROPERTY_NAME 	"LOGHELPER_ENABLE_STDOUT_FALLBACK"
-
-
-struct log_helper {
-	bundle_context_pt bundleContext;
-    service_tracker_pt logServiceTracker;
-	celix_thread_mutex_t logListLock;
-	array_list_pt logServices;
-	bool stdOutFallback;
-};
-
-celix_status_t logHelper_logServiceAdded(void *handle, service_reference_pt reference, void *service);
-celix_status_t logHelper_logServiceRemoved(void *handle, service_reference_pt reference, void *service);
-
-
-celix_status_t logHelper_create(bundle_context_pt context, log_helper_pt* loghelper)
-{
-	celix_status_t status = CELIX_SUCCESS;
-
-	(*loghelper) = calloc(1, sizeof(**loghelper));
-
-	if (!(*loghelper))
-	{
-		status = CELIX_ENOMEM;
-	}
-	else
-	{
-		const char* stdOutFallbackStr = NULL;
-		(*loghelper)->bundleContext = context;
-		(*loghelper)->logServiceTracker = NULL;
-		(*loghelper)->stdOutFallback = false;
-
-		bundleContext_getProperty(context, LOGHELPER_ENABLE_STDOUT_FALLBACK_PROPERTY_NAME, &stdOutFallbackStr);
-
-		if (stdOutFallbackStr != NULL) {
-			(*loghelper)->stdOutFallback = true;
-		}
-
-		pthread_mutex_init(&(*loghelper)->logListLock, NULL);
-        arrayList_create(&(*loghelper)->logServices);
-	}
-
-	return status;
-}
-
-celix_status_t logHelper_start(log_helper_pt loghelper)
-{
-	celix_status_t status = CELIX_SUCCESS;
-	service_tracker_customizer_pt logTrackerCustomizer = NULL;
-
-	status = serviceTrackerCustomizer_create(loghelper, NULL, logHelper_logServiceAdded, NULL, logHelper_logServiceRemoved, &logTrackerCustomizer);
-
-	if (status == CELIX_SUCCESS) {
-		status = serviceTracker_create(loghelper->bundleContext, (char*) OSGI_LOGSERVICE_NAME, logTrackerCustomizer, &loghelper->logServiceTracker);
-	}
-
-	if (status == CELIX_SUCCESS) {
-		status = serviceTracker_open(loghelper->logServiceTracker);
-	}
-
-	return status;
-}
-
-
-
-celix_status_t logHelper_logServiceAdded(void *handle, service_reference_pt reference, void *service)
-{
-	log_helper_pt loghelper = handle;
-
-	pthread_mutex_lock(&loghelper->logListLock);
-	arrayList_add(loghelper->logServices, service);
-	pthread_mutex_unlock(&loghelper->logListLock);
-
-	return CELIX_SUCCESS;
-}
-
-celix_status_t logHelper_logServiceRemoved(void *handle, service_reference_pt reference, void *service)
-{
-	log_helper_pt loghelper = handle;
-
-	pthread_mutex_lock(&loghelper->logListLock);
-	arrayList_removeElement(loghelper->logServices, service);
-	pthread_mutex_unlock(&loghelper->logListLock);
-
-	return CELIX_SUCCESS;
-}
-
-
-celix_status_t logHelper_stop(log_helper_pt loghelper) {
-	celix_status_t status = CELIX_SUCCESS;
-
-    status = serviceTracker_close(loghelper->logServiceTracker);
-
-    return status;
-}
-
-celix_status_t logHelper_destroy(log_helper_pt* loghelper) {
-        celix_status_t status = CELIX_SUCCESS;
-
-        serviceTracker_destroy((*loghelper)->logServiceTracker);
-
-        pthread_mutex_lock(&(*loghelper)->logListLock);
-        arrayList_destroy((*loghelper)->logServices);
-    	pthread_mutex_unlock(&(*loghelper)->logListLock);
-
-        pthread_mutex_destroy(&(*loghelper)->logListLock);
-
-        free(*loghelper);
-        *loghelper = NULL;
-        return status;
-}
-
-
-
-
-celix_status_t logHelper_log(log_helper_pt loghelper, log_level_t level, char* message, ... )
-{
-    celix_status_t status = CELIX_SUCCESS;
-	va_list listPointer;
-    char msg[1024];
-    msg[0] = '\0';
-    bool logged = false;
-
-	va_start(listPointer, message);
-	vsnprintf(msg, 1024, message, listPointer);
-
-	if (loghelper != NULL) {
-		pthread_mutex_lock(&loghelper->logListLock);
-
-		int i = 0;
-
-		for (; i < arrayList_size(loghelper->logServices); i++) {
-
-			log_service_pt logService = arrayList_get(loghelper->logServices, i);
-
-			if (logService != NULL) {
-				(logService->log)(logService->logger, level, msg);
-				logged = true;
-			}
-		}
-
-		pthread_mutex_unlock(&loghelper->logListLock);
-	}
-
-
-    if (!logged && loghelper->stdOutFallback) {
-        char *levelStr = NULL;
-
-        switch (level) {
-            case OSGI_LOGSERVICE_ERROR:
-                levelStr = "ERROR";
-                break;
-            case OSGI_LOGSERVICE_WARNING:
-                levelStr = "WARNING";
-                break;
-            case OSGI_LOGSERVICE_INFO:
-                levelStr = "INFO";
-                break;
-            case OSGI_LOGSERVICE_DEBUG:
-            default:
-                levelStr = "DEBUG";
-                break;
-        }
-
-        printf("%s: %s\n", levelStr, msg);
-    }
-
-
-	return status;
-}

http://git-wip-us.apache.org/repos/asf/celix/blob/6818c4f5/pubsub/pubsub_serializer_json/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_serializer_json/CMakeLists.txt b/pubsub/pubsub_serializer_json/CMakeLists.txt
index a5798a4..147873a 100644
--- a/pubsub/pubsub_serializer_json/CMakeLists.txt
+++ b/pubsub/pubsub_serializer_json/CMakeLists.txt
@@ -32,7 +32,7 @@ add_bundle(org.apache.celix.pubsub_serializer.PubSubSerializerJson
     SOURCES
     	private/src/ps_activator.c
     	private/src/pubsub_serializer_impl.c
-    	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/log_helper.c
+	   ${PROJECT_SOURCE_DIR}/log_service/public/src/log_helper.c
     	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_utils.c
 )
 

http://git-wip-us.apache.org/repos/asf/celix/blob/6818c4f5/pubsub/pubsub_topology_manager/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_topology_manager/CMakeLists.txt b/pubsub/pubsub_topology_manager/CMakeLists.txt
index cf2f4fa..b6eb796 100644
--- a/pubsub/pubsub_topology_manager/CMakeLists.txt
+++ b/pubsub/pubsub_topology_manager/CMakeLists.txt
@@ -29,8 +29,8 @@ add_bundle(org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
     SOURCES
     	private/src/pstm_activator.c
     	private/src/pubsub_topology_manager.c
+	   ${PROJECT_SOURCE_DIR}/log_service/public/src/log_helper.c
     	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_endpoint.c
-    	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/log_helper.c
     	${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_utils.c	
 )
 

http://git-wip-us.apache.org/repos/asf/celix/blob/6818c4f5/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c b/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
index b4e8f46..987d864 100644
--- a/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
+++ b/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
@@ -189,8 +189,6 @@ celix_status_t pubsub_topologyManager_psaRemoved(void * handle, service_referenc
 
 	pubsub_admin_service_pt psa = (pubsub_admin_service_pt) service;
 
-	celixThreadMutex_lock(&manager->psaListLock);
-
 	/* Deactivate all publications */
 	celixThreadMutex_lock(&manager->publicationsLock);
 
@@ -248,8 +246,8 @@ celix_status_t pubsub_topologyManager_psaRemoved(void * handle, service_referenc
 	hashMapIterator_destroy(subit);
 	celixThreadMutex_unlock(&manager->subscriptionsLock);
 
+	celixThreadMutex_lock(&manager->psaListLock);
 	arrayList_removeElement(manager->psaList, psa);
-
 	celixThreadMutex_unlock(&manager->psaListLock);
 
 	logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "PSTM: Removed PSA");


Mime
View raw message