Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id B6597200D0C for ; Wed, 20 Sep 2017 17:09:34 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id B55B81609E1; Wed, 20 Sep 2017 15:09:34 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 577B41609E4 for ; Wed, 20 Sep 2017 17:09:32 +0200 (CEST) Received: (qmail 48609 invoked by uid 500); 20 Sep 2017 15:09:31 -0000 Mailing-List: contact commits-help@celix.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@celix.apache.org Delivered-To: mailing list commits@celix.apache.org Received: (qmail 48536 invoked by uid 99); 20 Sep 2017 15:09:31 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 20 Sep 2017 15:09:31 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 12DC6F576A; Wed, 20 Sep 2017 15:09:29 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: gricciardi@apache.org To: commits@celix.apache.org Date: Wed, 20 Sep 2017 15:09:32 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [5/6] celix git commit: Refactored serializers management archived-at: Wed, 20 Sep 2017 15:09:34 -0000 Refactored serializers management Project: http://git-wip-us.apache.org/repos/asf/celix/repo Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/3b99cc34 Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/3b99cc34 Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/3b99cc34 Branch: refs/heads/pubsub_serializer_refactoring Commit: 3b99cc34cd3712b39a34e5789363ab497e85a75e Parents: 623d47e Author: gricciardi Authored: Wed Sep 20 15:38:21 2017 +0200 Committer: gricciardi Committed: Wed Sep 20 15:38:21 2017 +0200 ---------------------------------------------------------------------- pubsub/api/pubsub/publisher.h | 2 +- pubsub/api/pubsub/subscriber.h | 2 +- .../examples/mp_pubsub/publisher/CMakeLists.txt | 2 +- .../mp_pubsub/subscriber/CMakeLists.txt | 8 +- pubsub/examples/pubsub/publisher/CMakeLists.txt | 2 +- .../publisher/private/src/pubsub_publisher.c | 10 +- .../examples/pubsub/publisher2/CMakeLists.txt | 2 +- .../examples/pubsub/subscriber/CMakeLists.txt | 2 +- pubsub/pubsub_admin_udp_mc/CMakeLists.txt | 3 +- .../private/include/pubsub_admin_impl.h | 36 +- .../include/pubsub_publish_service_private.h | 59 -- .../private/include/topic_publication.h | 57 ++ .../private/include/topic_subscription.h | 9 +- .../private/src/psa_activator.c | 38 +- .../private/src/pubsub_admin_impl.c | 741 ++++++++++++------- .../private/src/topic_publication.c | 299 +++----- .../private/src/topic_subscription.c | 546 +++++++------- pubsub/pubsub_admin_zmq/CMakeLists.txt | 3 +- .../private/include/pubsub_admin_impl.h | 38 +- .../include/pubsub_publish_service_private.h | 51 -- .../private/include/topic_publication.h | 49 ++ .../private/include/topic_subscription.h | 7 +- .../private/src/psa_activator.c | 36 +- .../private/src/pubsub_admin_impl.c | 708 ++++++++++++------ .../private/src/topic_publication.c | 269 +++---- .../private/src/topic_subscription.c | 502 ++++++------- .../pubsub_common/public/include/pubsub_admin.h | 22 +- .../public/include/pubsub_admin_match.h | 27 + .../public/include/pubsub_common.h | 8 +- .../public/include/pubsub_endpoint.h | 12 +- .../public/include/pubsub_serializer.h | 27 +- pubsub/pubsub_common/public/src/log_helper.c | 48 +- .../public/src/pubsub_admin_match.c | 303 ++++++++ .../pubsub_common/public/src/pubsub_endpoint.c | 184 +++-- .../private/include/pubsub_discovery_impl.h | 1 - .../pubsub_discovery/private/src/etcd_common.c | 1 + .../pubsub_discovery/private/src/etcd_watcher.c | 216 +++--- .../pubsub_discovery/private/src/etcd_writer.c | 160 ++-- .../private/src/psd_activator.c | 2 +- .../private/src/pubsub_discovery_impl.c | 13 +- .../private/include/pubsub_serializer_impl.h | 23 +- .../private/src/ps_activator.c | 6 +- .../private/src/pubsub_serializer_impl.c | 339 +++++---- .../private/include/pubsub_topology_manager.h | 11 - .../private/src/pstm_activator.c | 64 +- .../private/src/pubsub_topology_manager.c | 526 ++++--------- 46 files changed, 2940 insertions(+), 2534 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/api/pubsub/publisher.h ---------------------------------------------------------------------- diff --git a/pubsub/api/pubsub/publisher.h b/pubsub/api/pubsub/publisher.h index 4bc6c8c..3eec149 100644 --- a/pubsub/api/pubsub/publisher.h +++ b/pubsub/api/pubsub/publisher.h @@ -30,7 +30,7 @@ #include #define PUBSUB_PUBLISHER_SERVICE_NAME "pubsub.publisher" -#define PUBSUB_PUBLISHER_SERVICE_VERSION "1.0.0" +#define PUBSUB_PUBLISHER_SERVICE_VERSION "2.0.0" //properties #define PUBSUB_PUBLISHER_TOPIC "pubsub.topic" http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/api/pubsub/subscriber.h ---------------------------------------------------------------------- diff --git a/pubsub/api/pubsub/subscriber.h b/pubsub/api/pubsub/subscriber.h index cbbe96c..5d87b8a 100644 --- a/pubsub/api/pubsub/subscriber.h +++ b/pubsub/api/pubsub/subscriber.h @@ -30,7 +30,7 @@ #include #define PUBSUB_SUBSCRIBER_SERVICE_NAME "pubsub.subscriber" -#define PUBSUB_SUBSCRIBER_SERVICE_VERSION "1.0.0" +#define PUBSUB_SUBSCRIBER_SERVICE_VERSION "2.0.0" //properties #define PUBSUB_SUBSCRIBER_TOPIC "pubsub.topic" http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/examples/mp_pubsub/publisher/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/pubsub/examples/mp_pubsub/publisher/CMakeLists.txt b/pubsub/examples/mp_pubsub/publisher/CMakeLists.txt index de77156..76a01f1 100644 --- a/pubsub/examples/mp_pubsub/publisher/CMakeLists.txt +++ b/pubsub/examples/mp_pubsub/publisher/CMakeLists.txt @@ -34,7 +34,7 @@ bundle_files(org.apache.celix.pubsub_publisher.MpPublisher ${PROJECT_SOURCE_DIR}/pubsub/examples/mp_pubsub/msg_descriptors/msg_ew.descriptor ${PROJECT_SOURCE_DIR}/pubsub/examples/mp_pubsub/msg_descriptors/msg_ide.descriptor ${PROJECT_SOURCE_DIR}/pubsub/examples/mp_pubsub/msg_descriptors/msg_kinematics.descriptor - DESTINATION "META-INF/descriptors/messages" + DESTINATION "META-INF/descriptors" ) bundle_files(org.apache.celix.pubsub_publisher.MpPublisher http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/examples/mp_pubsub/subscriber/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/pubsub/examples/mp_pubsub/subscriber/CMakeLists.txt b/pubsub/examples/mp_pubsub/subscriber/CMakeLists.txt index 75ec635..a480a73 100644 --- a/pubsub/examples/mp_pubsub/subscriber/CMakeLists.txt +++ b/pubsub/examples/mp_pubsub/subscriber/CMakeLists.txt @@ -31,10 +31,10 @@ add_bundle( org.apache.celix.pubsub_subscriber.MpSubscriber ) bundle_files( org.apache.celix.pubsub_subscriber.MpSubscriber - ${PROJECT_SOURCE_DIR}/pubsub/examples/mp_pubsub/msg_descriptors/msg_ew.descriptor - ${PROJECT_SOURCE_DIR}/pubsub/examples/mp_pubsub/msg_descriptors/msg_ide.descriptor - ${PROJECT_SOURCE_DIR}/pubsub/examples/mp_pubsub/msg_descriptors/msg_kinematics.descriptor - DESTINATION "META-INF/descriptors/messages" + ${PROJECT_SOURCE_DIR}/pubsub/examples/mp_pubsub/msg_descriptors/msg_ew.descriptor + ${PROJECT_SOURCE_DIR}/pubsub/examples/mp_pubsub/msg_descriptors/msg_ide.descriptor + ${PROJECT_SOURCE_DIR}/pubsub/examples/mp_pubsub/msg_descriptors/msg_kinematics.descriptor + DESTINATION "META-INF/descriptors" ) bundle_files(org.apache.celix.pubsub_subscriber.MpSubscriber http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/examples/pubsub/publisher/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/pubsub/examples/pubsub/publisher/CMakeLists.txt b/pubsub/examples/pubsub/publisher/CMakeLists.txt index d932611..e35c137 100644 --- a/pubsub/examples/pubsub/publisher/CMakeLists.txt +++ b/pubsub/examples/pubsub/publisher/CMakeLists.txt @@ -32,7 +32,7 @@ add_bundle(org.apache.celix.pubsub_publisher.PoiPublisher bundle_files(org.apache.celix.pubsub_publisher.PoiPublisher ${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/msg_descriptors/msg_poi1.descriptor ${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/msg_descriptors/msg_poi2.descriptor - DESTINATION "META-INF/descriptors/messages" + DESTINATION "META-INF/descriptors" ) bundle_files(org.apache.celix.pubsub_publisher.PoiPublisher http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c ---------------------------------------------------------------------- diff --git a/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c b/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c index 66454a0..b798ea1 100644 --- a/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c +++ b/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c @@ -77,18 +77,20 @@ static void* send_thread(void* arg){ while(stop==false){ place->position.lat = randCoordinate(MIN_LAT,MAX_LAT); place->position.lon = randCoordinate(MIN_LON,MAX_LON); - int nr_char = (int)randCoordinate(5,100000); - //int nr_char = 25; + //int nr_char = (int)randCoordinate(5,100000); + int nr_char = 32; place->data = calloc(nr_char, 1); for(int i = 0; i < (nr_char-1); i++) { place->data[i] = i%10 + '0'; } if(publish_svc->send) { - publish_svc->send(publish_svc->handle,msgId,place); + if(publish_svc->send(publish_svc->handle,msgId,place)==0){ + printf("Sent %s [%f, %f] (%s, %s) data len = %d\n",st_struct->topic, place->position.lat, place->position.lon,place->name,place->description, nr_char); + } } else { printf("No send for %s\n", st_struct->topic); } - printf("Sent %s [%f, %f] (%s, %s) data len = %d\n",st_struct->topic, place->position.lat, place->position.lon,place->name,place->description, nr_char); + free(place->data); sleep(2); } http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/examples/pubsub/publisher2/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/pubsub/examples/pubsub/publisher2/CMakeLists.txt b/pubsub/examples/pubsub/publisher2/CMakeLists.txt index c44a760..b83f7dd 100644 --- a/pubsub/examples/pubsub/publisher2/CMakeLists.txt +++ b/pubsub/examples/pubsub/publisher2/CMakeLists.txt @@ -32,7 +32,7 @@ add_bundle(org.apache.celix.pubsub_publisher.PoiPublisher2 bundle_files(org.apache.celix.pubsub_publisher.PoiPublisher2 ${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/msg_descriptors/msg_poi1.descriptor ${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/msg_descriptors/msg_poi2.descriptor - DESTINATION "META-INF/descriptors/messages" + DESTINATION "META-INF/descriptors" ) bundle_files(org.apache.celix.pubsub_publisher.PoiPublisher2 http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/examples/pubsub/subscriber/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/pubsub/examples/pubsub/subscriber/CMakeLists.txt b/pubsub/examples/pubsub/subscriber/CMakeLists.txt index da6a362..7fd9fae 100644 --- a/pubsub/examples/pubsub/subscriber/CMakeLists.txt +++ b/pubsub/examples/pubsub/subscriber/CMakeLists.txt @@ -33,7 +33,7 @@ add_bundle(org.apache.celix.pubsub_subscriber.PoiSubscriber bundle_files(org.apache.celix.pubsub_subscriber.PoiSubscriber ${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/msg_descriptors/msg_poi1.descriptor ${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/msg_descriptors/msg_poi2.descriptor - DESTINATION "META-INF/descriptors/messages" + DESTINATION "META-INF/descriptors" ) bundle_files(org.apache.celix.pubsub_subscriber.PoiSubscriber http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_admin_udp_mc/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_udp_mc/CMakeLists.txt b/pubsub/pubsub_admin_udp_mc/CMakeLists.txt index 1ac0c2d..86f7a47 100644 --- a/pubsub/pubsub_admin_udp_mc/CMakeLists.txt +++ b/pubsub/pubsub_admin_udp_mc/CMakeLists.txt @@ -37,10 +37,11 @@ add_bundle(org.apache.celix.pubsub_admin.PubSubAdminUdpMc private/src/large_udp.c ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_endpoint.c ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/log_helper.c + ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_admin_match.c ) set_target_properties(org.apache.celix.pubsub_admin.PubSubAdminUdpMc PROPERTIES INSTALL_RPATH "$ORIGIN") -target_link_libraries(org.apache.celix.pubsub_admin.PubSubAdminUdpMc celix_framework celix_utils celix_dfi ${JANSSON_LIBRARIES}) +target_link_libraries(org.apache.celix.pubsub_admin.PubSubAdminUdpMc celix_framework celix_utils celix_dfi) install_bundle(org.apache.celix.pubsub_admin.PubSubAdminUdpMc) http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h b/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h index 89e6547..731b037 100644 --- a/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h +++ b/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h @@ -24,20 +24,23 @@ * \copyright Apache License, Version 2.0 */ -#ifndef PUBSUB_ADMIN_IMPL_H_ -#define PUBSUB_ADMIN_IMPL_H_ +#ifndef PUBSUB_ADMIN_UDP_MC_IMPL_H_ +#define PUBSUB_ADMIN_UDP_MC_IMPL_H_ #include "pubsub_admin.h" -#include "pubsub_serializer.h" #include "log_helper.h" -struct pubsub_admin { +#define PUBSUB_ADMIN_TYPE "udp_mc" - pubsub_serializer_service_t* serializerSvc; +struct pubsub_admin { bundle_context_pt bundle_context; log_helper_pt loghelper; + /* List of the available serializers */ + celix_thread_mutex_t serializerListLock; // List + array_list_pt serializerList; + celix_thread_mutex_t localPublicationsLock; hash_map_pt localPublications;// @@ -50,15 +53,24 @@ struct pubsub_admin { celix_thread_mutex_t pendingSubscriptionsLock; hash_map_pt pendingSubscriptions; //> + /* Those are used to keep track of valid subscriptions/publications that still have no valid serializer */ + celix_thread_mutex_t noSerializerPendingsLock; + array_list_pt noSerializerSubscriptions; // List + array_list_pt noSerializerPublications; // List + + celix_thread_mutex_t usedSerializersLock; + hash_map_pt topicSubscriptionsPerSerializer; // > + hash_map_pt topicPublicationsPerSerializer; // > + char* ifIpAddress; // The local interface which is used for multicast communication - char* mcIpAddress; // The multicast IP address + char* mcIpAddress; // The multicast IP address int sendSocket; + void* zmq_context; // to be removed }; celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *admin); -celix_status_t pubsubAdmin_stop(pubsub_admin_pt admin); celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin); celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP); @@ -70,10 +82,10 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin,char* scope, char* topic); celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* scope, char* topic); -celix_status_t pubsubAdmin_matchPublisher(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP, double* score); -celix_status_t pubsubAdmin_matchSubscriber(pubsub_admin_pt admin, pubsub_endpoint_pt subEP, double* score); +celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt reference, void * service); +celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt reference, void * service); + +celix_status_t pubsubAdmin_matchEndpoint(pubsub_admin_pt admin, pubsub_endpoint_pt endpoint, double* score); -celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc); -celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc); -#endif /* PUBSUB_ADMIN_IMPL_H_ */ +#endif /* PUBSUB_ADMIN_UDP_MC_IMPL_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h b/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h deleted file mode 100644 index b43fb08..0000000 --- a/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h +++ /dev/null @@ -1,59 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. - */ -/* - * pubsub_publish_service_private.h - * - * \date Sep 24, 2015 - * \author Apache Celix Project Team - * \copyright Apache License, Version 2.0 - */ - -#ifndef PUBSUB_PUBLISH_SERVICE_PRIVATE_H_ -#define PUBSUB_PUBLISH_SERVICE_PRIVATE_H_ - -#include "publisher.h" -#include "pubsub_endpoint.h" -#include "pubsub_common.h" -#include "pubsub_serializer.h" - -#define UDP_BASE_PORT 49152 -#define UDP_MAX_PORT 65000 - -typedef struct pubsub_udp_msg { - struct pubsub_msg_header header; - unsigned int payloadSize; - char payload[]; -} pubsub_udp_msg_t; - -typedef struct topic_publication *topic_publication_pt; -celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, char* bindIP, topic_publication_pt *out); -celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub); - -celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep); -celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep); - -celix_status_t pubsub_topicPublicationSetSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc); -celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc); - -celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory); -celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub); - -array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt pub); - -#endif /* PUBSUB_PUBLISH_SERVICE_PRIVATE_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_admin_udp_mc/private/include/topic_publication.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_udp_mc/private/include/topic_publication.h b/pubsub/pubsub_admin_udp_mc/private/include/topic_publication.h new file mode 100644 index 0000000..4363d71 --- /dev/null +++ b/pubsub/pubsub_admin_udp_mc/private/include/topic_publication.h @@ -0,0 +1,57 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ +/* + * topic_publication.h + * + * \date Sep 24, 2015 + * \author Apache Celix Project Team + * \copyright Apache License, Version 2.0 + */ + +#ifndef TOPIC_PUBLICATION_H_ +#define TOPIC_PUBLICATION_H_ + +#include "publisher.h" +#include "pubsub_endpoint.h" +#include "pubsub_common.h" + +#include "pubsub_serializer.h" + +#define UDP_BASE_PORT 49152 +#define UDP_MAX_PORT 65000 + +typedef struct pubsub_udp_msg { + struct pubsub_msg_header header; + unsigned int payloadSize; + char payload[]; +} pubsub_udp_msg_t; + +typedef struct topic_publication *topic_publication_pt; +celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, char* bindIP, topic_publication_pt *out); +celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub); + +celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep); +celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep); + +celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory); +celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub); + +array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt pub); + +#endif /* TOPIC_PUBLICATION_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h b/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h index a65cb6b..475416a 100644 --- a/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h +++ b/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h @@ -38,20 +38,21 @@ typedef struct topic_subscription* topic_subscription_pt; -celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt bundle_context, pubsub_serializer_service_t* serializer, char* scope, char* topic,topic_subscription_pt* out); +celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, char* ifIp,char* scope, char* topic ,pubsub_serializer_service_t *best_serializer, topic_subscription_pt* out); celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts); celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts); celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts); +celix_status_t pubsub_topicSubscriptionAddConnectPublisherToPendingList(topic_subscription_pt ts, char* pubURL); +celix_status_t pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(topic_subscription_pt ts, char* pubURL); + celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts, char* pubURL); celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt ts, char* pubURL); celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP); celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP); -celix_status_t pubsub_topicSubscriptionSetSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc); -celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc); - +array_list_pt pubsub_topicSubscriptionGetSubscribersList(topic_subscription_pt sub); celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt subscription); celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt subscription); unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt subscription); http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_admin_udp_mc/private/src/psa_activator.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_udp_mc/private/src/psa_activator.c b/pubsub/pubsub_admin_udp_mc/private/src/psa_activator.c index cb298fe..cd4ee07 100644 --- a/pubsub/pubsub_admin_udp_mc/private/src/psa_activator.c +++ b/pubsub/pubsub_admin_udp_mc/private/src/psa_activator.c @@ -28,6 +28,7 @@ #include "bundle_activator.h" #include "service_registration.h" +#include "service_tracker.h" #include "pubsub_admin_impl.h" @@ -35,6 +36,7 @@ struct activator { pubsub_admin_pt admin; pubsub_admin_service_pt adminService; service_registration_pt registration; + service_tracker_pt serializerTracker; }; celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) { @@ -47,7 +49,28 @@ celix_status_t bundleActivator_create(bundle_context_pt context, void **userData } else{ *userData = activator; + status = pubsubAdmin_create(context, &(activator->admin)); + + if(status == CELIX_SUCCESS){ + service_tracker_customizer_pt customizer = NULL; + status = serviceTrackerCustomizer_create(activator->admin, + NULL, + pubsubAdmin_serializerAdded, + NULL, + pubsubAdmin_serializerRemoved, + &customizer); + if(status == CELIX_SUCCESS){ + status = serviceTracker_create(context, PUBSUB_SERIALIZER_SERVICE, customizer, &(activator->serializerTracker)); + if(status != CELIX_SUCCESS){ + serviceTrackerCustomizer_destroy(customizer); + pubsubAdmin_destroy(activator->admin); + } + } + else{ + pubsubAdmin_destroy(activator->admin); + } + } } return status; @@ -73,16 +96,14 @@ celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) pubsubAdminSvc->closeAllPublications = pubsubAdmin_closeAllPublications; pubsubAdminSvc->closeAllSubscriptions = pubsubAdmin_closeAllSubscriptions; - pubsubAdminSvc->matchPublisher = pubsubAdmin_matchPublisher; - pubsubAdminSvc->matchSubscriber = pubsubAdmin_matchSubscriber; - - pubsubAdminSvc->setSerializer = pubsubAdmin_setSerializer; - pubsubAdminSvc->removeSerializer = pubsubAdmin_removeSerializer; + pubsubAdminSvc->matchEndpoint = pubsubAdmin_matchEndpoint; activator->adminService = pubsubAdminSvc; status = bundleContext_registerService(context, PUBSUB_ADMIN_SERVICE, pubsubAdminSvc, NULL, &activator->registration); + status += serviceTracker_open(activator->serializerTracker); + } @@ -93,10 +114,10 @@ celix_status_t bundleActivator_stop(void * userData, bundle_context_pt context) celix_status_t status = CELIX_SUCCESS; struct activator *activator = userData; - serviceRegistration_unregister(activator->registration); - activator->registration = NULL; + status += serviceTracker_close(activator->serializerTracker); + status += serviceRegistration_unregister(activator->registration); - pubsubAdmin_stop(activator->admin); + activator->registration = NULL; free(activator->adminService); activator->adminService = NULL; @@ -108,6 +129,7 @@ celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt contex celix_status_t status = CELIX_SUCCESS; struct activator *activator = userData; + serviceTracker_destroy(activator->serializerTracker); pubsubAdmin_destroy(activator->admin); activator->admin = NULL; http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c b/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c index bbb452d..6f9427b 100644 --- a/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c +++ b/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c @@ -57,9 +57,10 @@ #include "pubsub_admin_impl.h" #include "topic_subscription.h" -#include "pubsub_publish_service_private.h" +#include "topic_publication.h" #include "pubsub_endpoint.h" #include "subscriber.h" +#include "pubsub_admin_match.h" static const char *DEFAULT_MC_IP = "224.100.1.1"; static char *DEFAULT_MC_PREFIX = "224.100"; @@ -67,7 +68,10 @@ static char *DEFAULT_MC_PREFIX = "224.100"; static celix_status_t pubsubAdmin_getIpAddress(const char* interface, char** ip); static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP); static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP); -static celix_status_t pubsubAdmin_match(pubsub_admin_pt admin, pubsub_endpoint_pt psEP, double* score); + +static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin,pubsub_endpoint_pt ep, pubsub_serializer_service_t **serSvc); +static void connectTopicPubSubToSerializer(pubsub_admin_pt admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool isPublication); +static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void *topicPubSub,bool isPublication); celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *admin) { celix_status_t status = CELIX_SUCCESS; @@ -86,11 +90,19 @@ celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *ad (*admin)->subscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); (*admin)->pendingSubscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); (*admin)->externalPublications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); + (*admin)->topicSubscriptionsPerSerializer = hashMap_create(NULL, NULL, NULL, NULL); + (*admin)->topicPublicationsPerSerializer = hashMap_create(NULL, NULL, NULL, NULL); + arrayList_create(&((*admin)->noSerializerSubscriptions)); + arrayList_create(&((*admin)->noSerializerPublications)); + arrayList_create(&((*admin)->serializerList)); celixThreadMutex_create(&(*admin)->localPublicationsLock, NULL); celixThreadMutex_create(&(*admin)->subscriptionsLock, NULL); celixThreadMutex_create(&(*admin)->pendingSubscriptionsLock, NULL); celixThreadMutex_create(&(*admin)->externalPublicationsLock, NULL); + celixThreadMutex_create(&(*admin)->noSerializerPendingsLock, NULL); + celixThreadMutex_create(&(*admin)->serializerListLock, NULL); + celixThreadMutex_create(&(*admin)->usedSerializersLock, NULL); if (logHelper_create(context, &(*admin)->loghelper) == CELIX_SUCCESS) { logHelper_start((*admin)->loghelper); @@ -104,7 +116,7 @@ celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *ad if (mc_ip == NULL) { const char *mc_prefix = NULL; const char *interface = NULL; - int b0 = 224, b1 = 100, b2 = 1, b3 = 1; + int b0, b1, b2, b3; bundleContext_getProperty(context,PSA_MULTICAST_IP_PREFIX , &mc_prefix); if(mc_prefix == NULL) { mc_prefix = DEFAULT_MC_PREFIX; @@ -112,12 +124,12 @@ celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *ad bundleContext_getProperty(context, PSA_ITF, &interface); if (pubsubAdmin_getIpAddress(interface, &if_ip) != CELIX_SUCCESS) { - logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA: Could not retrieve IP address for interface %s", interface); + logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA_UDP_MC: Could not retrieve IP address for interface %s", interface); } printf("IP Detected : %s\n", if_ip); if(if_ip && sscanf(if_ip, "%i.%i.%i.%i", &b0, &b1, &b2, &b3) != 4) { - logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA: Could not parse IP address %s", if_ip); + logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA_UDP_MC: Could not parse IP address %s", if_ip); b2 = 1; b3 = 1; } @@ -127,57 +139,41 @@ celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *ad int sendSocket = socket(AF_INET, SOCK_DGRAM, 0); if(sendSocket == -1) { perror("pubsubAdmin_create:socket"); - status = CELIX_SERVICE_EXCEPTION; + return CELIX_SERVICE_EXCEPTION; } - else{ - char loop = 1; - if(setsockopt(sendSocket, IPPROTO_IP, IP_MULTICAST_LOOP, &loop, sizeof(loop)) != 0) { - perror("pubsubAdmin_create:setsockopt(IP_MULTICAST_LOOP)"); - status = CELIX_SERVICE_EXCEPTION; - } - - if (status == CELIX_SUCCESS){ - struct in_addr multicast_interface; - inet_aton(if_ip, &multicast_interface); - if(setsockopt(sendSocket, IPPROTO_IP, IP_MULTICAST_IF, &multicast_interface, sizeof(multicast_interface)) != 0) { - perror("pubsubAdmin_create:setsockopt(IP_MULTICAST_IF)"); - status = CELIX_SERVICE_EXCEPTION; - } - else{ - (*admin)->sendSocket = sendSocket; - } - } - - if(status!=CELIX_SUCCESS){ - close(sendSocket); - } - + char loop = 1; + if(setsockopt(sendSocket, IPPROTO_IP, IP_MULTICAST_LOOP, &loop, sizeof(loop)) != 0) { + perror("pubsubAdmin_create:setsockopt(IP_MULTICAST_LOOP)"); + return CELIX_SERVICE_EXCEPTION; } + struct in_addr multicast_interface; + inet_aton(if_ip, &multicast_interface); + if(setsockopt(sendSocket, IPPROTO_IP, IP_MULTICAST_IF, &multicast_interface, sizeof(multicast_interface)) != 0) { + perror("pubsubAdmin_create:setsockopt(IP_MULTICAST_IF)"); + return CELIX_SERVICE_EXCEPTION; + } + (*admin)->sendSocket = sendSocket; } #endif if (if_ip != NULL) { - logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA: Using %s as interface for multicast communication", if_ip); + logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA_UDP_MC: Using %s as interface for multicast communication", if_ip); (*admin)->ifIpAddress = if_ip; } else { (*admin)->ifIpAddress = strdup("127.0.0.1"); } if (mc_ip != NULL) { - logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA: Using %s for service annunciation", mc_ip); + logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA_UDP_MC: Using %s for service annunciation", mc_ip); (*admin)->mcIpAddress = mc_ip; } else { - logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA: No IP address for service annunciation set. Using %s", DEFAULT_MC_IP); + logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA_UDP_MC: No IP address for service annunciation set. Using %s", DEFAULT_MC_IP); (*admin)->mcIpAddress = strdup(DEFAULT_MC_IP); } - if (status != CELIX_SUCCESS){ - pubsubAdmin_destroy(*admin); - } - } return status; @@ -221,6 +217,36 @@ celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin) hashMap_destroy(admin->externalPublications,false,false); celixThreadMutex_unlock(&admin->externalPublicationsLock); + celixThreadMutex_lock(&admin->serializerListLock); + arrayList_destroy(admin->serializerList); + celixThreadMutex_unlock(&admin->serializerListLock); + + celixThreadMutex_lock(&admin->noSerializerPendingsLock); + arrayList_destroy(admin->noSerializerSubscriptions); + arrayList_destroy(admin->noSerializerPublications); + celixThreadMutex_unlock(&admin->noSerializerPendingsLock); + + celixThreadMutex_lock(&admin->usedSerializersLock); + + iter = hashMapIterator_create(admin->topicSubscriptionsPerSerializer); + while(hashMapIterator_hasNext(iter)){ + arrayList_destroy((array_list_pt)hashMapIterator_nextValue(iter)); + } + hashMapIterator_destroy(iter); + hashMap_destroy(admin->topicSubscriptionsPerSerializer,false,false); + + iter = hashMapIterator_create(admin->topicPublicationsPerSerializer); + while(hashMapIterator_hasNext(iter)){ + arrayList_destroy((array_list_pt)hashMapIterator_nextValue(iter)); + } + hashMapIterator_destroy(iter); + hashMap_destroy(admin->topicPublicationsPerSerializer,false,false); + + celixThreadMutex_unlock(&admin->usedSerializersLock); + + celixThreadMutex_destroy(&admin->usedSerializersLock); + celixThreadMutex_destroy(&admin->noSerializerPendingsLock); + celixThreadMutex_destroy(&admin->serializerListLock); celixThreadMutex_destroy(&admin->pendingSubscriptionsLock); celixThreadMutex_destroy(&admin->subscriptionsLock); celixThreadMutex_destroy(&admin->localPublicationsLock); @@ -235,12 +261,6 @@ celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin) return status; } -celix_status_t pubsubAdmin_stop(pubsub_admin_pt admin) { - celix_status_t status = CELIX_SUCCESS; - - return status; -} - static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){ celix_status_t status = CELIX_SUCCESS; @@ -251,52 +271,68 @@ static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu if(any_sub==NULL){ int i; + pubsub_serializer_service_t *best_serializer = NULL; + if( (status=pubsubAdmin_getBestSerializer(admin, subEP, &best_serializer)) == CELIX_SUCCESS){ + status = pubsub_topicSubscriptionCreate(admin->bundle_context, admin->ifIpAddress, PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, PUBSUB_ANY_SUB_TOPIC, best_serializer, &any_sub); + } + else{ + printf("PSA_UDP_MC: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n",subEP->topic); + celixThreadMutex_lock(&admin->noSerializerPendingsLock); + arrayList_add(admin->noSerializerSubscriptions,subEP); + celixThreadMutex_unlock(&admin->noSerializerPendingsLock); + } - status += pubsub_topicSubscriptionCreate(admin->ifIpAddress, admin->bundle_context, admin->serializerSvc, PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, PUBSUB_ANY_SUB_TOPIC,&any_sub); + if (status == CELIX_SUCCESS){ - /* Connect all internal publishers */ - celixThreadMutex_lock(&admin->localPublicationsLock); - hash_map_iterator_pt lp_iter =hashMapIterator_create(admin->localPublications); - while(hashMapIterator_hasNext(lp_iter)){ - service_factory_pt factory = (service_factory_pt)hashMapIterator_nextValue(lp_iter); - topic_publication_pt topic_pubs = (topic_publication_pt)factory->handle; - array_list_pt topic_publishers = pubsub_topicPublicationGetPublisherList(topic_pubs); - - if(topic_publishers!=NULL){ - for(i=0;iendpoint !=NULL){ - status += pubsub_topicSubscriptionConnectPublisher(any_sub,pubEP->endpoint); + /* Connect all internal publishers */ + celixThreadMutex_lock(&admin->localPublicationsLock); + hash_map_iterator_pt lp_iter =hashMapIterator_create(admin->localPublications); + while(hashMapIterator_hasNext(lp_iter)){ + service_factory_pt factory = (service_factory_pt)hashMapIterator_nextValue(lp_iter); + topic_publication_pt topic_pubs = (topic_publication_pt)factory->handle; + array_list_pt topic_publishers = pubsub_topicPublicationGetPublisherList(topic_pubs); + + if(topic_publishers!=NULL){ + for(i=0;iendpoint !=NULL){ + status += pubsub_topicSubscriptionConnectPublisher(any_sub,pubEP->endpoint); + } } + arrayList_destroy(topic_publishers); } } - } - hashMapIterator_destroy(lp_iter); - celixThreadMutex_unlock(&admin->localPublicationsLock); - - /* Connect also all external publishers */ - celixThreadMutex_lock(&admin->externalPublicationsLock); - hash_map_iterator_pt extp_iter =hashMapIterator_create(admin->externalPublications); - while(hashMapIterator_hasNext(extp_iter)){ - array_list_pt ext_pub_list = (array_list_pt)hashMapIterator_nextValue(extp_iter); - if(ext_pub_list!=NULL){ - for(i=0;iendpoint !=NULL){ - status += pubsub_topicSubscriptionConnectPublisher(any_sub,pubEP->endpoint); + hashMapIterator_destroy(lp_iter); + celixThreadMutex_unlock(&admin->localPublicationsLock); + + /* Connect also all external publishers */ + celixThreadMutex_lock(&admin->externalPublicationsLock); + hash_map_iterator_pt extp_iter =hashMapIterator_create(admin->externalPublications); + while(hashMapIterator_hasNext(extp_iter)){ + array_list_pt ext_pub_list = (array_list_pt)hashMapIterator_nextValue(extp_iter); + if(ext_pub_list!=NULL){ + for(i=0;iendpoint !=NULL){ + status += pubsub_topicSubscriptionConnectPublisher(any_sub,pubEP->endpoint); + } } } } - } - hashMapIterator_destroy(extp_iter); - celixThreadMutex_unlock(&admin->externalPublicationsLock); + hashMapIterator_destroy(extp_iter); + celixThreadMutex_unlock(&admin->externalPublicationsLock); - pubsub_topicSubscriptionAddSubscriber(any_sub,subEP); + pubsub_topicSubscriptionAddSubscriber(any_sub,subEP); - status += pubsub_topicSubscriptionStart(any_sub); + status += pubsub_topicSubscriptionStart(any_sub); + + } - hashMap_put(admin->subscriptions,strdup(PUBSUB_ANY_SUB_TOPIC),any_sub); + if (status == CELIX_SUCCESS){ + hashMap_put(admin->subscriptions,strdup(PUBSUB_ANY_SUB_TOPIC),any_sub); + connectTopicPubSubToSerializer(admin, best_serializer, any_sub, false); + } } @@ -308,16 +344,16 @@ static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){ celix_status_t status = CELIX_SUCCESS; - printf("PSA: Received subscription [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n",subEP->frameworkUUID,subEP->serviceID,subEP->scope,subEP->topic); + printf("PSA_UDP_MC: Received subscription [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n",subEP->frameworkUUID,subEP->serviceID,subEP->scope,subEP->topic); if(strcmp(subEP->topic,PUBSUB_ANY_SUB_TOPIC)==0){ return pubsubAdmin_addAnySubscription(admin,subEP); } - celixThreadMutex_lock(&admin->subscriptionsLock); /* Check if we already know some publisher about this topic, otherwise let's put the subscription in the pending hashmap */ celixThreadMutex_lock(&admin->localPublicationsLock); celixThreadMutex_lock(&admin->externalPublicationsLock); + char* scope_topic = createScopeTopicKey(subEP->scope,subEP->topic); service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic); @@ -330,54 +366,71 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint } else{ int i; - topic_subscription_pt subscription = hashMap_get(admin->subscriptions, scope_topic); if(subscription == NULL) { + pubsub_serializer_service_t *best_serializer = NULL; + if( (status=pubsubAdmin_getBestSerializer(admin, subEP, &best_serializer)) == CELIX_SUCCESS){ + status += pubsub_topicSubscriptionCreate(admin->bundle_context,admin->ifIpAddress, subEP->scope, subEP->topic, best_serializer, &subscription); + } + else{ + printf("PSA_UDP_MC: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n",subEP->topic); + celixThreadMutex_lock(&admin->noSerializerPendingsLock); + arrayList_add(admin->noSerializerSubscriptions,subEP); + celixThreadMutex_unlock(&admin->noSerializerPendingsLock); + } - status += pubsub_topicSubscriptionCreate(admin->ifIpAddress, admin->bundle_context, admin->serializerSvc, subEP->scope, subEP->topic,&subscription); + if (status==CELIX_SUCCESS){ - /* Try to connect internal publishers */ - if(factory!=NULL){ - topic_publication_pt topic_pubs = (topic_publication_pt)factory->handle; - array_list_pt topic_publishers = pubsub_topicPublicationGetPublisherList(topic_pubs); + /* Try to connect internal publishers */ + if(factory!=NULL){ + topic_publication_pt topic_pubs = (topic_publication_pt)factory->handle; + array_list_pt topic_publishers = pubsub_topicPublicationGetPublisherList(topic_pubs); - if(topic_publishers!=NULL){ - for(i=0;iendpoint !=NULL){ - status += pubsub_topicSubscriptionConnectPublisher(subscription,pubEP->endpoint); + if(topic_publishers!=NULL){ + for(i=0;iendpoint !=NULL){ + status += pubsub_topicSubscriptionConnectPublisher(subscription,pubEP->endpoint); + } } + arrayList_destroy(topic_publishers); } - } - } + } - /* Look also for external publishers */ - if(ext_pub_list!=NULL){ - for(i=0;iendpoint !=NULL){ - status += pubsub_topicSubscriptionConnectPublisher(subscription,pubEP->endpoint); + /* Look also for external publishers */ + if(ext_pub_list!=NULL){ + for(i=0;iendpoint !=NULL){ + status += pubsub_topicSubscriptionConnectPublisher(subscription,pubEP->endpoint); + } } } - } - pubsub_topicSubscriptionAddSubscriber(subscription,subEP); + pubsub_topicSubscriptionAddSubscriber(subscription,subEP); + + status += pubsub_topicSubscriptionStart(subscription); - status += pubsub_topicSubscriptionStart(subscription); + } if(status==CELIX_SUCCESS){ + celixThreadMutex_lock(&admin->subscriptionsLock); hashMap_put(admin->subscriptions,strdup(scope_topic),subscription); + celixThreadMutex_unlock(&admin->subscriptionsLock); + connectTopicPubSubToSerializer(admin, best_serializer, subscription, false); } } - pubsub_topicIncreaseNrSubscribers(subscription); + if (status == CELIX_SUCCESS){ + pubsub_topicIncreaseNrSubscribers(subscription); + } } + free(scope_topic); celixThreadMutex_unlock(&admin->externalPublicationsLock); celixThreadMutex_unlock(&admin->localPublicationsLock); - celixThreadMutex_unlock(&admin->subscriptionsLock); return status; @@ -386,12 +439,13 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){ celix_status_t status = CELIX_SUCCESS; - printf("PSA: Removing subscription [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n",subEP->frameworkUUID,subEP->serviceID,subEP->scope, subEP->topic); + printf("PSA_UDP_MC: Removing subscription [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n",subEP->frameworkUUID,subEP->serviceID,subEP->scope, subEP->topic); celixThreadMutex_lock(&admin->subscriptionsLock); + char* scope_topic = createScopeTopicKey(subEP->scope, subEP->topic); topic_subscription_pt sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic); - free(scope_topic); + if(sub!=NULL){ pubsub_topicDecreaseNrSubscribers(sub); if(pubsub_topicGetNrSubscribers(sub) == 0) { @@ -399,9 +453,16 @@ celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpo } } else{ - status = CELIX_ILLEGAL_STATE; + /* Maybe the endpoint was pending */ + celixThreadMutex_lock(&admin->noSerializerPendingsLock); + if(!arrayList_removeElement(admin->noSerializerSubscriptions, subEP)){ + status = CELIX_ILLEGAL_STATE; + } + celixThreadMutex_unlock(&admin->noSerializerPendingsLock); } + free(scope_topic); + celixThreadMutex_unlock(&admin->subscriptionsLock); return status; @@ -411,112 +472,120 @@ celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpo celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP){ celix_status_t status = CELIX_SUCCESS; - printf("PSA: Received publication [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n",pubEP->frameworkUUID,pubEP->serviceID,pubEP->scope, pubEP->topic); + printf("PSA_UDP_MC: Received publication [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n",pubEP->frameworkUUID,pubEP->serviceID,pubEP->scope, pubEP->topic); const char* fwUUID = NULL; bundleContext_getProperty(admin->bundle_context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID); if(fwUUID==NULL){ - printf("PSA: Cannot retrieve fwUUID.\n"); + printf("PSA_UDP_MC: Cannot retrieve fwUUID.\n"); return CELIX_INVALID_BUNDLE_CONTEXT; } char* scope_topic = createScopeTopicKey(pubEP->scope, pubEP->topic); - if((strcmp(pubEP->frameworkUUID,fwUUID)==0) && (pubEP->endpoint==NULL)){ + if ((strcmp(pubEP->frameworkUUID, fwUUID) == 0) && (pubEP->endpoint == NULL)) { celixThreadMutex_lock(&admin->localPublicationsLock); - - service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic); + service_factory_pt factory = (service_factory_pt) hashMap_get(admin->localPublications, scope_topic); if (factory == NULL) { topic_publication_pt pub = NULL; - status = pubsub_topicPublicationCreate(admin->sendSocket, pubEP, admin->mcIpAddress,&pub); - pubsub_topicPublicationSetSerializer(pub, admin->serializerSvc); //TODO move back to contructor - //TODO this is certainly needed when admin are created per available serializer - if(status == CELIX_SUCCESS){ - status = pubsub_topicPublicationStart(admin->bundle_context,pub,&factory); - if(status==CELIX_SUCCESS && factory !=NULL){ - hashMap_put(admin->localPublications,strdup(scope_topic),factory); - } + pubsub_serializer_service_t *best_serializer = NULL; + if( (status=pubsubAdmin_getBestSerializer(admin, pubEP, &best_serializer)) == CELIX_SUCCESS){ + status = pubsub_topicPublicationCreate(admin->sendSocket, pubEP, best_serializer, admin->mcIpAddress, &pub); } else{ - printf("PSA: Cannot create a topicPublication for topic %s (bundle %ld).\n",pubEP->topic,pubEP->serviceID); + printf("PSA_UDP_MC: Cannot find a serializer for publishing topic %s. Adding it to pending list.\n", pubEP->topic); + celixThreadMutex_lock(&admin->noSerializerPendingsLock); + arrayList_add(admin->noSerializerPublications,pubEP); + celixThreadMutex_unlock(&admin->noSerializerPendingsLock); } - } - else{ + + if (status == CELIX_SUCCESS) { + status = pubsub_topicPublicationStart(admin->bundle_context, pub, &factory); + if (status == CELIX_SUCCESS && factory != NULL) { + hashMap_put(admin->localPublications, strdup(scope_topic), factory); + connectTopicPubSubToSerializer(admin, best_serializer, pub, true); + } + } else { + printf("PSA_UDP_MC: Cannot create a topicPublication for scope=%s, topic=%s (bundle %ld).\n", pubEP->scope, pubEP->topic, pubEP->serviceID); + } + } else { //just add the new EP to the list - topic_publication_pt pub = (topic_publication_pt)factory->handle; - pubsub_topicPublicationAddPublisherEP(pub,pubEP); + topic_publication_pt pub = (topic_publication_pt) factory->handle; + pubsub_topicPublicationAddPublisherEP(pub, pubEP); } - celixThreadMutex_unlock(&admin->localPublicationsLock); } else{ + celixThreadMutex_lock(&admin->externalPublicationsLock); - array_list_pt ext_pub_list = (array_list_pt)hashMap_get(admin->externalPublications,scope_topic); - if(ext_pub_list==NULL){ + array_list_pt ext_pub_list = (array_list_pt) hashMap_get(admin->externalPublications, scope_topic); + if (ext_pub_list == NULL) { arrayList_create(&ext_pub_list); - hashMap_put(admin->externalPublications,strdup(scope_topic),ext_pub_list); + hashMap_put(admin->externalPublications, strdup(scope_topic), ext_pub_list); } - arrayList_add(ext_pub_list,pubEP); + arrayList_add(ext_pub_list, pubEP); celixThreadMutex_unlock(&admin->externalPublicationsLock); } - /* Connect the new publisher to the subscription for his topic, if there is any */ - celixThreadMutex_lock(&admin->subscriptionsLock); - - topic_subscription_pt sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic); - if(sub!=NULL && pubEP->endpoint!=NULL){ - pubsub_topicSubscriptionConnectPublisher(sub,pubEP->endpoint); - } - - /* And check also for ANY subscription */ - topic_subscription_pt any_sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC); - if(any_sub!=NULL && pubEP->endpoint!=NULL){ - pubsub_topicSubscriptionConnectPublisher(any_sub,pubEP->endpoint); - } - - celixThreadMutex_unlock(&admin->subscriptionsLock); - /* Re-evaluate the pending subscriptions */ celixThreadMutex_lock(&admin->pendingSubscriptionsLock); - hash_map_entry_pt pendingSub = hashMap_getEntry(admin->pendingSubscriptions,scope_topic); - if(pendingSub!=NULL){ //There were pending subscription for the just published topic. Let's connect them. - char* key = (char*)hashMapEntry_getKey(pendingSub); - array_list_pt pendingSubList = (array_list_pt)hashMapEntry_getValue(pendingSub); + hash_map_entry_pt pendingSub = hashMap_getEntry(admin->pendingSubscriptions, scope_topic); + if (pendingSub != NULL) { //There were pending subscription for the just published topic. Let's connect them. + char* topic = (char*) hashMapEntry_getKey(pendingSub); + array_list_pt pendingSubList = (array_list_pt) hashMapEntry_getValue(pendingSub); int i; - for(i=0;ipendingSubscriptions,key); + hashMap_remove(admin->pendingSubscriptions, scope_topic); arrayList_clear(pendingSubList); arrayList_destroy(pendingSubList); - free(key); + free(topic); } - free(scope_topic); celixThreadMutex_unlock(&admin->pendingSubscriptionsLock); + /* Connect the new publisher to the subscription for his topic, if there is any */ + celixThreadMutex_lock(&admin->subscriptionsLock); + + topic_subscription_pt sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, scope_topic); + if (sub != NULL && pubEP->endpoint != NULL) { + pubsub_topicSubscriptionAddConnectPublisherToPendingList(sub, pubEP->endpoint); + } + + /* And check also for ANY subscription */ + topic_subscription_pt any_sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, PUBSUB_ANY_SUB_TOPIC); + if (any_sub != NULL && pubEP->endpoint != NULL) { + pubsub_topicSubscriptionAddConnectPublisherToPendingList(any_sub, pubEP->endpoint); + } + + free(scope_topic); + + celixThreadMutex_unlock(&admin->subscriptionsLock); + return status; } celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP){ celix_status_t status = CELIX_SUCCESS; + int count = 0; - printf("PSA: Removing publication [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n",pubEP->frameworkUUID,pubEP->serviceID,pubEP->scope, pubEP->topic); + printf("PSA_UDP_MC: Removing publication [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n",pubEP->frameworkUUID,pubEP->serviceID,pubEP->scope, pubEP->topic); const char* fwUUID = NULL; bundleContext_getProperty(admin->bundle_context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID); if(fwUUID==NULL){ - printf("PSA: Cannot retrieve fwUUID.\n"); + printf("PSA_UDP_MC: Cannot retrieve fwUUID.\n"); return CELIX_INVALID_BUNDLE_CONTEXT; } char *scope_topic = createScopeTopicKey(pubEP->scope, pubEP->topic); @@ -531,7 +600,12 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi pubsub_topicPublicationRemovePublisherEP(pub,pubEP); } else{ - status = CELIX_ILLEGAL_STATE; + /* Maybe the endpoint was pending */ + celixThreadMutex_lock(&admin->noSerializerPendingsLock); + if(!arrayList_removeElement(admin->noSerializerPublications, pubEP)){ + status = CELIX_ILLEGAL_STATE; + } + celixThreadMutex_unlock(&admin->noSerializerPendingsLock); } celixThreadMutex_unlock(&admin->localPublicationsLock); @@ -546,15 +620,23 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi for(i=0;!found && iendpoint,p->endpoint) == 0) { + count++; + } + } + if(arrayList_size(ext_pub_list)==0){ hash_map_entry_pt entry = hashMap_getEntry(admin->externalPublications,scope_topic); char* topic = (char*)hashMapEntry_getKey(entry); array_list_pt list = (array_list_pt)hashMapEntry_getValue(entry); - hashMap_remove(admin->externalPublications,scope_topic); + hashMap_remove(admin->externalPublications,topic); arrayList_destroy(list); free(topic); } @@ -567,15 +649,16 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi celixThreadMutex_lock(&admin->subscriptionsLock); topic_subscription_pt sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic); - if(sub!=NULL && pubEP->endpoint!=NULL){ - pubsub_topicSubscriptionDisconnectPublisher(sub,pubEP->endpoint); + if(sub!=NULL && pubEP->endpoint!=NULL && count == 0){ + pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub,pubEP->endpoint); } /* And check also for ANY subscription */ topic_subscription_pt any_sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC); - if(any_sub!=NULL && pubEP->endpoint!=NULL){ - pubsub_topicSubscriptionDisconnectPublisher(any_sub,pubEP->endpoint); + if(any_sub!=NULL && pubEP->endpoint!=NULL && count == 0){ + pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub,pubEP->endpoint); } + free(scope_topic); celixThreadMutex_unlock(&admin->subscriptionsLock); @@ -586,7 +669,7 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin,char *scope, char* topic){ celix_status_t status = CELIX_SUCCESS; - printf("PSA: Closing all publications for scope=%s,topic=%s\n", scope, topic); + printf("PSA_UDP_MC: Closing all publications for scope=%s,topic=%s\n", scope, topic); celixThreadMutex_lock(&admin->localPublicationsLock); char* scope_topic =createScopeTopicKey(scope, topic); @@ -598,6 +681,7 @@ celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin,char *scop status += pubsub_topicPublicationStop(pub); status += pubsub_topicPublicationDestroy(pub); + disconnectTopicPubSubFromSerializer(admin, pub, true); hashMap_remove(admin->localPublications,scope_topic); free(key); free(factory); @@ -612,7 +696,7 @@ celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin,char *scop celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char *scope, char* topic){ celix_status_t status = CELIX_SUCCESS; - printf("PSA: Closing all subscriptions\n"); + printf("PSA_UDP_MC: Closing all subscriptions\n"); celixThreadMutex_lock(&admin->subscriptionsLock); char* scope_topic =createScopeTopicKey(scope, topic); @@ -624,6 +708,7 @@ celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char *sco status += pubsub_topicSubscriptionStop(ts); status += pubsub_topicSubscriptionDestroy(ts); + disconnectTopicPubSubFromSerializer(admin, ts, false); hashMap_remove(admin->subscriptions,topic); free(topic); @@ -635,92 +720,6 @@ celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char *sco } -celix_status_t pubsubAdmin_matchPublisher(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP, double* score){ - celix_status_t status = CELIX_SUCCESS; - status = pubsubAdmin_match(admin, pubEP, score); - return status; -} - -celix_status_t pubsubAdmin_matchSubscriber(pubsub_admin_pt admin, pubsub_endpoint_pt subEP, double* score){ - celix_status_t status = CELIX_SUCCESS; - status = pubsubAdmin_match(admin, subEP, score); - return status; -} - -celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc){ - celix_status_t status = CELIX_SUCCESS; - admin->serializerSvc = serializerSvc; - - /* Add serializer to all topic_publication_pt */ - celixThreadMutex_lock(&admin->localPublicationsLock); - hash_map_iterator_pt lp_iter = hashMapIterator_create(admin->localPublications); - while(hashMapIterator_hasNext(lp_iter)){ - service_factory_pt factory = (service_factory_pt) hashMapIterator_nextValue(lp_iter); - topic_publication_pt topic_pub = (topic_publication_pt) factory->handle; - pubsub_topicPublicationSetSerializer(topic_pub, admin->serializerSvc); - } - hashMapIterator_destroy(lp_iter); - celixThreadMutex_unlock(&admin->localPublicationsLock); - - /* Add serializer to all topic_subscription_pt */ - celixThreadMutex_lock(&admin->subscriptionsLock); - hash_map_iterator_pt subs_iter = hashMapIterator_create(admin->subscriptions); - while(hashMapIterator_hasNext(subs_iter)){ - topic_subscription_pt topic_sub = (topic_subscription_pt) hashMapIterator_nextValue(subs_iter); - pubsub_topicSubscriptionSetSerializer(topic_sub, admin->serializerSvc); - } - hashMapIterator_destroy(subs_iter); - celixThreadMutex_unlock(&admin->subscriptionsLock); - - return status; -} - -celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc){ - celix_status_t status = CELIX_SUCCESS; - admin->serializerSvc = NULL; - - /* Remove serializer from all topic_publication_pt */ - celixThreadMutex_lock(&admin->localPublicationsLock); - hash_map_iterator_pt lp_iter = hashMapIterator_create(admin->localPublications); - while(hashMapIterator_hasNext(lp_iter)){ - service_factory_pt factory = (service_factory_pt) hashMapIterator_nextValue(lp_iter); - topic_publication_pt topic_pub = (topic_publication_pt) factory->handle; - pubsub_topicPublicationRemoveSerializer(topic_pub, admin->serializerSvc); - } - hashMapIterator_destroy(lp_iter); - celixThreadMutex_unlock(&admin->localPublicationsLock); - - /* Remove serializer from all topic_subscription_pt */ - celixThreadMutex_lock(&admin->subscriptionsLock); - hash_map_iterator_pt subs_iter = hashMapIterator_create(admin->subscriptions); - while(hashMapIterator_hasNext(subs_iter)){ - topic_subscription_pt topic_sub = (topic_subscription_pt) hashMapIterator_nextValue(subs_iter); - pubsub_topicSubscriptionRemoveSerializer(topic_sub, admin->serializerSvc); - } - hashMapIterator_destroy(subs_iter); - celixThreadMutex_unlock(&admin->subscriptionsLock); - - return status; -} - -static celix_status_t pubsubAdmin_match(pubsub_admin_pt admin, pubsub_endpoint_pt psEP, double* score){ - celix_status_t status = CELIX_SUCCESS; - - char topic_psa_prop[1024]; - snprintf(topic_psa_prop, 1024, "%s.psa", psEP->topic); - - const char* psa_to_use = NULL; - bundleContext_getPropertyWithDefault(admin->bundle_context, topic_psa_prop, PSA_DEFAULT, &psa_to_use); - - *score = 0; - if (strcmp(psa_to_use, "udp") == 0){ - *score += 100; - }else{ - *score += 1; - } - - return status; -} #ifndef ANDROID static celix_status_t pubsubAdmin_getIpAddress(const char* interface, char** ip) { @@ -764,11 +763,245 @@ static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt a array_list_pt pendingListPerTopic = hashMap_get(admin->pendingSubscriptions,scope_topic); if(pendingListPerTopic==NULL){ arrayList_create(&pendingListPerTopic); - hashMap_put(admin->pendingSubscriptions,scope_topic,pendingListPerTopic); - } else { - free(scope_topic); + hashMap_put(admin->pendingSubscriptions,strdup(scope_topic),pendingListPerTopic); } arrayList_add(pendingListPerTopic,subEP); + free(scope_topic); + + return status; +} + + +celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt reference, void * service){ + /* Assumption: serializers are all available at startup. + * If a new (possibly better) serializer is installed and started, already created topic_publications/subscriptions will not be destroyed and recreated */ + + celix_status_t status = CELIX_SUCCESS; + int i=0; + + const char *serType = NULL; + serviceReference_getProperty(reference, PUBSUB_SERIALIZER_TYPE_KEY,&serType); + if(serType == NULL){ + printf("Serializer serviceReference %p has no pubsub_serializer.type property specified\n",reference); + return CELIX_SERVICE_EXCEPTION; + } + + pubsub_admin_pt admin = (pubsub_admin_pt)handle; + celixThreadMutex_lock(&admin->serializerListLock); + arrayList_add(admin->serializerList, reference); + celixThreadMutex_unlock(&admin->serializerListLock); + + /* Now let's re-evaluate the pending */ + celixThreadMutex_lock(&admin->noSerializerPendingsLock); + + for(i=0;inoSerializerSubscriptions);i++){ + pubsub_endpoint_pt ep = (pubsub_endpoint_pt)arrayList_get(admin->noSerializerSubscriptions,i); + pubsub_serializer_service_t *best_serializer = NULL; + pubsubAdmin_getBestSerializer(admin, ep, &best_serializer); + if(best_serializer != NULL){ /* Finally we have a valid serializer! */ + pubsubAdmin_addSubscription(admin, ep); + } + } + + for(i=0;inoSerializerPublications);i++){ + pubsub_endpoint_pt ep = (pubsub_endpoint_pt)arrayList_get(admin->noSerializerPublications,i); + pubsub_serializer_service_t *best_serializer = NULL; + pubsubAdmin_getBestSerializer(admin, ep, &best_serializer); + if(best_serializer != NULL){ /* Finally we have a valid serializer! */ + pubsubAdmin_addPublication(admin, ep); + } + } + + celixThreadMutex_unlock(&admin->noSerializerPendingsLock); + + printf("PSA_UDP_MC: %s serializer added\n",serType); return status; } + +celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt reference, void * service){ + + pubsub_admin_pt admin = (pubsub_admin_pt)handle; + int i=0, j=0; + const char *serType = NULL; + + serviceReference_getProperty(reference, PUBSUB_SERIALIZER_TYPE_KEY,&serType); + if(serType == NULL){ + printf("Serializer serviceReference %p has no pubsub_serializer.type property specified\n",reference); + return CELIX_SERVICE_EXCEPTION; + } + + celixThreadMutex_lock(&admin->serializerListLock); + celixThreadMutex_lock(&admin->usedSerializersLock); + + + /* Remove the serializer from the list */ + arrayList_removeElement(admin->serializerList, reference); + + /* Now destroy the topicPublications, but first put back the pubsub_endpoints back to the noSerializer pending list */ + array_list_pt topicPubList = (array_list_pt)hashMap_remove(admin->topicPublicationsPerSerializer, service); + if(topicPubList!=NULL){ + for(i=0;iendpoint!=NULL){ + free(pubEP->endpoint); + pubEP->endpoint = NULL; + } + /* Add the orphan endpoint to the noSerializer pending list */ + celixThreadMutex_lock(&admin->noSerializerPendingsLock); + arrayList_add(admin->noSerializerPublications,pubEP); + celixThreadMutex_unlock(&admin->noSerializerPendingsLock); + } + arrayList_destroy(pubList); + + /* Cleanup also the localPublications hashmap*/ + celixThreadMutex_lock(&admin->localPublicationsLock); + hash_map_iterator_pt iter = hashMapIterator_create(admin->localPublications); + char *key = NULL; + service_factory_pt factory = NULL; + while(hashMapIterator_hasNext(iter)){ + hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); + factory = (service_factory_pt)hashMapEntry_getValue(entry); + topic_publication_pt pub = (topic_publication_pt)factory->handle; + if(pub==topicPub){ + key = (char*)hashMapEntry_getKey(entry); + break; + } + } + hashMapIterator_destroy(iter); + if(key!=NULL){ + hashMap_remove(admin->localPublications, key); + free(factory); + free(key); + } + celixThreadMutex_unlock(&admin->localPublicationsLock); + + /* Finally destroy the topicPublication */ + pubsub_topicPublicationDestroy(topicPub); + } + arrayList_destroy(topicPubList); + } + + /* Now destroy the topicSubscriptions, but first put back the pubsub_endpoints back to the noSerializer pending list */ + array_list_pt topicSubList = (array_list_pt)hashMap_remove(admin->topicSubscriptionsPerSerializer, service); + if(topicSubList!=NULL){ + for(i=0;iendpoint!=NULL){ + free(subEP->endpoint); + subEP->endpoint = NULL; + } + /* Add the orphan endpoint to the noSerializer pending list */ + celixThreadMutex_lock(&admin->noSerializerPendingsLock); + arrayList_add(admin->noSerializerSubscriptions,subEP); + celixThreadMutex_unlock(&admin->noSerializerPendingsLock); + } + + /* Cleanup also the subscriptions hashmap*/ + celixThreadMutex_lock(&admin->subscriptionsLock); + hash_map_iterator_pt iter = hashMapIterator_create(admin->subscriptions); + char *key = NULL; + while(hashMapIterator_hasNext(iter)){ + hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); + topic_subscription_pt sub = (topic_subscription_pt)hashMapEntry_getValue(entry); + if(sub==topicSub){ + key = (char*)hashMapEntry_getKey(entry); + break; + } + } + hashMapIterator_destroy(iter); + if(key!=NULL){ + hashMap_remove(admin->subscriptions, key); + free(key); + } + celixThreadMutex_unlock(&admin->subscriptionsLock); + + /* Finally destroy the topicSubscription */ + pubsub_topicSubscriptionDestroy(topicSub); + } + arrayList_destroy(topicSubList); + } + + celixThreadMutex_unlock(&admin->usedSerializersLock); + celixThreadMutex_unlock(&admin->serializerListLock); + + printf("PSA_UDP_MC: %s serializer removed\n",serType); + + + return CELIX_SUCCESS; +} + +celix_status_t pubsubAdmin_matchEndpoint(pubsub_admin_pt admin, pubsub_endpoint_pt endpoint, double* score){ + celix_status_t status = CELIX_SUCCESS; + + celixThreadMutex_lock(&admin->serializerListLock); + status = pubsub_admin_match(endpoint->topic_props,PUBSUB_ADMIN_TYPE,admin->serializerList,score); + celixThreadMutex_unlock(&admin->serializerListLock); + + return status; +} + +/* This one recall the same logic as in the match function */ +static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin,pubsub_endpoint_pt ep, pubsub_serializer_service_t **serSvc){ + + celix_status_t status = CELIX_SUCCESS; + + celixThreadMutex_lock(&admin->serializerListLock); + status = pubsub_admin_get_best_serializer(ep->topic_props, admin->serializerList, serSvc); + celixThreadMutex_unlock(&admin->serializerListLock); + + return status; + +} + +static void connectTopicPubSubToSerializer(pubsub_admin_pt admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool isPublication){ + + celixThreadMutex_lock(&admin->usedSerializersLock); + + hash_map_pt map = isPublication?admin->topicPublicationsPerSerializer:admin->topicSubscriptionsPerSerializer; + array_list_pt list = (array_list_pt)hashMap_get(map,serializer); + if(list==NULL){ + arrayList_create(&list); + hashMap_put(map,serializer,list); + } + arrayList_add(list,topicPubSub); + + celixThreadMutex_unlock(&admin->usedSerializersLock); + +} + +static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void *topicPubSub,bool isPublication){ + + celixThreadMutex_lock(&admin->usedSerializersLock); + + hash_map_pt map = isPublication?admin->topicPublicationsPerSerializer:admin->topicSubscriptionsPerSerializer; + hash_map_iterator_pt iter = hashMapIterator_create(map); + while(hashMapIterator_hasNext(iter)){ + array_list_pt list = (array_list_pt)hashMapIterator_nextValue(iter); + if(arrayList_removeElement(list, topicPubSub)){ //Found it! + break; + } + } + hashMapIterator_destroy(iter); + + celixThreadMutex_unlock(&admin->usedSerializersLock); + +}