Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 73C8A200D0F for ; Fri, 29 Sep 2017 15:34:21 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 71FB51609C5; Fri, 29 Sep 2017 13:34:21 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 7653E1609D1 for ; Fri, 29 Sep 2017 15:34:19 +0200 (CEST) Received: (qmail 54793 invoked by uid 500); 29 Sep 2017 13:34:18 -0000 Mailing-List: contact commits-help@celix.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@celix.apache.org Delivered-To: mailing list commits@celix.apache.org Received: (qmail 54779 invoked by uid 99); 29 Sep 2017 13:34:18 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 29 Sep 2017 13:34:18 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id BE662F5AD9; Fri, 29 Sep 2017 13:34:17 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: gricciardi@apache.org To: commits@celix.apache.org Date: Fri, 29 Sep 2017 13:34:19 -0000 Message-Id: <7510df484c83411384207fa0eb8b6bf2@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [03/11] celix git commit: Refactored serializers management archived-at: Fri, 29 Sep 2017 13:34:21 -0000 http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c b/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c index e919c9c..9316506 100644 --- a/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c +++ b/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c @@ -57,7 +57,7 @@ #include "service_factory.h" #include "topic_subscription.h" -#include "pubsub_publish_service_private.h" +#include "topic_publication.h" #include "pubsub_endpoint.h" #include "pubsub_utils.h" #include "subscriber.h" @@ -66,17 +66,20 @@ static const char *DEFAULT_IP = "127.0.0.1"; -static celix_status_t pubsubAdmin_getIpAddress(const char* interface, char** ip); +static celix_status_t pubsubAdmin_getIpAdress(const char* interface, char** ip); static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP); static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP); -static celix_status_t pubsubAdmin_match(pubsub_admin_pt admin, pubsub_endpoint_pt psEP, double* score); + +static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin,pubsub_endpoint_pt ep, pubsub_serializer_service_t **serSvc); +static void connectTopicPubSubToSerializer(pubsub_admin_pt admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool isPublication); +static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void *topicPubSub,bool isPublication); celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *admin) { celix_status_t status = CELIX_SUCCESS; #ifdef BUILD_WITH_ZMQ_SECURITY if (!zsys_has_curve()){ - printf("PSA: zeromq curve unsupported\n"); + printf("PSA_ZMQ: zeromq curve unsupported\n"); return CELIX_SERVICE_EXCEPTION; } #endif @@ -95,14 +98,19 @@ celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *ad (*admin)->subscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); (*admin)->pendingSubscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); (*admin)->externalPublications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); + (*admin)->topicSubscriptionsPerSerializer = hashMap_create(NULL, NULL, NULL, NULL); + (*admin)->topicPublicationsPerSerializer = hashMap_create(NULL, NULL, NULL, NULL); + arrayList_create(&((*admin)->noSerializerSubscriptions)); + arrayList_create(&((*admin)->noSerializerPublications)); + arrayList_create(&((*admin)->serializerList)); celixThreadMutex_create(&(*admin)->localPublicationsLock, NULL); celixThreadMutex_create(&(*admin)->subscriptionsLock, NULL); + celixThreadMutex_create(&(*admin)->pendingSubscriptionsLock, NULL); celixThreadMutex_create(&(*admin)->externalPublicationsLock, NULL); - - celixThreadMutexAttr_create(&(*admin)->pendingSubscriptionsAttr); - celixThreadMutexAttr_settype(&(*admin)->pendingSubscriptionsAttr, CELIX_THREAD_MUTEX_RECURSIVE); - celixThreadMutex_create(&(*admin)->pendingSubscriptionsLock, &(*admin)->pendingSubscriptionsAttr); + celixThreadMutex_create(&(*admin)->noSerializerPendingsLock, NULL); + celixThreadMutex_create(&(*admin)->serializerListLock, NULL); + celixThreadMutex_create(&(*admin)->usedSerializersLock, NULL); if (logHelper_create(context, &(*admin)->loghelper) == CELIX_SUCCESS) { logHelper_start((*admin)->loghelper); @@ -115,8 +123,8 @@ celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *ad const char *interface = NULL; bundleContext_getProperty(context, PSA_ITF, &interface); - if (pubsubAdmin_getIpAddress(interface, &detectedIp) != CELIX_SUCCESS) { - logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA: Could not retrieve IP adress for interface %s", interface); + if (pubsubAdmin_getIpAdress(interface, &detectedIp) != CELIX_SUCCESS) { + logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA_ZMQ: Could not retrieve IP adress for interface %s", interface); } ip = detectedIp; @@ -124,11 +132,11 @@ celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *ad #endif if (ip != NULL) { - logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA: Using %s for service annunciation", ip); + logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA_ZMQ: Using %s for service annunciation", ip); (*admin)->ipAddress = strdup(ip); } else { - logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA: No IP address for service annunciation set. Using %s", DEFAULT_IP); + logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA_ZMQ: No IP address for service annunciation set. Using %s", DEFAULT_IP); (*admin)->ipAddress = strdup(DEFAULT_IP); } @@ -136,24 +144,24 @@ celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *ad free(detectedIp); } - const char* basePortStr = NULL; - const char* maxPortStr = NULL; - char* endptrBase = NULL; - char* endptrMax = NULL; - bundleContext_getPropertyWithDefault(context, PSA_ZMQ_BASE_PORT, "PSA_ZMQ_DEFAULT_BASE_PORT", &basePortStr); - bundleContext_getPropertyWithDefault(context, PSA_ZMQ_MAX_PORT, "PSA_ZMQ_DEFAULT_MAX_PORT", &maxPortStr); - (*admin)->basePort = strtol(basePortStr, &endptrBase, 10); - (*admin)->maxPort = strtol(maxPortStr, &endptrMax, 10); - if (*endptrBase != '\0') { - (*admin)->basePort = PSA_ZMQ_DEFAULT_BASE_PORT; - } - if (*endptrMax != '\0') { - (*admin)->maxPort = PSA_ZMQ_DEFAULT_MAX_PORT; - } - - printf("PSA Using base port %u to max port %u\n", (*admin)->basePort, (*admin)->maxPort); - - // Disable Signal Handling by CZMQ + const char* basePortStr = NULL; + const char* maxPortStr = NULL; + char* endptrBase = NULL; + char* endptrMax = NULL; + bundleContext_getPropertyWithDefault(context, PSA_ZMQ_BASE_PORT, "PSA_ZMQ_DEFAULT_BASE_PORT", &basePortStr); + bundleContext_getPropertyWithDefault(context, PSA_ZMQ_MAX_PORT, "PSA_ZMQ_DEFAULT_MAX_PORT", &maxPortStr); + (*admin)->basePort = strtol(basePortStr, &endptrBase, 10); + (*admin)->maxPort = strtol(maxPortStr, &endptrMax, 10); + if (*endptrBase != '\0') { + (*admin)->basePort = PSA_ZMQ_DEFAULT_BASE_PORT; + } + if (*endptrMax != '\0') { + (*admin)->maxPort = PSA_ZMQ_DEFAULT_MAX_PORT; + } + + printf("PSA Using base port %u to max port %u\n", (*admin)->basePort, (*admin)->maxPort); + + // Disable Signal Handling by CZMQ setenv("ZSYS_SIGHANDLER", "false", true); const char *nrZmqThreads = NULL; @@ -164,8 +172,8 @@ celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *ad unsigned int nrThreads = strtoul(nrZmqThreads, &endPtr, 10); if(endPtr != nrZmqThreads && nrThreads > 0 && nrThreads < 50) { zsys_set_io_threads(nrThreads); - logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA: Using %d threads for ZMQ", nrThreads); - printf("PSA: Using %d threads for ZMQ\n", nrThreads); + logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA_ZMQ: Using %d threads for ZMQ", nrThreads); + printf("PSA_ZMQ: Using %d threads for ZMQ\n", nrThreads); } } @@ -227,8 +235,38 @@ celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin) hashMap_destroy(admin->externalPublications,false,false); celixThreadMutex_unlock(&admin->externalPublicationsLock); + celixThreadMutex_lock(&admin->serializerListLock); + arrayList_destroy(admin->serializerList); + celixThreadMutex_unlock(&admin->serializerListLock); + + celixThreadMutex_lock(&admin->noSerializerPendingsLock); + arrayList_destroy(admin->noSerializerSubscriptions); + arrayList_destroy(admin->noSerializerPublications); + celixThreadMutex_unlock(&admin->noSerializerPendingsLock); + + + celixThreadMutex_lock(&admin->usedSerializersLock); + + iter = hashMapIterator_create(admin->topicSubscriptionsPerSerializer); + while(hashMapIterator_hasNext(iter)){ + arrayList_destroy((array_list_pt)hashMapIterator_nextValue(iter)); + } + hashMapIterator_destroy(iter); + hashMap_destroy(admin->topicSubscriptionsPerSerializer,false,false); + + iter = hashMapIterator_create(admin->topicPublicationsPerSerializer); + while(hashMapIterator_hasNext(iter)){ + arrayList_destroy((array_list_pt)hashMapIterator_nextValue(iter)); + } + hashMapIterator_destroy(iter); + hashMap_destroy(admin->topicPublicationsPerSerializer,false,false); + + celixThreadMutex_unlock(&admin->usedSerializersLock); + + celixThreadMutex_destroy(&admin->usedSerializersLock); + celixThreadMutex_destroy(&admin->noSerializerPendingsLock); + celixThreadMutex_destroy(&admin->serializerListLock); celixThreadMutex_destroy(&admin->pendingSubscriptionsLock); - celixThreadMutexAttr_destroy(&admin->pendingSubscriptionsAttr); celixThreadMutex_destroy(&admin->subscriptionsLock); celixThreadMutex_destroy(&admin->localPublicationsLock); celixThreadMutex_destroy(&admin->externalPublicationsLock); @@ -258,8 +296,16 @@ static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu if(any_sub==NULL){ int i; - - status += pubsub_topicSubscriptionCreate(admin->bundle_context, subEP, admin->serializerSvc, PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, PUBSUB_ANY_SUB_TOPIC, &any_sub); + pubsub_serializer_service_t *best_serializer = NULL; + if( (status=pubsubAdmin_getBestSerializer(admin, subEP, &best_serializer)) == CELIX_SUCCESS){ + status = pubsub_topicSubscriptionCreate(admin->bundle_context, PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, PUBSUB_ANY_SUB_TOPIC, best_serializer, &any_sub); + } + else{ + printf("PSA_ZMQ: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n",subEP->topic); + celixThreadMutex_lock(&admin->noSerializerPendingsLock); + arrayList_add(admin->noSerializerSubscriptions,subEP); + celixThreadMutex_unlock(&admin->noSerializerPendingsLock); + } if (status == CELIX_SUCCESS){ @@ -278,6 +324,7 @@ static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu status += pubsub_topicSubscriptionConnectPublisher(any_sub,pubEP->endpoint); } } + arrayList_destroy(topic_publishers); } } hashMapIterator_destroy(lp_iter); @@ -309,6 +356,7 @@ static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu if (status == CELIX_SUCCESS){ hashMap_put(admin->subscriptions,strdup(PUBSUB_ANY_SUB_TOPIC),any_sub); + connectTopicPubSubToSerializer(admin, best_serializer, any_sub, false); } } @@ -321,18 +369,17 @@ static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsu celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){ celix_status_t status = CELIX_SUCCESS; - printf("PSA: Received subscription [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n",subEP->frameworkUUID,subEP->serviceID,subEP->scope, subEP->topic); + printf("PSA_ZMQ: Received subscription [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n",subEP->frameworkUUID,subEP->serviceID,subEP->scope, subEP->topic); if(strcmp(subEP->topic,PUBSUB_ANY_SUB_TOPIC)==0){ return pubsubAdmin_addAnySubscription(admin,subEP); } - celixThreadMutex_lock(&admin->subscriptionsLock); /* Check if we already know some publisher about this topic, otherwise let's put the subscription in the pending hashmap */ celixThreadMutex_lock(&admin->localPublicationsLock); celixThreadMutex_lock(&admin->externalPublicationsLock); - char* scope_topic = createScopeTopicKey(subEP->scope, subEP->topic); + char* scope_topic = createScopeTopicKey(subEP->scope, subEP->topic); service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic); array_list_pt ext_pub_list = (array_list_pt)hashMap_get(admin->externalPublications,scope_topic); @@ -341,12 +388,22 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint celixThreadMutex_lock(&admin->pendingSubscriptionsLock); pubsubAdmin_addSubscriptionToPendingList(admin,subEP); celixThreadMutex_unlock(&admin->pendingSubscriptionsLock); - } else { + } + else{ int i; topic_subscription_pt subscription = hashMap_get(admin->subscriptions, scope_topic); if(subscription == NULL) { - status += pubsub_topicSubscriptionCreate(admin->bundle_context, subEP, admin->serializerSvc, subEP->scope, subEP->topic, &subscription); + pubsub_serializer_service_t *best_serializer = NULL; + if( (status=pubsubAdmin_getBestSerializer(admin, subEP, &best_serializer)) == CELIX_SUCCESS){ + status += pubsub_topicSubscriptionCreate(admin->bundle_context,subEP->scope, subEP->topic, best_serializer, &subscription); + } + else{ + printf("PSA_ZMQ: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n",subEP->topic); + celixThreadMutex_lock(&admin->noSerializerPendingsLock); + arrayList_add(admin->noSerializerSubscriptions,subEP); + celixThreadMutex_unlock(&admin->noSerializerPendingsLock); + } if (status==CELIX_SUCCESS){ @@ -362,6 +419,7 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint status += pubsub_topicSubscriptionConnectPublisher(subscription,pubEP->endpoint); } } + arrayList_destroy(topic_publishers); } } @@ -383,7 +441,10 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint } if(status==CELIX_SUCCESS){ + celixThreadMutex_lock(&admin->subscriptionsLock); hashMap_put(admin->subscriptions,strdup(scope_topic),subscription); + celixThreadMutex_unlock(&admin->subscriptionsLock); + connectTopicPubSubToSerializer(admin, best_serializer, subscription, false); } } @@ -392,10 +453,9 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint } } - free(scope_topic); + free(scope_topic); celixThreadMutex_unlock(&admin->externalPublicationsLock); celixThreadMutex_unlock(&admin->localPublicationsLock); - celixThreadMutex_unlock(&admin->subscriptionsLock); return status; @@ -404,11 +464,12 @@ celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){ celix_status_t status = CELIX_SUCCESS; - printf("PSA: Removing subscription [FWUUID=%s bundleID=%ld topic=%s]\n",subEP->frameworkUUID,subEP->serviceID,subEP->topic); + printf("PSA_ZMQ: Removing subscription [FWUUID=%s bundleID=%ld topic=%s]\n",subEP->frameworkUUID,subEP->serviceID,subEP->topic); celixThreadMutex_lock(&admin->subscriptionsLock); - topic_subscription_pt sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,subEP->topic); + char* scope_topic = createScopeTopicKey(subEP->scope, subEP->topic); + topic_subscription_pt sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic); if(sub!=NULL){ pubsub_topicDecreaseNrSubscribers(sub); @@ -417,9 +478,16 @@ celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpo } } else{ - status = CELIX_ILLEGAL_STATE; + /* Maybe the endpoint was pending */ + celixThreadMutex_lock(&admin->noSerializerPendingsLock); + if(!arrayList_removeElement(admin->noSerializerSubscriptions, subEP)){ + status = CELIX_ILLEGAL_STATE; + } + celixThreadMutex_unlock(&admin->noSerializerPendingsLock); } + free(scope_topic); + celixThreadMutex_unlock(&admin->subscriptionsLock); return status; @@ -427,114 +495,127 @@ celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpo } celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP) { - celix_status_t status = CELIX_SUCCESS; - - printf("PSA: Received publication [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n", pubEP->frameworkUUID, pubEP->serviceID, pubEP->scope, pubEP->topic); - - const char* fwUUID = NULL; - - char *scope_topic = createScopeTopicKey(pubEP->scope, pubEP->topic); - bundleContext_getProperty(admin->bundle_context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID); - if (fwUUID == NULL) { - printf("PSA: Cannot retrieve fwUUID.\n"); - return CELIX_INVALID_BUNDLE_CONTEXT; - } - - if ((strcmp(pubEP->frameworkUUID, fwUUID) == 0) && (pubEP->endpoint == NULL)) { - - celixThreadMutex_lock(&admin->localPublicationsLock); - - service_factory_pt factory = (service_factory_pt) hashMap_get(admin->localPublications, scope_topic); - - if (factory == NULL) { - topic_publication_pt pub = NULL; - status = pubsub_topicPublicationCreate(admin->bundle_context, pubEP, admin->ipAddress, admin->basePort, admin->maxPort, &pub); - pubsub_topicPublicationSetSerializer(pub, admin->serializerSvc); //TODO serializer back to constructor. This is certainly when - //TODO admin are created for every available serializer - if (status == CELIX_SUCCESS) { - status = pubsub_topicPublicationStart(admin->bundle_context, pub, &factory); - if (status == CELIX_SUCCESS && factory != NULL) { - hashMap_put(admin->localPublications, strdup(scope_topic), factory); - } - } else { - printf("PSA: Cannot create a topicPublication for scope=%s, topic=%s (bundle %ld).\n", pubEP->scope, pubEP->topic, pubEP->serviceID); - } - } else { - //just add the new EP to the list - topic_publication_pt pub = (topic_publication_pt) factory->handle; - pubsub_topicPublicationAddPublisherEP(pub, pubEP); - } - - celixThreadMutex_unlock(&admin->localPublicationsLock); - } else { - - celixThreadMutex_lock(&admin->externalPublicationsLock); - array_list_pt ext_pub_list = (array_list_pt) hashMap_get(admin->externalPublications, scope_topic); - if (ext_pub_list == NULL) { - arrayList_create(&ext_pub_list); - hashMap_put(admin->externalPublications, strdup(scope_topic), ext_pub_list); - } - - arrayList_add(ext_pub_list, pubEP); - - celixThreadMutex_unlock(&admin->externalPublicationsLock); - } - - /* Re-evaluate the pending subscriptions */ - celixThreadMutex_lock(&admin->pendingSubscriptionsLock); - - hash_map_entry_pt pendingSub = hashMap_getEntry(admin->pendingSubscriptions, scope_topic); - if (pendingSub != NULL) { //There were pending subscription for the just published topic. Let's connect them. - char* topic = (char*) hashMapEntry_getKey(pendingSub); - array_list_pt pendingSubList = (array_list_pt) hashMapEntry_getValue(pendingSub); - int i; - for (i = 0; i < arrayList_size(pendingSubList); i++) { - pubsub_endpoint_pt subEP = (pubsub_endpoint_pt) arrayList_get(pendingSubList, i); - pubsubAdmin_addSubscription(admin, subEP); - } - hashMap_remove(admin->pendingSubscriptions, scope_topic); - arrayList_clear(pendingSubList); - arrayList_destroy(pendingSubList); - free(topic); - } - - celixThreadMutex_unlock(&admin->pendingSubscriptionsLock); - - /* Connect the new publisher to the subscription for his topic, if there is any */ - celixThreadMutex_lock(&admin->subscriptionsLock); - - topic_subscription_pt sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, scope_topic); - if (sub != NULL && pubEP->endpoint != NULL) { - pubsub_topicSubscriptionAddConnectPublisherToPendingList(sub, pubEP->endpoint); - } - - /* And check also for ANY subscription */ - topic_subscription_pt any_sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, PUBSUB_ANY_SUB_TOPIC); - if (any_sub != NULL && pubEP->endpoint != NULL) { - pubsub_topicSubscriptionAddConnectPublisherToPendingList(any_sub, pubEP->endpoint); - } - free(scope_topic); - - celixThreadMutex_unlock(&admin->subscriptionsLock); - - return status; + celix_status_t status = CELIX_SUCCESS; + + printf("PSA_ZMQ: Received publication [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n", pubEP->frameworkUUID, pubEP->serviceID, pubEP->scope, pubEP->topic); + + const char* fwUUID = NULL; + + bundleContext_getProperty(admin->bundle_context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID); + if (fwUUID == NULL) { + printf("PSA_ZMQ: Cannot retrieve fwUUID.\n"); + return CELIX_INVALID_BUNDLE_CONTEXT; + } + + char *scope_topic = createScopeTopicKey(pubEP->scope, pubEP->topic); + + if ((strcmp(pubEP->frameworkUUID, fwUUID) == 0) && (pubEP->endpoint == NULL)) { + + celixThreadMutex_lock(&admin->localPublicationsLock); + + service_factory_pt factory = (service_factory_pt) hashMap_get(admin->localPublications, scope_topic); + + if (factory == NULL) { + topic_publication_pt pub = NULL; + pubsub_serializer_service_t *best_serializer = NULL; + if( (status=pubsubAdmin_getBestSerializer(admin, pubEP, &best_serializer)) == CELIX_SUCCESS){ + status = pubsub_topicPublicationCreate(admin->bundle_context, pubEP, best_serializer, admin->ipAddress, admin->basePort, admin->maxPort, &pub); + } + else{ + printf("PSA_ZMQ: Cannot find a serializer for publishing topic %s. Adding it to pending list.\n", pubEP->topic); + celixThreadMutex_lock(&admin->noSerializerPendingsLock); + arrayList_add(admin->noSerializerPublications,pubEP); + celixThreadMutex_unlock(&admin->noSerializerPendingsLock); + } + + if (status == CELIX_SUCCESS) { + status = pubsub_topicPublicationStart(admin->bundle_context, pub, &factory); + if (status == CELIX_SUCCESS && factory != NULL) { + hashMap_put(admin->localPublications, strdup(scope_topic), factory); + connectTopicPubSubToSerializer(admin, best_serializer, pub, true); + } + } else { + printf("PSA_ZMQ: Cannot create a topicPublication for scope=%s, topic=%s (bundle %ld).\n", pubEP->scope, pubEP->topic, pubEP->serviceID); + } + } else { + //just add the new EP to the list + topic_publication_pt pub = (topic_publication_pt) factory->handle; + pubsub_topicPublicationAddPublisherEP(pub, pubEP); + } + + celixThreadMutex_unlock(&admin->localPublicationsLock); + } + else{ + + celixThreadMutex_lock(&admin->externalPublicationsLock); + array_list_pt ext_pub_list = (array_list_pt) hashMap_get(admin->externalPublications, scope_topic); + if (ext_pub_list == NULL) { + arrayList_create(&ext_pub_list); + hashMap_put(admin->externalPublications, strdup(scope_topic), ext_pub_list); + } + + arrayList_add(ext_pub_list, pubEP); + + celixThreadMutex_unlock(&admin->externalPublicationsLock); + } + + /* Re-evaluate the pending subscriptions */ + celixThreadMutex_lock(&admin->pendingSubscriptionsLock); + + hash_map_entry_pt pendingSub = hashMap_getEntry(admin->pendingSubscriptions, scope_topic); + if (pendingSub != NULL) { //There were pending subscription for the just published topic. Let's connect them. + char* topic = (char*) hashMapEntry_getKey(pendingSub); + array_list_pt pendingSubList = (array_list_pt) hashMapEntry_getValue(pendingSub); + int i; + for (i = 0; i < arrayList_size(pendingSubList); i++) { + pubsub_endpoint_pt subEP = (pubsub_endpoint_pt) arrayList_get(pendingSubList, i); + pubsubAdmin_addSubscription(admin, subEP); + } + hashMap_remove(admin->pendingSubscriptions, scope_topic); + arrayList_clear(pendingSubList); + arrayList_destroy(pendingSubList); + free(topic); + } + + celixThreadMutex_unlock(&admin->pendingSubscriptionsLock); + + /* Connect the new publisher to the subscription for his topic, if there is any */ + celixThreadMutex_lock(&admin->subscriptionsLock); + + topic_subscription_pt sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, scope_topic); + if (sub != NULL && pubEP->endpoint != NULL) { + pubsub_topicSubscriptionAddConnectPublisherToPendingList(sub, pubEP->endpoint); + } + + /* And check also for ANY subscription */ + topic_subscription_pt any_sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, PUBSUB_ANY_SUB_TOPIC); + if (any_sub != NULL && pubEP->endpoint != NULL) { + pubsub_topicSubscriptionAddConnectPublisherToPendingList(any_sub, pubEP->endpoint); + } + + free(scope_topic); + + celixThreadMutex_unlock(&admin->subscriptionsLock); + + return status; } celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP){ celix_status_t status = CELIX_SUCCESS; - int count = 0; + int count = 0; - printf("PSA: Removing publication [FWUUID=%s bundleID=%ld topic=%s]\n",pubEP->frameworkUUID,pubEP->serviceID,pubEP->topic); + printf("PSA_ZMQ: Removing publication [FWUUID=%s bundleID=%ld topic=%s]\n",pubEP->frameworkUUID,pubEP->serviceID,pubEP->topic); const char* fwUUID = NULL; bundleContext_getProperty(admin->bundle_context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID); if(fwUUID==NULL){ - printf("PSA: Cannot retrieve fwUUID.\n"); + printf("PSA_ZMQ: Cannot retrieve fwUUID.\n"); return CELIX_INVALID_BUNDLE_CONTEXT; } char *scope_topic = createScopeTopicKey(pubEP->scope, pubEP->topic); + if(strcmp(pubEP->frameworkUUID,fwUUID)==0){ celixThreadMutex_lock(&admin->localPublicationsLock); @@ -545,7 +626,12 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi pubsub_topicPublicationRemovePublisherEP(pub,pubEP); } else{ - status = CELIX_ILLEGAL_STATE; + /* Maybe the endpoint was pending */ + celixThreadMutex_lock(&admin->noSerializerPendingsLock); + if(!arrayList_removeElement(admin->noSerializerPublications, pubEP)){ + status = CELIX_ILLEGAL_STATE; + } + celixThreadMutex_unlock(&admin->noSerializerPendingsLock); } celixThreadMutex_unlock(&admin->localPublicationsLock); @@ -561,20 +647,18 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi pubsub_endpoint_pt p = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i); found = pubsubEndpoint_equals(pubEP,p); if (found){ - found = true; arrayList_remove(ext_pub_list,i); } } // Check if there are more publishers on the same endpoint (happens when 1 celix-instance with multiple bundles publish in same topic) - int ext_pub_list_size = arrayList_size(ext_pub_list); - for(i=0; iendpoint,p->endpoint) == 0) { count++; } } - if(ext_pub_list_size == 0){ + if(arrayList_size(ext_pub_list)==0){ hash_map_entry_pt entry = hashMap_getEntry(admin->externalPublications,scope_topic); char* topic = (char*)hashMapEntry_getKey(entry); array_list_pt list = (array_list_pt)hashMapEntry_getValue(entry); @@ -582,7 +666,6 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi arrayList_destroy(list); free(topic); } - } celixThreadMutex_unlock(&admin->externalPublicationsLock); @@ -599,8 +682,9 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi /* And check also for ANY subscription */ topic_subscription_pt any_sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC); if(any_sub!=NULL && pubEP->endpoint!=NULL && count == 0){ - pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(any_sub,pubEP->endpoint); + pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub,pubEP->endpoint); } + free(scope_topic); celixThreadMutex_unlock(&admin->subscriptionsLock); @@ -611,20 +695,21 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin, char *scope, char* topic){ celix_status_t status = CELIX_SUCCESS; - printf("PSA: Closing all publications\n"); + printf("PSA_ZMQ: Closing all publications\n"); celixThreadMutex_lock(&admin->localPublicationsLock); char *scope_topic = createScopeTopicKey(scope, topic); hash_map_entry_pt pubsvc_entry = (hash_map_entry_pt)hashMap_getEntry(admin->localPublications,scope_topic); if(pubsvc_entry!=NULL){ - char* topic = (char*)hashMapEntry_getKey(pubsvc_entry); + char* key = (char*)hashMapEntry_getKey(pubsvc_entry); service_factory_pt factory= (service_factory_pt)hashMapEntry_getValue(pubsvc_entry); topic_publication_pt pub = (topic_publication_pt)factory->handle; status += pubsub_topicPublicationStop(pub); status += pubsub_topicPublicationDestroy(pub); + disconnectTopicPubSubFromSerializer(admin, pub, true); hashMap_remove(admin->localPublications,scope_topic); - free(topic); + free(key); free(factory); } free(scope_topic); @@ -637,7 +722,7 @@ celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin, char *sco celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* scope,char* topic){ celix_status_t status = CELIX_SUCCESS; - printf("PSA: Closing all subscriptions\n"); + printf("PSA_ZMQ: Closing all subscriptions\n"); celixThreadMutex_lock(&admin->subscriptionsLock); char *scope_topic = createScopeTopicKey(scope, topic); @@ -649,6 +734,7 @@ celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* sco status += pubsub_topicSubscriptionStop(ts); status += pubsub_topicSubscriptionDestroy(ts); + disconnectTopicPubSubFromSerializer(admin, ts, false); hashMap_remove(admin->subscriptions,scope_topic); free(topic); @@ -660,95 +746,9 @@ celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* sco } -celix_status_t pubsubAdmin_matchPublisher(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP, double* score){ - celix_status_t status = CELIX_SUCCESS; - status = pubsubAdmin_match(admin, pubEP, score); - return status; -} - -celix_status_t pubsubAdmin_matchSubscriber(pubsub_admin_pt admin, pubsub_endpoint_pt subEP, double* score){ - celix_status_t status = CELIX_SUCCESS; - status = pubsubAdmin_match(admin, subEP, score); - return status; -} - -celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc) { - celix_status_t status = CELIX_SUCCESS; - admin->serializerSvc = serializerSvc; - - /* Add serializer to all topic_publication_pt */ - celixThreadMutex_lock(&admin->localPublicationsLock); - hash_map_iterator_pt lp_iter = hashMapIterator_create(admin->localPublications); - while(hashMapIterator_hasNext(lp_iter)){ - service_factory_pt factory = (service_factory_pt) hashMapIterator_nextValue(lp_iter); - topic_publication_pt topic_pub = (topic_publication_pt) factory->handle; - pubsub_topicPublicationSetSerializer(topic_pub, admin->serializerSvc); - } - hashMapIterator_destroy(lp_iter); - celixThreadMutex_unlock(&admin->localPublicationsLock); - - /* Add serializer to all topic_subscription_pt */ - celixThreadMutex_lock(&admin->subscriptionsLock); - hash_map_iterator_pt subs_iter = hashMapIterator_create(admin->subscriptions); - while(hashMapIterator_hasNext(subs_iter)){ - topic_subscription_pt topic_sub = (topic_subscription_pt) hashMapIterator_nextValue(subs_iter); - pubsub_topicSubscriptionSetSerializer(topic_sub, admin->serializerSvc); - } - hashMapIterator_destroy(subs_iter); - celixThreadMutex_unlock(&admin->subscriptionsLock); - - return status; -} - -celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc){ - celix_status_t status = CELIX_SUCCESS; - admin->serializerSvc = NULL; - - /* Remove serializer from all topic_publication_pt */ - celixThreadMutex_lock(&admin->localPublicationsLock); - hash_map_iterator_pt lp_iter = hashMapIterator_create(admin->localPublications); - while(hashMapIterator_hasNext(lp_iter)){ - service_factory_pt factory = (service_factory_pt) hashMapIterator_nextValue(lp_iter); - topic_publication_pt topic_pub = (topic_publication_pt) factory->handle; - pubsub_topicPublicationRemoveSerializer(topic_pub, admin->serializerSvc); - } - hashMapIterator_destroy(lp_iter); - celixThreadMutex_unlock(&admin->localPublicationsLock); - - /* Remove serializer from all topic_subscription_pt */ - celixThreadMutex_lock(&admin->subscriptionsLock); - hash_map_iterator_pt subs_iter = hashMapIterator_create(admin->subscriptions); - while(hashMapIterator_hasNext(subs_iter)){ - topic_subscription_pt topic_sub = (topic_subscription_pt) hashMapIterator_nextValue(subs_iter); - pubsub_topicSubscriptionRemoveSerializer(topic_sub, admin->serializerSvc); - } - hashMapIterator_destroy(subs_iter); - celixThreadMutex_unlock(&admin->subscriptionsLock); - - return status; -} - -static celix_status_t pubsubAdmin_match(pubsub_admin_pt admin, pubsub_endpoint_pt psEP, double* score){ - celix_status_t status = CELIX_SUCCESS; - - char topic_psa_prop[1024]; - snprintf(topic_psa_prop, 1024, "%s.psa", psEP->topic); - - const char* psa_to_use = NULL; - bundleContext_getPropertyWithDefault(admin->bundle_context, topic_psa_prop, PSA_DEFAULT, &psa_to_use); - - *score = 0; - if (strcmp(psa_to_use, "zmq") == 0){ - *score += 100; - }else{ - *score += 1; - } - - return status; -} #ifndef ANDROID -static celix_status_t pubsubAdmin_getIpAddress(const char* interface, char** ip) { +static celix_status_t pubsubAdmin_getIpAdress(const char* interface, char** ip) { celix_status_t status = CELIX_BUNDLE_EXCEPTION; struct ifaddrs *ifaddr, *ifa; @@ -792,3 +792,237 @@ static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt a free(scope_topic); return status; } + +celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt reference, void * service){ + /* Assumption: serializers are all available at startup. + * If a new (possibly better) serializer is installed and started, already created topic_publications/subscriptions will not be destroyed and recreated */ + + celix_status_t status = CELIX_SUCCESS; + int i=0; + + const char *serType = NULL; + serviceReference_getProperty(reference, PUBSUB_SERIALIZER_TYPE_KEY,&serType); + if(serType == NULL){ + printf("Serializer serviceReference %p has no pubsub_serializer.type property specified\n",reference); + return CELIX_SERVICE_EXCEPTION; + } + + pubsub_admin_pt admin = (pubsub_admin_pt)handle; + celixThreadMutex_lock(&admin->serializerListLock); + arrayList_add(admin->serializerList, reference); + celixThreadMutex_unlock(&admin->serializerListLock); + + /* Now let's re-evaluate the pending */ + celixThreadMutex_lock(&admin->noSerializerPendingsLock); + + for(i=0;inoSerializerSubscriptions);i++){ + pubsub_endpoint_pt ep = (pubsub_endpoint_pt)arrayList_get(admin->noSerializerSubscriptions,i); + pubsub_serializer_service_t *best_serializer = NULL; + pubsubAdmin_getBestSerializer(admin, ep, &best_serializer); + if(best_serializer != NULL){ /* Finally we have a valid serializer! */ + pubsubAdmin_addSubscription(admin, ep); + } + } + + for(i=0;inoSerializerPublications);i++){ + pubsub_endpoint_pt ep = (pubsub_endpoint_pt)arrayList_get(admin->noSerializerPublications,i); + pubsub_serializer_service_t *best_serializer = NULL; + pubsubAdmin_getBestSerializer(admin, ep, &best_serializer); + if(best_serializer != NULL){ /* Finally we have a valid serializer! */ + pubsubAdmin_addPublication(admin, ep); + } + } + + celixThreadMutex_unlock(&admin->noSerializerPendingsLock); + + printf("PSA_ZMQ: %s serializer added\n",serType); + + return status; +} + +celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt reference, void * service){ + + pubsub_admin_pt admin = (pubsub_admin_pt)handle; + int i=0, j=0; + const char *serType = NULL; + + serviceReference_getProperty(reference, PUBSUB_SERIALIZER_TYPE_KEY,&serType); + if(serType == NULL){ + printf("Serializer serviceReference %p has no pubsub_serializer.type property specified\n",reference); + return CELIX_SERVICE_EXCEPTION; + } + + celixThreadMutex_lock(&admin->serializerListLock); + celixThreadMutex_lock(&admin->usedSerializersLock); + + + /* Remove the serializer from the list */ + arrayList_removeElement(admin->serializerList, reference); + + /* Now destroy the topicPublications, but first put back the pubsub_endpoints back to the noSerializer pending list */ + array_list_pt topicPubList = (array_list_pt)hashMap_remove(admin->topicPublicationsPerSerializer, service); + if(topicPubList!=NULL){ + for(i=0;iendpoint!=NULL){ + free(pubEP->endpoint); + pubEP->endpoint = NULL; + } + /* Add the orphan endpoint to the noSerializer pending list */ + celixThreadMutex_lock(&admin->noSerializerPendingsLock); + arrayList_add(admin->noSerializerPublications,pubEP); + celixThreadMutex_unlock(&admin->noSerializerPendingsLock); + } + arrayList_destroy(pubList); + + /* Cleanup also the localPublications hashmap*/ + celixThreadMutex_lock(&admin->localPublicationsLock); + hash_map_iterator_pt iter = hashMapIterator_create(admin->localPublications); + char *key = NULL; + service_factory_pt factory = NULL; + while(hashMapIterator_hasNext(iter)){ + hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); + factory = (service_factory_pt)hashMapEntry_getValue(entry); + topic_publication_pt pub = (topic_publication_pt)factory->handle; + if(pub==topicPub){ + key = (char*)hashMapEntry_getKey(entry); + break; + } + } + hashMapIterator_destroy(iter); + if(key!=NULL){ + hashMap_remove(admin->localPublications, key); + free(factory); + free(key); + } + celixThreadMutex_unlock(&admin->localPublicationsLock); + + /* Finally destroy the topicPublication */ + pubsub_topicPublicationDestroy(topicPub); + } + arrayList_destroy(topicPubList); + } + + /* Now destroy the topicSubscriptions, but first put back the pubsub_endpoints back to the noSerializer pending list */ + array_list_pt topicSubList = (array_list_pt)hashMap_remove(admin->topicSubscriptionsPerSerializer, service); + if(topicSubList!=NULL){ + for(i=0;iendpoint!=NULL){ + free(subEP->endpoint); + subEP->endpoint = NULL; + } + /* Add the orphan endpoint to the noSerializer pending list */ + celixThreadMutex_lock(&admin->noSerializerPendingsLock); + arrayList_add(admin->noSerializerSubscriptions,subEP); + celixThreadMutex_unlock(&admin->noSerializerPendingsLock); + } + + /* Cleanup also the subscriptions hashmap*/ + celixThreadMutex_lock(&admin->subscriptionsLock); + hash_map_iterator_pt iter = hashMapIterator_create(admin->subscriptions); + char *key = NULL; + while(hashMapIterator_hasNext(iter)){ + hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); + topic_subscription_pt sub = (topic_subscription_pt)hashMapEntry_getValue(entry); + if(sub==topicSub){ + key = (char*)hashMapEntry_getKey(entry); + break; + } + } + hashMapIterator_destroy(iter); + if(key!=NULL){ + hashMap_remove(admin->subscriptions, key); + free(key); + } + celixThreadMutex_unlock(&admin->subscriptionsLock); + + /* Finally destroy the topicSubscription */ + pubsub_topicSubscriptionDestroy(topicSub); + } + arrayList_destroy(topicSubList); + } + + celixThreadMutex_unlock(&admin->usedSerializersLock); + celixThreadMutex_unlock(&admin->serializerListLock); + + printf("PSA_ZMQ: %s serializer removed\n",serType); + + + return CELIX_SUCCESS; +} + +celix_status_t pubsubAdmin_matchEndpoint(pubsub_admin_pt admin, pubsub_endpoint_pt endpoint, double* score){ + celix_status_t status = CELIX_SUCCESS; + + celixThreadMutex_lock(&admin->serializerListLock); + status = pubsub_admin_match(endpoint->topic_props,PUBSUB_ADMIN_TYPE,admin->serializerList,score); + celixThreadMutex_unlock(&admin->serializerListLock); + + return status; +} + +/* This one recall the same logic as in the match function */ +static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin,pubsub_endpoint_pt ep, pubsub_serializer_service_t **serSvc){ + + celix_status_t status = CELIX_SUCCESS; + + celixThreadMutex_lock(&admin->serializerListLock); + status = pubsub_admin_get_best_serializer(ep->topic_props, admin->serializerList, serSvc); + celixThreadMutex_unlock(&admin->serializerListLock); + + return status; + +} + +static void connectTopicPubSubToSerializer(pubsub_admin_pt admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool isPublication){ + + celixThreadMutex_lock(&admin->usedSerializersLock); + + hash_map_pt map = isPublication?admin->topicPublicationsPerSerializer:admin->topicSubscriptionsPerSerializer; + array_list_pt list = (array_list_pt)hashMap_get(map,serializer); + if(list==NULL){ + arrayList_create(&list); + hashMap_put(map,serializer,list); + } + arrayList_add(list,topicPubSub); + + celixThreadMutex_unlock(&admin->usedSerializersLock); + +} + +static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void *topicPubSub,bool isPublication){ + + celixThreadMutex_lock(&admin->usedSerializersLock); + + hash_map_pt map = isPublication?admin->topicPublicationsPerSerializer:admin->topicSubscriptionsPerSerializer; + hash_map_iterator_pt iter = hashMapIterator_create(map); + while(hashMapIterator_hasNext(iter)){ + array_list_pt list = (array_list_pt)hashMapIterator_nextValue(iter); + if(arrayList_removeElement(list, topicPubSub)){ //Found it! + break; + } + } + hashMapIterator_destroy(iter); + + celixThreadMutex_unlock(&admin->usedSerializersLock); + +} http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_admin_zmq/private/src/topic_publication.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_zmq/private/src/topic_publication.c b/pubsub/pubsub_admin_zmq/private/src/topic_publication.c index 2eaad97..b741771 100644 --- a/pubsub/pubsub_admin_zmq/private/src/topic_publication.c +++ b/pubsub/pubsub_admin_zmq/private/src/topic_publication.c @@ -7,7 +7,7 @@ *"License"); you may not use this file except in compliance *with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * htPSA_ZMQ_TP://www.apache.org/licenses/LICENSE-2.0 * *Unless required by applicable law or agreed to in writing, *software distributed under the License is distributed on an @@ -24,7 +24,6 @@ * \copyright Apache License, Version 2.0 */ -#include "pubsub_publish_service_private.h" #include /* The following undefs prevent the collision between: * - sys/syslog.h (which is included within czmq) @@ -50,6 +49,10 @@ #include "pubsub_utils.h" #include "publisher.h" +#include "topic_publication.h" + +#include "pubsub_serializer.h" + #ifdef BUILD_WITH_ZMQ_SECURITY #include "zmq_crypto.h" @@ -69,21 +72,21 @@ struct topic_publication { service_registration_pt svcFactoryReg; array_list_pt pub_ep_list; //List hash_map_pt boundServices; // - celix_thread_mutex_t tp_lock; // Protects topic_publication data structure - pubsub_serializer_service_t* serializerSvc; + pubsub_serializer_service_t *serializer; + celix_thread_mutex_t tp_lock; }; typedef struct publish_bundle_bound_service { topic_publication_pt parent; - pubsub_publisher_t pubSvc; + pubsub_publisher_t service; bundle_pt bundle; char *topic; - pubsub_msg_serializer_map_t* map; + hash_map_pt msgTypes; unsigned short getCount; celix_thread_mutex_t mp_lock; //Protects publish_bundle_bound_service data structure bool mp_send_in_progress; array_list_pt mp_parts; -} publish_bundle_bound_service_t; +}* publish_bundle_bound_service_pt; /* Note: correct locking order is * 1. tp_lock @@ -93,27 +96,27 @@ typedef struct publish_bundle_bound_service { * tp_lock and socket_lock are independent. */ -typedef struct pubsub_msg { +typedef struct pubsub_msg{ pubsub_msg_header_pt header; char* payload; int payloadSize; -} pubsub_msg_t; +}* pubsub_msg_pt; static unsigned int rand_range(unsigned int min, unsigned int max); static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service); static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service); -static publish_bundle_bound_service_t* pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle); -static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_t* boundSvc); +static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle); +static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc); static int pubsub_topicPublicationSend(void* handle,unsigned int msgTypeId, const void *msg); -static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTypeId, const void *msg, int flags); +static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTypeId, const void *inMsg, int flags); static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* msgTypeId); static void delay_first_send_for_late_joiners(void); -celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt pubEP, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out){ +celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out){ celix_status_t status = CELIX_SUCCESS; #ifdef BUILD_WITH_ZMQ_SECURITY @@ -128,7 +131,7 @@ celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, p for (i = 0; i < secure_topics_size; i++){ char* top = arrayList_get(secure_topics_list, i); if (strcmp(pubEP->topic, top) == 0){ - printf("TP: Secure topic: '%s'\n", top); + printf("PSA_ZMQ_TP: Secure topic: '%s'\n", top); pubEP->is_secure = true; } free(top); @@ -155,12 +158,12 @@ celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, p //certificate path ".cache/bundle{id}/version0.0/./META-INF/keys/publisher/private/pub_{topic}.key" snprintf(cert_path, MAX_CERT_PATH_LENGTH, "%s/META-INF/keys/publisher/private/pub_%s.key.enc", keys_bundle_dir, pubEP->topic); free(keys_bundle_dir); - printf("TP: Loading key '%s'\n", cert_path); + printf("PSA_ZMQ_TP: Loading key '%s'\n", cert_path); pub_cert = get_zcert_from_encoded_file((char *) keys_file_path, (char *) keys_file_name, cert_path); if (pub_cert == NULL){ - printf("TP: Cannot load key '%s'\n", cert_path); - printf("TP: Topic '%s' NOT SECURED !\n", pubEP->topic); + printf("PSA_ZMQ_TP: Cannot load key '%s'\n", cert_path); + printf("PSA_ZMQ_TP: Topic '%s' NOT SECURED !\n", pubEP->topic); pubEP->is_secure = false; } } @@ -218,7 +221,7 @@ celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, p pub->endpoint = ep; pub->zmq_socket = socket; - pub->serializerSvc = NULL; + pub->serializer = best_serializer; celixThreadMutex_create(&(pub->socket_lock),NULL); @@ -245,13 +248,14 @@ celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub){ hash_map_iterator_pt iter = hashMapIterator_create(pub->boundServices); while(hashMapIterator_hasNext(iter)){ - publish_bundle_bound_service_t* bound = hashMapIterator_nextValue(iter); + publish_bundle_bound_service_pt bound = hashMapIterator_nextValue(iter); pubsub_destroyPublishBundleBoundService(bound); } hashMapIterator_destroy(iter); hashMap_destroy(pub->boundServices,false,false); pub->svcFactoryReg = NULL; + pub->serializer = NULL; #ifdef BUILD_WITH_ZMQ_SECURITY zcert_destroy(&(pub->zmq_cert)); #endif @@ -275,7 +279,6 @@ celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,top celix_status_t status = CELIX_SUCCESS; /* Let's register the new service */ - //celixThreadMutex_lock(&(pub->tp_lock)); pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pub->pub_ep_list,0); @@ -294,30 +297,22 @@ celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,top if(status != CELIX_SUCCESS){ properties_destroy(props); - printf("PSA: Cannot register ServiceFactory for topic %s (bundle %ld).\n",pubEP->topic,pubEP->serviceID); + printf("PSA_ZMQ_PSA_ZMQ_TP: Cannot register ServiceFactory for topic %s (bundle %ld).\n",pubEP->topic,pubEP->serviceID); } else{ *svcFactory = factory; } } else{ - printf("PSA: Cannot find pubsub_endpoint after adding it...Should never happen!\n"); + printf("PSA_ZMQ_PSA_ZMQ_TP: Cannot find pubsub_endpoint after adding it...Should never happen!\n"); status = CELIX_SERVICE_EXCEPTION; } - //celixThreadMutex_unlock(&(pub->tp_lock)); - return status; } celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub){ - celix_status_t status = CELIX_SUCCESS; - - //celixThreadMutex_lock(&(pub->tp_lock)); - status = serviceRegistration_unregister(pub->svcFactoryReg); - //celixThreadMutex_unlock(&(pub->tp_lock)); - - return status; + return serviceRegistration_unregister(pub->svcFactoryReg); } celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep){ @@ -345,63 +340,12 @@ celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub return CELIX_SUCCESS; } -celix_status_t pubsub_topicPublicationSetSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc){ - celix_status_t status = CELIX_SUCCESS; - - celixThreadMutex_lock(&(pub->tp_lock)); - - //clearing pref serializer - if (pub->serializerSvc != NULL) { - hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices); - while (hashMapIterator_hasNext(&iter)) { - publish_bundle_bound_service_t* bound = hashMapIterator_nextValue(&iter); - celixThreadMutex_lock(&bound->mp_lock); - pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, bound->map); - celixThreadMutex_unlock(&bound->mp_lock); - bound->map = NULL; - } - } - - pub->serializerSvc = serializerSvc; - if (pub->serializerSvc != NULL) { - hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices); - while (hashMapIterator_hasNext(&iter)) { - hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter); - bundle_pt bundle = hashMapEntry_getKey(entry); - publish_bundle_bound_service_t* bound = hashMapEntry_getValue(entry); - celixThreadMutex_lock(&bound->mp_lock); - pub->serializerSvc->createSerializerMap(pub->serializerSvc->handle, bundle, &bound->map); - celixThreadMutex_unlock(&bound->mp_lock); - } - } - - celixThreadMutex_unlock(&(pub->tp_lock)); - - return status; -} - -celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_t* svc){ - celix_status_t status = CELIX_SUCCESS; +array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt pub){ + array_list_pt list = NULL; celixThreadMutex_lock(&(pub->tp_lock)); - - if (pub->serializerSvc == svc) { - hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices); - while (hashMapIterator_hasNext(&iter)) { - publish_bundle_bound_service_t* bound = hashMapIterator_nextValue(&iter); - celixThreadMutex_lock(&bound->mp_lock); - pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, bound->map); - celixThreadMutex_unlock(&bound->mp_lock); - bound->map = NULL; - } - pub->serializerSvc = NULL; - } - + list = arrayList_clone(pub->pub_ep_list); celixThreadMutex_unlock(&(pub->tp_lock)); - return status; -} - -array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt pub){ - return pub->pub_ep_list; + return list; } @@ -412,7 +356,7 @@ static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt celixThreadMutex_lock(&(publish->tp_lock)); - publish_bundle_bound_service_t* bound = hashMap_get(publish->boundServices,bundle); + publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle); if(bound==NULL){ bound = pubsub_createPublishBundleBoundService(publish,bundle); if(bound!=NULL){ @@ -423,9 +367,7 @@ static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt bound->getCount++; } - if(bound!=NULL){ - *service = &bound->pubSvc; - } + *service = &bound->service; celixThreadMutex_unlock(&(publish->tp_lock)); @@ -438,7 +380,7 @@ static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_p celixThreadMutex_lock(&(publish->tp_lock)); - publish_bundle_bound_service_t* bound = hashMap_get(publish->boundServices,bundle); + publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle); if(bound!=NULL){ bound->getCount--; @@ -451,7 +393,7 @@ static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_p else{ long bundleId = -1; bundle_getBundleId(bundle,&bundleId); - printf("TP: Unexpected ungetService call for bundle %ld.\n", bundleId); + printf("PSA_ZMQ_TP: Unexpected ungetService call for bundle %ld.\n", bundleId); } /* service should be never used for unget, so let's set the pointer to NULL */ @@ -462,7 +404,7 @@ static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_p return CELIX_SUCCESS; } -static bool send_pubsub_msg(zsock_t* zmq_socket, pubsub_msg_t* msg, bool last){ +static bool send_pubsub_msg(zsock_t* zmq_socket, pubsub_msg_pt msg, bool last){ bool ret = true; @@ -502,7 +444,7 @@ static bool send_pubsub_mp_msg(zsock_t* zmq_socket, array_list_pt mp_msg_parts){ unsigned int i = 0; unsigned int mp_num = arrayList_size(mp_msg_parts); for(;imp_lock)); + publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt) handle; + + celixThreadMutex_lock(&(bound->mp_lock)); if( (flags & PUBSUB_PUBLISHER_FIRST_MSG) && !(flags & PUBSUB_PUBLISHER_LAST_MSG) && bound->mp_send_in_progress){ //means a real mp_msg - printf("TP: Multipart send already in progress. Cannot process a new one.\n"); + printf("PSA_ZMQ_TP: Multipart send already in progress. Cannot process a new one.\n"); celixThreadMutex_unlock(&(bound->mp_lock)); return -3; - } - - pubsub_msg_serializer_t* msgSer = NULL; - if (bound->map != NULL) { - msgSer = hashMap_get(bound->map->serializers, (void*)(uintptr_t)msgTypeId); } - if (bound->map == NULL) { - printf("TP: Serializer is not set!\n"); - status = 1; - } else if (msgSer == NULL ){ - printf("TP: No msg serializer available for msg type id %d\n", msgTypeId); - hash_map_iterator_t iter = hashMapIterator_construct(bound->map->serializers); - printf("Note supported messages:\n"); - while (hashMapIterator_hasNext(&iter)) { - pubsub_msg_serializer_t *msgSer = hashMapIterator_nextValue(&iter); - printf("\tmsg %s with id %d\n", msgSer->msgName, msgSer->msgId); - } - status = 1; - } + pubsub_msg_serializer_t* msgSer = (pubsub_msg_serializer_t*)hashMap_get(bound->msgTypes, (void*)(uintptr_t)msgTypeId); + + if (msgSer!= NULL) { + int major=0, minor=0; - int major=0, minor=0; - if (status == 0 && msgSer != NULL) { pubsub_msg_header_pt msg_hdr = calloc(1,sizeof(struct pubsub_msg_header)); strncpy(msg_hdr->topic,bound->topic,MAX_TOPIC_LEN-1); msg_hdr->type = msgTypeId; + if (msgSer->msgVersion != NULL){ version_getMajor(msgSer->msgVersion, &major); version_getMinor(msgSer->msgVersion, &minor); @@ -556,23 +487,24 @@ static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTy msg_hdr->minor = minor; } - char* serializedOutput = NULL; + void *serializedOutput = NULL; size_t serializedOutputLen = 0; - msgSer->serialize(msgSer->handle, msg, &serializedOutput, &serializedOutputLen); - pubsub_msg_t* msg = calloc(1,sizeof(struct pubsub_msg)); + msgSer->serialize(msgSer,inMsg,&serializedOutput, &serializedOutputLen); + + pubsub_msg_pt msg = calloc(1,sizeof(struct pubsub_msg)); msg->header = msg_hdr; - msg->payload = (char *) serializedOutput; + msg->payload = (char*)serializedOutput; msg->payloadSize = serializedOutputLen; bool snd = true; - switch (flags) { + switch(flags){ case PUBSUB_PUBLISHER_FIRST_MSG: bound->mp_send_in_progress = true; arrayList_add(bound->mp_parts,msg); break; case PUBSUB_PUBLISHER_PART_MSG: if(!bound->mp_send_in_progress){ - printf("TP: ERROR: received msg part without the first part.\n"); + printf("PSA_ZMQ_TP: ERROR: received msg part without the first part.\n"); status = -4; } else{ @@ -581,72 +513,53 @@ static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTy break; case PUBSUB_PUBLISHER_LAST_MSG: if(!bound->mp_send_in_progress){ - printf("TP: ERROR: received end msg without the first part.\n"); + printf("PSA_ZMQ_TP: ERROR: received end msg without the first part.\n"); status = -4; } else{ arrayList_add(bound->mp_parts,msg); - celixThreadMutex_lock(&(bound->parent->socket_lock)); + celixThreadMutex_lock(&(bound->parent->tp_lock)); snd = send_pubsub_mp_msg(bound->parent->zmq_socket,bound->mp_parts); bound->mp_send_in_progress = false; - celixThreadMutex_unlock(&(bound->parent->socket_lock)); + celixThreadMutex_unlock(&(bound->parent->tp_lock)); } break; case PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG: //Normal send case - celixThreadMutex_lock(&(bound->parent->socket_lock)); + celixThreadMutex_lock(&(bound->parent->tp_lock)); snd = send_pubsub_msg(bound->parent->zmq_socket,msg,true); - celixThreadMutex_unlock(&(bound->parent->socket_lock)); + celixThreadMutex_unlock(&(bound->parent->tp_lock)); break; default: - printf("TP: ERROR: Invalid MP flags combination\n"); + printf("PSA_ZMQ_TP: ERROR: Invalid MP flags combination\n"); status = -4; break; } - /* Free msg in case we got into a bad branch */ if(status==-4){ free(msg); } if(!snd){ - printf("TP: Failed to send %s message %u.\n",flags == (PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG) ? "single" : "multipart", msgTypeId); + printf("PSA_ZMQ_TP: Failed to send %s message %u.\n",flags == (PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG) ? "single" : "multipart", msgTypeId); } } else { - printf("TP: Message %u not supported.\n",msgTypeId); + printf("PSA_ZMQ_TP: No msg serializer available for msg type id %d\n", msgTypeId); status=-1; } celixThreadMutex_unlock(&(bound->mp_lock)); + return status; -} -static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* out){ - publish_bundle_bound_service_t* bound = handle; - unsigned int msgTypeId = 0; - - celixThreadMutex_lock(&bound->mp_lock); - if (bound->map != NULL) { - hash_map_iterator_t iter = hashMapIterator_construct(bound->map->serializers); - while (hashMapIterator_hasNext(&iter)) { - pubsub_msg_serializer_t* msgSer = hashMapIterator_nextValue(&iter); - if (strncmp(msgType, msgSer->msgName, 1024*1024) == 0) { - msgTypeId = msgSer->msgId; - break; - } - } - } - celixThreadMutex_unlock(&bound->mp_lock); +} - if (msgTypeId != 0) { - *out = msgTypeId; - return 0; - } else { - printf("TP: Cannot find msg type id for msg type %s\n", msgType); - return 1; - } +static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* msgTypeId){ + *msgTypeId = utils_stringHash(msgType); + return 0; } + static unsigned int rand_range(unsigned int min, unsigned int max){ double scaled = (double)(((double)rand())/((double)RAND_MAX)); @@ -654,10 +567,11 @@ static unsigned int rand_range(unsigned int min, unsigned int max){ } -static publish_bundle_bound_service_t* pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle){ +static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle){ + //PRECOND lock on tp->lock - publish_bundle_bound_service_t* bound = calloc(1, sizeof(*bound)); + publish_bundle_bound_service_pt bound = calloc(1, sizeof(*bound)); if (bound != NULL) { @@ -667,42 +581,41 @@ static publish_bundle_bound_service_t* pubsub_createPublishBundleBoundService(to bound->mp_send_in_progress = false; celixThreadMutex_create(&bound->mp_lock,NULL); - if (tp->serializerSvc != NULL) { - tp->serializerSvc->createSerializerMap(tp->serializerSvc->handle, bundle, &bound->map); + if(tp->serializer != NULL){ + tp->serializer->createSerializerMap(tp->serializer->handle,bundle,&bound->msgTypes); } + arrayList_create(&bound->mp_parts); pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(bound->parent->pub_ep_list,0); bound->topic=strdup(pubEP->topic); - bound->pubSvc.handle = bound; - bound->pubSvc.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForUUID; - bound->pubSvc.send = pubsub_topicPublicationSend; - bound->pubSvc.sendMultipart = pubsub_topicPublicationSendMultipart; - } - else - { - free(bound); - return NULL; + bound->service.handle = bound; + bound->service.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForUUID; + bound->service.send = pubsub_topicPublicationSend; + bound->service.sendMultipart = pubsub_topicPublicationSendMultipart; + } return bound; } -static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_t* boundSvc){ - //PRECOND lock on publish->tp_lock +static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc){ + + //PRECOND lock on tp->lock + celixThreadMutex_lock(&boundSvc->mp_lock); - if (boundSvc->map != NULL && boundSvc->parent->serializerSvc != NULL) { - boundSvc->parent->serializerSvc->destroySerializerMap(boundSvc->parent->serializerSvc->handle, boundSvc->map); - boundSvc->map = NULL; + + if(boundSvc->parent->serializer != NULL && boundSvc->msgTypes != NULL){ + boundSvc->parent->serializer->destroySerializerMap(boundSvc->parent->serializer->handle, boundSvc->msgTypes); } - if (boundSvc->mp_parts!=NULL) { + if(boundSvc->mp_parts!=NULL){ arrayList_destroy(boundSvc->mp_parts); } - if (boundSvc->topic!=NULL) { + if(boundSvc->topic!=NULL){ free(boundSvc->topic); } @@ -718,7 +631,7 @@ static void delay_first_send_for_late_joiners(){ static bool firstSend = true; if(firstSend){ - printf("TP: Delaying first send for late joiners...\n"); + printf("PSA_ZMQ_TP: Delaying first send for late joiners...\n"); sleep(FIRST_SEND_DELAY); firstSend = false; }