Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 4D2B5200C13 for ; Mon, 6 Feb 2017 19:34:18 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 4BC88160B62; Mon, 6 Feb 2017 18:34:18 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 086E2160B64 for ; Mon, 6 Feb 2017 19:34:15 +0100 (CET) Received: (qmail 59713 invoked by uid 500); 6 Feb 2017 18:34:15 -0000 Mailing-List: contact commits-help@celix.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@celix.apache.org Delivered-To: mailing list commits@celix.apache.org Received: (qmail 59269 invoked by uid 99); 6 Feb 2017 18:34:14 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 06 Feb 2017 18:34:14 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 27758E04E2; Mon, 6 Feb 2017 18:34:14 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: pnoltes@apache.org To: commits@celix.apache.org Date: Mon, 06 Feb 2017 18:34:28 -0000 Message-Id: In-Reply-To: <42273219a2ea4445aec85a8bb0e8bab8@git.apache.org> References: <42273219a2ea4445aec85a8bb0e8bab8@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [16/19] celix git commit: CELIX-389: Refactors pubsub. archived-at: Mon, 06 Feb 2017 18:34:18 -0000 http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c ---------------------------------------------------------------------- diff --git a/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c b/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c deleted file mode 100644 index e670899..0000000 --- a/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c +++ /dev/null @@ -1,732 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. - */ -/* - * pubsub_admin_impl.c - * - * \date Sep 30, 2011 - * \author Apache Celix Project Team - * \copyright Apache License, Version 2.0 - */ - -#include "pubsub_admin_impl.h" -#include - -#include -#include - -#include -#include -#include - -#ifndef ANDROID -#include -#endif - -#include -#include -#include -#include - -#include "constants.h" -#include "utils.h" -#include "hash_map.h" -#include "array_list.h" -#include "bundle_context.h" -#include "bundle.h" -#include "service_reference.h" -#include "service_registration.h" -#include "log_helper.h" -#include "log_service.h" -#include "celix_threads.h" -#include "service_factory.h" - -#include "topic_subscription.h" -#include "pubsub_publish_service_private.h" -#include "pubsub_endpoint.h" -#include "pubsub_utils.h" -#include "subscriber.h" - -#define MAX_KEY_FOLDER_PATH_LENGTH 512 - -static const char *DEFAULT_IP = "127.0.0.1"; - -static celix_status_t pubsubAdmin_getIpAddress(const char* interface, char** ip); -static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP); -static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP); -static celix_status_t pubsubAdmin_match(pubsub_admin_pt admin, pubsub_endpoint_pt psEP, double* score); - -celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *admin) { - celix_status_t status = CELIX_SUCCESS; - -#ifdef BUILD_WITH_ZMQ_SECURITY - if (!zsys_has_curve()){ - printf("PSA: zeromq curve unsupported\n"); - return CELIX_SERVICE_EXCEPTION; - } -#endif - - *admin = calloc(1, sizeof(**admin)); - - if (!*admin) { - status = CELIX_ENOMEM; - } - else{ - - const char *ip = NULL; - char *detectedIp = NULL; - (*admin)->bundle_context= context; - (*admin)->localPublications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); - (*admin)->subscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); - (*admin)->pendingSubscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); - (*admin)->externalPublications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); - - celixThreadMutex_create(&(*admin)->localPublicationsLock, NULL); - celixThreadMutex_create(&(*admin)->subscriptionsLock, NULL); - celixThreadMutex_create(&(*admin)->pendingSubscriptionsLock, NULL); - celixThreadMutex_create(&(*admin)->externalPublicationsLock, NULL); - - if (logHelper_create(context, &(*admin)->loghelper) == CELIX_SUCCESS) { - logHelper_start((*admin)->loghelper); - } - - bundleContext_getProperty(context,PSA_IP , &ip); - -#ifndef ANDROID - if (ip == NULL) { - const char *interface = NULL; - - bundleContext_getProperty(context, PSA_ITF, &interface); - if (pubsubAdmin_getIpAddress(interface, &detectedIp) != CELIX_SUCCESS) { - logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA: Could not retrieve IP adress for interface %s", interface); - } - - ip = detectedIp; - } -#endif - - if (ip != NULL) { - logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA: Using %s for service annunciation", ip); - (*admin)->ipAddress = strdup(ip); - } - else { - logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA: No IP address for service annunciation set. Using %s", DEFAULT_IP); - (*admin)->ipAddress = strdup(DEFAULT_IP); - } - - if (detectedIp != NULL) { - free(detectedIp); - } - - const char* basePortStr = NULL; - const char* maxPortStr = NULL; - char* endptrBase = NULL; - char* endptrMax = NULL; - bundleContext_getPropertyWithDefault(context, PSA_ZMQ_BASE_PORT, "PSA_ZMQ_DEFAULT_BASE_PORT", &basePortStr); - bundleContext_getPropertyWithDefault(context, PSA_ZMQ_MAX_PORT, "PSA_ZMQ_DEFAULT_MAX_PORT", &maxPortStr); - (*admin)->basePort = strtol(basePortStr, &endptrBase, 10); - (*admin)->maxPort = strtol(maxPortStr, &endptrMax, 10); - if (*endptrBase != '\0') { - (*admin)->basePort = PSA_ZMQ_DEFAULT_BASE_PORT; - } - if (*endptrMax != '\0') { - (*admin)->maxPort = PSA_ZMQ_DEFAULT_MAX_PORT; - } - - printf("PSA Using base port %u to max port %u\n", (*admin)->basePort, (*admin)->maxPort); - - // Disable Signal Handling by CZMQ - setenv("ZSYS_SIGHANDLER", "false", true); - - const char *nrZmqThreads = NULL; - bundleContext_getProperty(context, "PSA_NR_ZMQ_THREADS", &nrZmqThreads); - - if(nrZmqThreads != NULL) { - char *endPtr = NULL; - unsigned int nrThreads = strtoul(nrZmqThreads, &endPtr, 10); - if(endPtr != nrZmqThreads && nrThreads > 0 && nrThreads < 50) { - zsys_set_io_threads(nrThreads); - logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA: Using %d threads for ZMQ", nrThreads); - printf("PSA: Using %d threads for ZMQ\n", nrThreads); - } - } - -#ifdef BUILD_WITH_ZMQ_SECURITY - // Setup authenticator - zactor_t* auth = zactor_new (zauth, NULL); - zstr_sendx(auth, "VERBOSE", NULL); - - // Load all public keys of subscribers into the application - // This step is done for authenticating subscribers - char curve_folder_path[MAX_KEY_FOLDER_PATH_LENGTH]; - char* keys_bundle_dir = pubsub_getKeysBundleDir(context); - snprintf(curve_folder_path, MAX_KEY_FOLDER_PATH_LENGTH, "%s/META-INF/keys/subscriber/public", keys_bundle_dir); - zstr_sendx (auth, "CURVE", curve_folder_path, NULL); - free(keys_bundle_dir); - - (*admin)->zmq_auth = auth; -#endif - - } - - return status; -} - - -celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin) -{ - celix_status_t status = CELIX_SUCCESS; - - free(admin->ipAddress); - - celixThreadMutex_lock(&admin->pendingSubscriptionsLock); - hash_map_iterator_pt iter = hashMapIterator_create(admin->pendingSubscriptions); - while(hashMapIterator_hasNext(iter)){ - hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); - free((char*)hashMapEntry_getKey(entry)); - arrayList_destroy((array_list_pt)hashMapEntry_getValue(entry)); - } - hashMapIterator_destroy(iter); - hashMap_destroy(admin->pendingSubscriptions,false,false); - celixThreadMutex_unlock(&admin->pendingSubscriptionsLock); - - celixThreadMutex_lock(&admin->subscriptionsLock); - hashMap_destroy(admin->subscriptions,false,false); - celixThreadMutex_unlock(&admin->subscriptionsLock); - - celixThreadMutex_lock(&admin->localPublicationsLock); - hashMap_destroy(admin->localPublications,true,false); - celixThreadMutex_unlock(&admin->localPublicationsLock); - - celixThreadMutex_lock(&admin->externalPublicationsLock); - iter = hashMapIterator_create(admin->externalPublications); - while(hashMapIterator_hasNext(iter)){ - hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); - free((char*)hashMapEntry_getKey(entry)); - arrayList_destroy((array_list_pt)hashMapEntry_getValue(entry)); - } - hashMapIterator_destroy(iter); - hashMap_destroy(admin->externalPublications,false,false); - celixThreadMutex_unlock(&admin->externalPublicationsLock); - - celixThreadMutex_destroy(&admin->pendingSubscriptionsLock); - celixThreadMutex_destroy(&admin->subscriptionsLock); - celixThreadMutex_destroy(&admin->localPublicationsLock); - celixThreadMutex_destroy(&admin->externalPublicationsLock); - - logHelper_stop(admin->loghelper); - - logHelper_destroy(&admin->loghelper); - -#ifdef BUILD_WITH_ZMQ_SECURITY - if (admin->zmq_auth != NULL){ - zactor_destroy(&(admin->zmq_auth)); - } -#endif - - free(admin); - - return status; -} - -static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){ - celix_status_t status = CELIX_SUCCESS; - - celixThreadMutex_lock(&admin->subscriptionsLock); - - topic_subscription_pt any_sub = hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC); - - if(any_sub==NULL){ - - int i; - - status += pubsub_topicSubscriptionCreate(admin->bundle_context, subEP, PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, PUBSUB_ANY_SUB_TOPIC, &any_sub); - - if (status == CELIX_SUCCESS){ - - /* Connect all internal publishers */ - celixThreadMutex_lock(&admin->localPublicationsLock); - hash_map_iterator_pt lp_iter =hashMapIterator_create(admin->localPublications); - while(hashMapIterator_hasNext(lp_iter)){ - service_factory_pt factory = (service_factory_pt)hashMapIterator_nextValue(lp_iter); - topic_publication_pt topic_pubs = (topic_publication_pt)factory->handle; - array_list_pt topic_publishers = pubsub_topicPublicationGetPublisherList(topic_pubs); - - if(topic_publishers!=NULL){ - for(i=0;iendpoint !=NULL){ - status += pubsub_topicSubscriptionConnectPublisher(any_sub,pubEP->endpoint); - } - } - } - } - hashMapIterator_destroy(lp_iter); - celixThreadMutex_unlock(&admin->localPublicationsLock); - - /* Connect also all external publishers */ - celixThreadMutex_lock(&admin->externalPublicationsLock); - hash_map_iterator_pt extp_iter =hashMapIterator_create(admin->externalPublications); - while(hashMapIterator_hasNext(extp_iter)){ - array_list_pt ext_pub_list = (array_list_pt)hashMapIterator_nextValue(extp_iter); - if(ext_pub_list!=NULL){ - for(i=0;iendpoint !=NULL){ - status += pubsub_topicSubscriptionConnectPublisher(any_sub,pubEP->endpoint); - } - } - } - } - hashMapIterator_destroy(extp_iter); - celixThreadMutex_unlock(&admin->externalPublicationsLock); - - - pubsub_topicSubscriptionAddSubscriber(any_sub,subEP); - - status += pubsub_topicSubscriptionStart(any_sub); - - } - - if (status == CELIX_SUCCESS){ - hashMap_put(admin->subscriptions,strdup(PUBSUB_ANY_SUB_TOPIC),any_sub); - } - - } - - celixThreadMutex_unlock(&admin->subscriptionsLock); - - return status; -} - -celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){ - celix_status_t status = CELIX_SUCCESS; - - printf("PSA: Received subscription [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n",subEP->frameworkUUID,subEP->serviceID,subEP->scope, subEP->topic); - - if(strcmp(subEP->topic,PUBSUB_ANY_SUB_TOPIC)==0){ - return pubsubAdmin_addAnySubscription(admin,subEP); - } - - /* Check if we already know some publisher about this topic, otherwise let's put the subscription in the pending hashmap */ - celixThreadMutex_lock(&admin->localPublicationsLock); - celixThreadMutex_lock(&admin->externalPublicationsLock); - - char* scope_topic = createScopeTopicKey(subEP->scope, subEP->topic); - - service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic); - array_list_pt ext_pub_list = (array_list_pt)hashMap_get(admin->externalPublications,scope_topic); - - if(factory==NULL && ext_pub_list==NULL){ //No (local or external) publishers yet for this topic - celixThreadMutex_lock(&admin->pendingSubscriptionsLock); - pubsubAdmin_addSubscriptionToPendingList(admin,subEP); - celixThreadMutex_unlock(&admin->pendingSubscriptionsLock); - } else { - int i; - topic_subscription_pt subscription = hashMap_get(admin->subscriptions, scope_topic); - - if(subscription == NULL) { - status += pubsub_topicSubscriptionCreate(admin->bundle_context,subEP,subEP->scope, subEP->topic,&subscription); - - if (status==CELIX_SUCCESS){ - - /* Try to connect internal publishers */ - if(factory!=NULL){ - topic_publication_pt topic_pubs = (topic_publication_pt)factory->handle; - array_list_pt topic_publishers = pubsub_topicPublicationGetPublisherList(topic_pubs); - - if(topic_publishers!=NULL){ - for(i=0;iendpoint !=NULL){ - status += pubsub_topicSubscriptionConnectPublisher(subscription,pubEP->endpoint); - } - } - } - - } - - /* Look also for external publishers */ - if(ext_pub_list!=NULL){ - for(i=0;iendpoint !=NULL){ - status += pubsub_topicSubscriptionConnectPublisher(subscription,pubEP->endpoint); - } - } - } - - pubsub_topicSubscriptionAddSubscriber(subscription,subEP); - - status += pubsub_topicSubscriptionStart(subscription); - - } - - if(status==CELIX_SUCCESS){ - celixThreadMutex_lock(&admin->subscriptionsLock); - hashMap_put(admin->subscriptions,strdup(scope_topic),subscription); - celixThreadMutex_unlock(&admin->subscriptionsLock); - } - } - - if (status == CELIX_SUCCESS){ - pubsub_topicIncreaseNrSubscribers(subscription); - } - } - - free(scope_topic); - celixThreadMutex_unlock(&admin->externalPublicationsLock); - celixThreadMutex_unlock(&admin->localPublicationsLock); - - return status; - -} - -celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){ - celix_status_t status = CELIX_SUCCESS; - - printf("PSA: Removing subscription [FWUUID=%s bundleID=%ld topic=%s]\n",subEP->frameworkUUID,subEP->serviceID,subEP->topic); - - celixThreadMutex_lock(&admin->subscriptionsLock); - - topic_subscription_pt sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,subEP->topic); - - if(sub!=NULL){ - pubsub_topicDecreaseNrSubscribers(sub); - if(pubsub_topicGetNrSubscribers(sub) == 0) { - status = pubsub_topicSubscriptionRemoveSubscriber(sub,subEP); - } - } - else{ - status = CELIX_ILLEGAL_STATE; - } - - celixThreadMutex_unlock(&admin->subscriptionsLock); - - return status; - -} - -celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP) { - celix_status_t status = CELIX_SUCCESS; - - printf("PSA: Received publication [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n", pubEP->frameworkUUID, pubEP->serviceID, pubEP->scope, pubEP->topic); - - const char* fwUUID = NULL; - - char *scope_topic = createScopeTopicKey(pubEP->scope, pubEP->topic); - bundleContext_getProperty(admin->bundle_context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID); - if (fwUUID == NULL) { - printf("PSA: Cannot retrieve fwUUID.\n"); - return CELIX_INVALID_BUNDLE_CONTEXT; - } - - if ((strcmp(pubEP->frameworkUUID, fwUUID) == 0) && (pubEP->endpoint == NULL)) { - - celixThreadMutex_lock(&admin->localPublicationsLock); - - service_factory_pt factory = (service_factory_pt) hashMap_get(admin->localPublications, scope_topic); - - if (factory == NULL) { - topic_publication_pt pub = NULL; - status = pubsub_topicPublicationCreate(admin->bundle_context, pubEP, admin->ipAddress, admin->basePort, admin->maxPort, &pub); - if (status == CELIX_SUCCESS) { - status = pubsub_topicPublicationStart(admin->bundle_context, pub, &factory); - if (status == CELIX_SUCCESS && factory != NULL) { - hashMap_put(admin->localPublications, strdup(scope_topic), factory); - } - } else { - printf("PSA: Cannot create a topicPublication for scope=%s, topic=%s (bundle %ld).\n", pubEP->scope, pubEP->topic, pubEP->serviceID); - } - } else { - //just add the new EP to the list - topic_publication_pt pub = (topic_publication_pt) factory->handle; - pubsub_topicPublicationAddPublisherEP(pub, pubEP); - } - - celixThreadMutex_unlock(&admin->localPublicationsLock); - } else { - - celixThreadMutex_lock(&admin->externalPublicationsLock); - array_list_pt ext_pub_list = (array_list_pt) hashMap_get(admin->externalPublications, scope_topic); - if (ext_pub_list == NULL) { - arrayList_create(&ext_pub_list); - hashMap_put(admin->externalPublications, strdup(scope_topic), ext_pub_list); - } - - arrayList_add(ext_pub_list, pubEP); - - celixThreadMutex_unlock(&admin->externalPublicationsLock); - } - - /* Re-evaluate the pending subscriptions */ - celixThreadMutex_lock(&admin->pendingSubscriptionsLock); - - hash_map_entry_pt pendingSub = hashMap_getEntry(admin->pendingSubscriptions, scope_topic); - if (pendingSub != NULL) { //There were pending subscription for the just published topic. Let's connect them. - char* topic = (char*) hashMapEntry_getKey(pendingSub); - array_list_pt pendingSubList = (array_list_pt) hashMapEntry_getValue(pendingSub); - int i; - for (i = 0; i < arrayList_size(pendingSubList); i++) { - pubsub_endpoint_pt subEP = (pubsub_endpoint_pt) arrayList_get(pendingSubList, i); - pubsubAdmin_addSubscription(admin, subEP); - } - hashMap_remove(admin->pendingSubscriptions, scope_topic); - arrayList_clear(pendingSubList); - arrayList_destroy(pendingSubList); - free(topic); - } - - celixThreadMutex_unlock(&admin->pendingSubscriptionsLock); - - /* Connect the new publisher to the subscription for his topic, if there is any */ - celixThreadMutex_lock(&admin->subscriptionsLock); - - topic_subscription_pt sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, scope_topic); - if (sub != NULL && pubEP->endpoint != NULL) { - pubsub_topicSubscriptionAddConnectPublisherToPendingList(sub, pubEP->endpoint); - } - - /* And check also for ANY subscription */ - topic_subscription_pt any_sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, PUBSUB_ANY_SUB_TOPIC); - if (any_sub != NULL && pubEP->endpoint != NULL) { - pubsub_topicSubscriptionAddConnectPublisherToPendingList(any_sub, pubEP->endpoint); - } - free(scope_topic); - - celixThreadMutex_unlock(&admin->subscriptionsLock); - - return status; - -} - -celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP){ - celix_status_t status = CELIX_SUCCESS; - int count = 0; - - printf("PSA: Removing publication [FWUUID=%s bundleID=%ld topic=%s]\n",pubEP->frameworkUUID,pubEP->serviceID,pubEP->topic); - - const char* fwUUID = NULL; - - bundleContext_getProperty(admin->bundle_context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID); - if(fwUUID==NULL){ - printf("PSA: Cannot retrieve fwUUID.\n"); - return CELIX_INVALID_BUNDLE_CONTEXT; - } - char *scope_topic = createScopeTopicKey(pubEP->scope, pubEP->topic); - if(strcmp(pubEP->frameworkUUID,fwUUID)==0){ - - celixThreadMutex_lock(&admin->localPublicationsLock); - - service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic); - if(factory!=NULL){ - topic_publication_pt pub = (topic_publication_pt)factory->handle; - pubsub_topicPublicationRemovePublisherEP(pub,pubEP); - } - else{ - status = CELIX_ILLEGAL_STATE; - } - - celixThreadMutex_unlock(&admin->localPublicationsLock); - } - else{ - - celixThreadMutex_lock(&admin->externalPublicationsLock); - array_list_pt ext_pub_list = (array_list_pt)hashMap_get(admin->externalPublications,scope_topic); - if(ext_pub_list!=NULL){ - int i; - bool found = false; - for(i=0;!found && iendpoint,p->endpoint) == 0) { - count++; - } - } - - if(ext_pub_list_size == 0){ - hash_map_entry_pt entry = hashMap_getEntry(admin->externalPublications,scope_topic); - char* topic = (char*)hashMapEntry_getKey(entry); - array_list_pt list = (array_list_pt)hashMapEntry_getValue(entry); - hashMap_remove(admin->externalPublications,topic); - arrayList_destroy(list); - free(topic); - } - - } - - celixThreadMutex_unlock(&admin->externalPublicationsLock); - } - - /* Check if this publisher was connected to one of our subscribers*/ - celixThreadMutex_lock(&admin->subscriptionsLock); - - topic_subscription_pt sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic); - if(sub!=NULL && pubEP->endpoint!=NULL && count == 0){ - pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub,pubEP->endpoint); - } - - /* And check also for ANY subscription */ - topic_subscription_pt any_sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC); - if(any_sub!=NULL && pubEP->endpoint!=NULL && count == 0){ - pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(any_sub,pubEP->endpoint); - } - free(scope_topic); - celixThreadMutex_unlock(&admin->subscriptionsLock); - - return status; - -} - -celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin, char *scope, char* topic){ - celix_status_t status = CELIX_SUCCESS; - - printf("PSA: Closing all publications\n"); - - celixThreadMutex_lock(&admin->localPublicationsLock); - char *scope_topic = createScopeTopicKey(scope, topic); - hash_map_entry_pt pubsvc_entry = (hash_map_entry_pt)hashMap_getEntry(admin->localPublications,scope_topic); - if(pubsvc_entry!=NULL){ - char* topic = (char*)hashMapEntry_getKey(pubsvc_entry); - service_factory_pt factory= (service_factory_pt)hashMapEntry_getValue(pubsvc_entry); - topic_publication_pt pub = (topic_publication_pt)factory->handle; - status += pubsub_topicPublicationStop(pub); - status += pubsub_topicPublicationDestroy(pub); - - hashMap_remove(admin->localPublications,scope_topic); - free(topic); - free(factory); - } - free(scope_topic); - celixThreadMutex_unlock(&admin->localPublicationsLock); - - return status; - -} - -celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* scope,char* topic){ - celix_status_t status = CELIX_SUCCESS; - - printf("PSA: Closing all subscriptions\n"); - - celixThreadMutex_lock(&admin->subscriptionsLock); - char *scope_topic = createScopeTopicKey(scope, topic); - hash_map_entry_pt sub_entry = (hash_map_entry_pt)hashMap_getEntry(admin->subscriptions,scope_topic); - if(sub_entry!=NULL){ - char* topic = (char*)hashMapEntry_getKey(sub_entry); - - topic_subscription_pt ts = (topic_subscription_pt)hashMapEntry_getValue(sub_entry); - status += pubsub_topicSubscriptionStop(ts); - status += pubsub_topicSubscriptionDestroy(ts); - - hashMap_remove(admin->subscriptions,scope_topic); - free(topic); - - } - free(scope_topic); - celixThreadMutex_unlock(&admin->subscriptionsLock); - - return status; - -} - -celix_status_t pubsubAdmin_matchPublisher(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP, double* score){ - celix_status_t status = CELIX_SUCCESS; - status = pubsubAdmin_match(admin, pubEP, score); - return status; -} - -celix_status_t pubsubAdmin_matchSubscriber(pubsub_admin_pt admin, pubsub_endpoint_pt subEP, double* score){ - celix_status_t status = CELIX_SUCCESS; - status = pubsubAdmin_match(admin, subEP, score); - return status; -} - -static celix_status_t pubsubAdmin_match(pubsub_admin_pt admin, pubsub_endpoint_pt psEP, double* score){ - celix_status_t status = CELIX_SUCCESS; - - char topic_psa_prop[1024]; - snprintf(topic_psa_prop, 1024, "%s.psa", psEP->topic); - - const char* psa_to_use = NULL; - bundleContext_getPropertyWithDefault(admin->bundle_context, topic_psa_prop, PSA_DEFAULT, &psa_to_use); - - *score = 0; - if (strcmp(psa_to_use, "zmq") == 0){ - *score += 100; - }else{ - *score += 1; - } - - return status; -} - -#ifndef ANDROID -static celix_status_t pubsubAdmin_getIpAddress(const char* interface, char** ip) { - celix_status_t status = CELIX_BUNDLE_EXCEPTION; - - struct ifaddrs *ifaddr, *ifa; - char host[NI_MAXHOST]; - - if (getifaddrs(&ifaddr) != -1) - { - for (ifa = ifaddr; ifa != NULL && status != CELIX_SUCCESS; ifa = ifa->ifa_next) - { - if (ifa->ifa_addr == NULL) - continue; - - if ((getnameinfo(ifa->ifa_addr,sizeof(struct sockaddr_in), host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST) == 0) && (ifa->ifa_addr->sa_family == AF_INET)) { - if (interface == NULL) { - *ip = strdup(host); - status = CELIX_SUCCESS; - } - else if (strcmp(ifa->ifa_name, interface) == 0) { - *ip = strdup(host); - status = CELIX_SUCCESS; - } - } - } - - freeifaddrs(ifaddr); - } - - return status; -} -#endif - -static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){ - celix_status_t status = CELIX_SUCCESS; - char* scope_topic = createScopeTopicKey(subEP->scope, subEP->topic); - array_list_pt pendingListPerTopic = hashMap_get(admin->pendingSubscriptions,scope_topic); - if(pendingListPerTopic==NULL){ - arrayList_create(&pendingListPerTopic); - hashMap_put(admin->pendingSubscriptions,strdup(scope_topic),pendingListPerTopic); - } - arrayList_add(pendingListPerTopic,subEP); - free(scope_topic); - return status; -} http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_publication.c ---------------------------------------------------------------------- diff --git a/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_publication.c b/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_publication.c deleted file mode 100644 index 1a036db..0000000 --- a/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_publication.c +++ /dev/null @@ -1,636 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. - */ -/* - * topic_publication.c - * - * \date Sep 24, 2015 - * \author Apache Celix Project Team - * \copyright Apache License, Version 2.0 - */ - -#include "pubsub_publish_service_private.h" -#include -/* The following undefs prevent the collision between: - * - sys/syslog.h (which is included within czmq) - * - celix/dfi/dfi_log_util.h - */ -#undef LOG_DEBUG -#undef LOG_WARNING -#undef LOG_INFO -#undef LOG_WARNING - -#include -#include -#include - -#include "array_list.h" -#include "celixbool.h" -#include "service_registration.h" -#include "utils.h" -#include "service_factory.h" -#include "version.h" - -#include "pubsub_common.h" -#include "dyn_msg_utils.h" -#include "pubsub_utils.h" -#include "publisher.h" - -#include "pubsub_serializer.h" - -#ifdef BUILD_WITH_ZMQ_SECURITY - #include "zmq_crypto.h" - - #define MAX_CERT_PATH_LENGTH 512 -#endif - -#define EP_ADDRESS_LEN 32 -#define ZMQ_BIND_MAX_RETRY 5 - -#define FIRST_SEND_DELAY 2 - -struct topic_publication { - zsock_t* zmq_socket; - zcert_t * zmq_cert; - char* endpoint; - service_registration_pt svcFactoryReg; - array_list_pt pub_ep_list; //List - hash_map_pt boundServices; // - celix_thread_mutex_t tp_lock; -}; - -typedef struct publish_bundle_bound_service { - topic_publication_pt parent; - pubsub_publisher_pt service; - bundle_pt bundle; - char *topic; - hash_map_pt msgTypes; - unsigned short getCount; - celix_thread_mutex_t mp_lock; - bool mp_send_in_progress; - array_list_pt mp_parts; -}* publish_bundle_bound_service_pt; - -typedef struct pubsub_msg{ - pubsub_msg_header_pt header; - char* payload; - int payloadSize; -}* pubsub_msg_pt; - -static unsigned int rand_range(unsigned int min, unsigned int max); - -static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service); -static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service); - -static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle); -static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc); - -static int pubsub_topicPublicationSend(void* handle,unsigned int msgTypeId, void *msg); -static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTypeId, void *msg, int flags); -static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* msgTypeId); - -static void delay_first_send_for_late_joiners(void); - -celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt pubEP,char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out){ - celix_status_t status = CELIX_SUCCESS; - -#ifdef BUILD_WITH_ZMQ_SECURITY - char* secure_topics = NULL; - bundleContext_getProperty(bundle_context, "SECURE_TOPICS", (const char **) &secure_topics); - - if (secure_topics){ - array_list_pt secure_topics_list = pubsub_getTopicsFromString(secure_topics); - - int i; - int secure_topics_size = arrayList_size(secure_topics_list); - for (i = 0; i < secure_topics_size; i++){ - char* top = arrayList_get(secure_topics_list, i); - if (strcmp(pubEP->topic, top) == 0){ - printf("TP: Secure topic: '%s'\n", top); - pubEP->is_secure = true; - } - free(top); - top = NULL; - } - - arrayList_destroy(secure_topics_list); - } - - zcert_t* pub_cert = NULL; - if (pubEP->is_secure){ - char* keys_bundle_dir = pubsub_getKeysBundleDir(bundle_context); - if (keys_bundle_dir == NULL){ - return CELIX_SERVICE_EXCEPTION; - } - - const char* keys_file_path = NULL; - const char* keys_file_name = NULL; - bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_PATH, &keys_file_path); - bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_NAME, &keys_file_name); - - char cert_path[MAX_CERT_PATH_LENGTH]; - - //certificate path ".cache/bundle{id}/version0.0/./META-INF/keys/publisher/private/pub_{topic}.key" - snprintf(cert_path, MAX_CERT_PATH_LENGTH, "%s/META-INF/keys/publisher/private/pub_%s.key.enc", keys_bundle_dir, pubEP->topic); - free(keys_bundle_dir); - printf("TP: Loading key '%s'\n", cert_path); - - pub_cert = get_zcert_from_encoded_file((char *) keys_file_path, (char *) keys_file_name, cert_path); - if (pub_cert == NULL){ - printf("TP: Cannot load key '%s'\n", cert_path); - printf("TP: Topic '%s' NOT SECURED !\n", pubEP->topic); - pubEP->is_secure = false; - } - } -#endif - - zsock_t* socket = zsock_new (ZMQ_PUB); - if(socket==NULL){ - #ifdef BUILD_WITH_ZMQ_SECURITY - if (pubEP->is_secure){ - zcert_destroy(&pub_cert); - } - #endif - - perror("Error for zmq_socket"); - return CELIX_SERVICE_EXCEPTION; - } -#ifdef BUILD_WITH_ZMQ_SECURITY - if (pubEP->is_secure){ - zcert_apply (pub_cert, socket); // apply certificate to socket - zsock_set_curve_server (socket, true); // setup the publisher's socket to use the curve functions - } -#endif - - int rv = -1, retry=0; - char* ep = malloc(EP_ADDRESS_LEN); - char bindAddress[EP_ADDRESS_LEN]; - - while(rv==-1 && retrypub_ep_list)); - pub->boundServices = hashMap_create(NULL,NULL,NULL,NULL); - celixThreadMutex_create(&(pub->tp_lock),NULL); - - pub->endpoint = ep; - pub->zmq_socket = socket; - -#ifdef BUILD_WITH_ZMQ_SECURITY - if (pubEP->is_secure){ - pub->zmq_cert = pub_cert; - } -#endif - - pubsub_topicPublicationAddPublisherEP(pub,pubEP); - - *out = pub; - - return status; -} - -celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub){ - celix_status_t status = CELIX_SUCCESS; - - celixThreadMutex_lock(&(pub->tp_lock)); - - free(pub->endpoint); - arrayList_destroy(pub->pub_ep_list); - - hash_map_iterator_pt iter = hashMapIterator_create(pub->boundServices); - while(hashMapIterator_hasNext(iter)){ - publish_bundle_bound_service_pt bound = hashMapIterator_nextValue(iter); - pubsub_destroyPublishBundleBoundService(bound); - } - hashMapIterator_destroy(iter); - hashMap_destroy(pub->boundServices,false,false); - - pub->svcFactoryReg = NULL; - zsock_destroy(&(pub->zmq_socket)); -#ifdef BUILD_WITH_ZMQ_SECURITY - zcert_destroy(&(pub->zmq_cert)); -#endif - - celixThreadMutex_unlock(&(pub->tp_lock)); - - celixThreadMutex_destroy(&(pub->tp_lock)); - - free(pub); - - return status; -} - -celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory){ - celix_status_t status = CELIX_SUCCESS; - - /* Let's register the new service */ - //celixThreadMutex_lock(&(pub->tp_lock)); - - pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pub->pub_ep_list,0); - - if(pubEP!=NULL){ - service_factory_pt factory = calloc(1, sizeof(*factory)); - factory->handle = pub; - factory->getService = pubsub_topicPublicationGetService; - factory->ungetService = pubsub_topicPublicationUngetService; - - properties_pt props = properties_create(); - properties_set(props,PUBSUB_PUBLISHER_TOPIC,pubEP->topic); - properties_set(props,PUBSUB_PUBLISHER_SCOPE,pubEP->scope); - properties_set(props,"service.version", PUBSUB_PUBLISHER_SERVICE_VERSION); - - status = bundleContext_registerServiceFactory(bundle_context,PUBSUB_PUBLISHER_SERVICE_NAME,factory,props,&(pub->svcFactoryReg)); - - if(status != CELIX_SUCCESS){ - properties_destroy(props); - printf("PSA: Cannot register ServiceFactory for topic %s (bundle %ld).\n",pubEP->topic,pubEP->serviceID); - } - else{ - *svcFactory = factory; - } - } - else{ - printf("PSA: Cannot find pubsub_endpoint after adding it...Should never happen!\n"); - status = CELIX_SERVICE_EXCEPTION; - } - - //celixThreadMutex_unlock(&(pub->tp_lock)); - - return status; -} - -celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub){ - celix_status_t status = CELIX_SUCCESS; - - //celixThreadMutex_lock(&(pub->tp_lock)); - - status = serviceRegistration_unregister(pub->svcFactoryReg); - - //celixThreadMutex_unlock(&(pub->tp_lock)); - - return status; -} - -celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep){ - - celixThreadMutex_lock(&(pub->tp_lock)); - ep->endpoint = strdup(pub->endpoint); - arrayList_add(pub->pub_ep_list,ep); - celixThreadMutex_unlock(&(pub->tp_lock)); - - return CELIX_SUCCESS; -} - -celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep){ - - celixThreadMutex_lock(&(pub->tp_lock)); - for (int i = 0; i < arrayList_size(pub->pub_ep_list); i++) { - pubsub_endpoint_pt e = arrayList_get(pub->pub_ep_list, i); - if(pubsubEndpoint_equals(ep, e)) { - arrayList_removeElement(pub->pub_ep_list,ep); - break; - } - } - celixThreadMutex_unlock(&(pub->tp_lock)); - - return CELIX_SUCCESS; -} - -array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt pub){ - return pub->pub_ep_list; -} - - -static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service) { - celix_status_t status = CELIX_SUCCESS; - - topic_publication_pt publish = (topic_publication_pt)handle; - - celixThreadMutex_lock(&(publish->tp_lock)); - - publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle); - if(bound==NULL){ - bound = pubsub_createPublishBundleBoundService(publish,bundle); - if(bound!=NULL){ - hashMap_put(publish->boundServices,bundle,bound); - } - } - else{ - bound->getCount++; - } - - *service = bound->service; - - celixThreadMutex_unlock(&(publish->tp_lock)); - - return status; -} - -static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service) { - - topic_publication_pt publish = (topic_publication_pt)handle; - - celixThreadMutex_lock(&(publish->tp_lock)); - - publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle); - if(bound!=NULL){ - - bound->getCount--; - if(bound->getCount==0){ - pubsub_destroyPublishBundleBoundService(bound); - hashMap_remove(publish->boundServices,bundle); - } - - } - else{ - long bundleId = -1; - bundle_getBundleId(bundle,&bundleId); - printf("TP: Unexpected ungetService call for bundle %ld.\n", bundleId); - } - - /* service should be never used for unget, so let's set the pointer to NULL */ - *service = NULL; - - celixThreadMutex_unlock(&(publish->tp_lock)); - - return CELIX_SUCCESS; -} - -static bool send_pubsub_msg(zsock_t* zmq_socket, pubsub_msg_pt msg, bool last){ - - bool ret = true; - - zframe_t* headerMsg = zframe_new(msg->header, sizeof(struct pubsub_msg_header)); - if (headerMsg == NULL) ret=false; - zframe_t* payloadMsg = zframe_new(msg->payload, msg->payloadSize); - if (payloadMsg == NULL) ret=false; - - delay_first_send_for_late_joiners(); - - if( zframe_send(&headerMsg,zmq_socket, ZFRAME_MORE) == -1) ret=false; - - if(!last){ - if( zframe_send(&payloadMsg,zmq_socket, ZFRAME_MORE) == -1) ret=false; - } - else{ - if( zframe_send(&payloadMsg,zmq_socket, 0) == -1) ret=false; - } - - if (!ret){ - zframe_destroy(&headerMsg); - zframe_destroy(&payloadMsg); - } - - free(msg->header); - free(msg->payload); - free(msg); - - return ret; - -} - -static bool send_pubsub_mp_msg(zsock_t* zmq_socket, array_list_pt mp_msg_parts){ - - bool ret = true; - - unsigned int i = 0; - unsigned int mp_num = arrayList_size(mp_msg_parts); - for(;imp_lock)); - if( (flags & PUBSUB_PUBLISHER_FIRST_MSG) && !(flags & PUBSUB_PUBLISHER_LAST_MSG) && bound->mp_send_in_progress){ //means a real mp_msg - printf("TP: Multipart send already in progress. Cannot process a new one.\n"); - celixThreadMutex_unlock(&(bound->mp_lock)); - return -3; - } - - pubsub_message_type *msgType = hashMap_get(bound->msgTypes, &msgTypeId); - - int major=0, minor=0; - - if (msgType != NULL) { - - version_pt msgVersion = pubsubSerializer_getVersion(msgType); - - pubsub_msg_header_pt msg_hdr = calloc(1,sizeof(struct pubsub_msg_header)); - - strncpy(msg_hdr->topic,bound->topic,MAX_TOPIC_LEN-1); - - msg_hdr->type = msgTypeId; - if (msgVersion != NULL){ - version_getMajor(msgVersion, &major); - version_getMinor(msgVersion, &minor); - msg_hdr->major = major; - msg_hdr->minor = minor; - } - - void* serializedOutput = NULL; - int serializedOutputLen = 0; - pubsubSerializer_serialize(msgType, msg, &serializedOutput, &serializedOutputLen); - - pubsub_msg_pt msg = calloc(1,sizeof(struct pubsub_msg)); - msg->header = msg_hdr; - msg->payload = (char *) serializedOutput; - msg->payloadSize = serializedOutputLen; - - bool snd = true; - - switch(flags){ - case PUBSUB_PUBLISHER_FIRST_MSG: - bound->mp_send_in_progress = true; - arrayList_add(bound->mp_parts,msg); - break; - case PUBSUB_PUBLISHER_PART_MSG: - if(!bound->mp_send_in_progress){ - printf("TP: ERROR: received msg part without the first part.\n"); - status = -4; - } - else{ - arrayList_add(bound->mp_parts,msg); - } - break; - case PUBSUB_PUBLISHER_LAST_MSG: - if(!bound->mp_send_in_progress){ - printf("TP: ERROR: received end msg without the first part.\n"); - status = -4; - } - else{ - arrayList_add(bound->mp_parts,msg); - celixThreadMutex_lock(&(bound->parent->tp_lock)); - snd = send_pubsub_mp_msg(bound->parent->zmq_socket,bound->mp_parts); - bound->mp_send_in_progress = false; - celixThreadMutex_unlock(&(bound->parent->tp_lock)); - } - break; - case PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG: //Normal send case - celixThreadMutex_lock(&(bound->parent->tp_lock)); - snd = send_pubsub_msg(bound->parent->zmq_socket,msg,true); - celixThreadMutex_unlock(&(bound->parent->tp_lock)); - break; - default: - printf("TP: ERROR: Invalid MP flags combination\n"); - status = -4; - break; - } - - if(!snd){ - printf("TP: Failed to send %s message %u.\n",flags == (PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG) ? "single" : "multipart", msgTypeId); - } - - } else { - printf("TP: Message %u not supported.",msgTypeId); - status=-1; - } - - celixThreadMutex_unlock(&(bound->mp_lock)); - - return status; - -} - -static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* msgTypeId){ - *msgTypeId = pubsubSerializer_hashCode(msgType); - return 0; -} - -static unsigned int rand_range(unsigned int min, unsigned int max){ - - double scaled = (double)(((double)rand())/((double)RAND_MAX)); - return (max-min+1)*scaled + min; - -} - -static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle){ - - publish_bundle_bound_service_pt bound = calloc(1, sizeof(*bound)); - - if (bound != NULL) { - bound->service = calloc(1, sizeof(*bound->service)); - } - - if (bound != NULL && bound->service != NULL) { - - bound->parent = tp; - bound->bundle = bundle; - bound->getCount = 1; - bound->mp_send_in_progress = false; - celixThreadMutex_create(&bound->mp_lock,NULL); - bound->msgTypes = hashMap_create(uintHash, NULL, uintEquals, NULL); // - arrayList_create(&bound->mp_parts); - - pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(bound->parent->pub_ep_list,0); - bound->topic=strdup(pubEP->topic); - - bound->service->handle = bound; - bound->service->localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForUUID; - bound->service->send = pubsub_topicPublicationSend; - bound->service->sendMultipart = pubsub_topicPublicationSendMultipart; - - pubsubSerializer_fillMsgTypesMap(bound->msgTypes,bound->bundle); - - } - else - { - if (bound != NULL) { - free(bound->service); - } - free(bound); - return NULL; - } - - return bound; -} - -static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc){ - - celixThreadMutex_lock(&boundSvc->mp_lock); - - if(boundSvc->service != NULL){ - free(boundSvc->service); - } - - if(boundSvc->msgTypes != NULL){ - pubsubSerializer_emptyMsgTypesMap(boundSvc->msgTypes); - hashMap_destroy(boundSvc->msgTypes,false,false); - } - - if(boundSvc->mp_parts!=NULL){ - arrayList_destroy(boundSvc->mp_parts); - } - - if(boundSvc->topic!=NULL){ - free(boundSvc->topic); - } - - celixThreadMutex_unlock(&boundSvc->mp_lock); - celixThreadMutex_destroy(&boundSvc->mp_lock); - - free(boundSvc); - -} - -static void delay_first_send_for_late_joiners(){ - - static bool firstSend = true; - - if(firstSend){ - printf("TP: Delaying first send for late joiners...\n"); - sleep(FIRST_SEND_DELAY); - firstSend = false; - } -} http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c ---------------------------------------------------------------------- diff --git a/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c b/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c deleted file mode 100644 index cb9aff5..0000000 --- a/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c +++ /dev/null @@ -1,777 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. - */ -/* - * topic_subscription.c - * - * \date Oct 2, 2015 - * \author Apache Celix Project Team - * \copyright Apache License, Version 2.0 - */ - -#include "topic_subscription.h" -#include -/* The following undefs prevent the collision between: - * - sys/syslog.h (which is included within czmq) - * - celix/dfi/dfi_log_util.h - */ -#undef LOG_DEBUG -#undef LOG_WARNING -#undef LOG_INFO -#undef LOG_WARNING - -#include -#include -#include - -#include "utils.h" -#include "celix_errno.h" -#include "constants.h" -#include "version.h" - -#include "subscriber.h" -#include "publisher.h" -#include "dyn_msg_utils.h" -#include "pubsub_utils.h" - -#include "pubsub_serializer.h" - -#ifdef BUILD_WITH_ZMQ_SECURITY - #include "zmq_crypto.h" - - #define MAX_CERT_PATH_LENGTH 512 -#endif - -#define POLL_TIMEOUT 250 -#define ZMQ_POLL_TIMEOUT_MS_ENV "ZMQ_POLL_TIMEOUT_MS" - -struct topic_subscription{ - - zsock_t* zmq_socket; - zcert_t * zmq_cert; - zcert_t * zmq_pub_cert; - pthread_mutex_t socket_lock; - service_tracker_pt tracker; - array_list_pt sub_ep_list; - celix_thread_t recv_thread; - bool running; - celix_thread_mutex_t ts_lock; - bundle_context_pt context; - - hash_map_pt servicesMap; // key = service, value = msg types map - array_list_pt pendingConnections; - array_list_pt pendingDisconnections; - - celix_thread_mutex_t pendingConnections_lock; - celix_thread_mutex_t pendingDisconnections_lock; - unsigned int nrSubscribers; -}; - -typedef struct complete_zmq_msg{ - zframe_t* header; - zframe_t* payload; -}* complete_zmq_msg_pt; - -typedef struct mp_handle{ - hash_map_pt svc_msg_db; - hash_map_pt rcv_msg_map; -}* mp_handle_pt; - -typedef struct msg_map_entry{ - bool retain; - void* msgInst; -}* msg_map_entry_pt; - -static celix_status_t topicsub_subscriberTracked(void * handle, service_reference_pt reference, void * service); -static celix_status_t topicsub_subscriberUntracked(void * handle, service_reference_pt reference, void * service); -static void* zmq_recv_thread_func(void* arg); -static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr); -static void sigusr1_sighandler(int signo); -static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId); -static int pubsub_getMultipart(void *handle, unsigned int msgTypeId, bool retain, void **part); -static mp_handle_pt create_mp_handle(hash_map_pt svc_msg_db,array_list_pt rcv_msg_list); -static void destroy_mp_handle(mp_handle_pt mp_handle); -static void connectPendingPublishers(topic_subscription_pt sub); -static void disconnectPendingPublishers(topic_subscription_pt sub); - -celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt subEP, char* scope, char* topic,topic_subscription_pt* out){ - celix_status_t status = CELIX_SUCCESS; - -#ifdef BUILD_WITH_ZMQ_SECURITY - if(strcmp(topic,PUBSUB_ANY_SUB_TOPIC) != 0){ - char* secure_topics = NULL; - bundleContext_getProperty(bundle_context, "SECURE_TOPICS", (const char **) &secure_topics); - - if (secure_topics){ - array_list_pt secure_topics_list = pubsub_getTopicsFromString(secure_topics); - - int i; - int secure_topics_size = arrayList_size(secure_topics_list); - for (i = 0; i < secure_topics_size; i++){ - char* top = arrayList_get(secure_topics_list, i); - if (strcmp(topic, top) == 0){ - printf("TS: Secure topic: '%s'\n", top); - subEP->is_secure = true; - } - free(top); - top = NULL; - } - - arrayList_destroy(secure_topics_list); - } - } - - zcert_t* sub_cert = NULL; - zcert_t* pub_cert = NULL; - const char* pub_key = NULL; - if (subEP->is_secure){ - char* keys_bundle_dir = pubsub_getKeysBundleDir(bundle_context); - if (keys_bundle_dir == NULL){ - return CELIX_SERVICE_EXCEPTION; - } - - const char* keys_file_path = NULL; - const char* keys_file_name = NULL; - bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_PATH, &keys_file_path); - bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_NAME, &keys_file_name); - - char sub_cert_path[MAX_CERT_PATH_LENGTH]; - char pub_cert_path[MAX_CERT_PATH_LENGTH]; - - //certificate path ".cache/bundle{id}/version0.0/./META-INF/keys/subscriber/private/sub_{topic}.key.enc" - snprintf(sub_cert_path, MAX_CERT_PATH_LENGTH, "%s/META-INF/keys/subscriber/private/sub_%s.key.enc", keys_bundle_dir, topic); - snprintf(pub_cert_path, MAX_CERT_PATH_LENGTH, "%s/META-INF/keys/publisher/public/pub_%s.pub", keys_bundle_dir, topic); - free(keys_bundle_dir); - - printf("TS: Loading subscriber key '%s'\n", sub_cert_path); - printf("TS: Loading publisher key '%s'\n", pub_cert_path); - - sub_cert = get_zcert_from_encoded_file((char *) keys_file_path, (char *) keys_file_name, sub_cert_path); - if (sub_cert == NULL){ - printf("TS: Cannot load key '%s'\n", sub_cert_path); - printf("TS: Topic '%s' NOT SECURED !\n", topic); - subEP->is_secure = false; - } - - pub_cert = zcert_load(pub_cert_path); - if (sub_cert != NULL && pub_cert == NULL){ - zcert_destroy(&sub_cert); - printf("TS: Cannot load key '%s'\n", pub_cert_path); - printf("TS: Topic '%s' NOT SECURED !\n", topic); - subEP->is_secure = false; - } - - pub_key = zcert_public_txt(pub_cert); - } -#endif - - zsock_t* zmq_s = zsock_new (ZMQ_SUB); - if(zmq_s==NULL){ - #ifdef BUILD_WITH_ZMQ_SECURITY - if (subEP->is_secure){ - zcert_destroy(&sub_cert); - zcert_destroy(&pub_cert); - } - #endif - - return CELIX_SERVICE_EXCEPTION; - } - - #ifdef BUILD_WITH_ZMQ_SECURITY - if (subEP->is_secure){ - zcert_apply (sub_cert, zmq_s); - zsock_set_curve_serverkey (zmq_s, pub_key); //apply key of publisher to socket of subscriber - } - #endif - - if(strcmp(topic,PUBSUB_ANY_SUB_TOPIC)==0){ - zsock_set_subscribe (zmq_s, ""); - } - else{ - zsock_set_subscribe (zmq_s, topic); - } - - topic_subscription_pt ts = (topic_subscription_pt) calloc(1,sizeof(*ts)); - ts->context = bundle_context; - ts->zmq_socket = zmq_s; - ts->running = false; - ts->nrSubscribers = 0; - - #ifdef BUILD_WITH_ZMQ_SECURITY - if (subEP->is_secure){ - ts->zmq_cert = sub_cert; - ts->zmq_pub_cert = pub_cert; - } - #endif - - celixThreadMutex_create(&ts->socket_lock, NULL); - celixThreadMutex_create(&ts->ts_lock,NULL); - arrayList_create(&ts->sub_ep_list); - ts->servicesMap = hashMap_create(NULL, NULL, NULL, NULL); - arrayList_create(&ts->pendingConnections); - arrayList_create(&ts->pendingDisconnections); - celixThreadMutex_create(&ts->pendingConnections_lock, NULL); - celixThreadMutex_create(&ts->pendingDisconnections_lock, NULL); - - char filter[128]; - memset(filter,0,128); - if(strncmp(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT,scope,strlen(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT)) == 0) { - // default scope, means that subscriber has not defined a scope property - snprintf(filter, 128, "(&(%s=%s)(%s=%s))", - (char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_SUBSCRIBER_SERVICE_NAME, - PUBSUB_SUBSCRIBER_TOPIC,topic); - - } else { - snprintf(filter, 128, "(&(%s=%s)(%s=%s)(%s=%s))", - (char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_SUBSCRIBER_SERVICE_NAME, - PUBSUB_SUBSCRIBER_TOPIC,topic, - PUBSUB_SUBSCRIBER_SCOPE,scope); - } - service_tracker_customizer_pt customizer = NULL; - status += serviceTrackerCustomizer_create(ts,NULL,topicsub_subscriberTracked,NULL,topicsub_subscriberUntracked,&customizer); - status += serviceTracker_createWithFilter(bundle_context, filter, customizer, &ts->tracker); - - struct sigaction actions; - memset(&actions, 0, sizeof(actions)); - sigemptyset(&actions.sa_mask); - actions.sa_flags = 0; - actions.sa_handler = sigusr1_sighandler; - - sigaction(SIGUSR1,&actions,NULL); - - *out=ts; - - return status; -} - -celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){ - celix_status_t status = CELIX_SUCCESS; - - celixThreadMutex_lock(&ts->ts_lock); - - ts->running = false; - serviceTracker_destroy(ts->tracker); - arrayList_clear(ts->sub_ep_list); - arrayList_destroy(ts->sub_ep_list); - hashMap_destroy(ts->servicesMap,false,false); - - celixThreadMutex_lock(&ts->pendingConnections_lock); - arrayList_destroy(ts->pendingConnections); - celixThreadMutex_unlock(&ts->pendingConnections_lock); - celixThreadMutex_destroy(&ts->pendingConnections_lock); - - celixThreadMutex_lock(&ts->pendingDisconnections_lock); - arrayList_destroy(ts->pendingDisconnections); - celixThreadMutex_unlock(&ts->pendingDisconnections_lock); - celixThreadMutex_destroy(&ts->pendingDisconnections_lock); - - celixThreadMutex_lock(&ts->socket_lock); - zsock_destroy(&(ts->zmq_socket)); - #ifdef BUILD_WITH_ZMQ_SECURITY - zcert_destroy(&(ts->zmq_cert)); - zcert_destroy(&(ts->zmq_pub_cert)); - #endif - celixThreadMutex_unlock(&ts->socket_lock); - celixThreadMutex_destroy(&ts->socket_lock); - - celixThreadMutex_unlock(&ts->ts_lock); - - - free(ts); - - return status; -} - -celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts){ - celix_status_t status = CELIX_SUCCESS; - - //celixThreadMutex_lock(&ts->ts_lock); - - status = serviceTracker_open(ts->tracker); - - ts->running = true; - - if(status==CELIX_SUCCESS){ - status=celixThread_create(&ts->recv_thread,NULL,zmq_recv_thread_func,ts); - } - - //celixThreadMutex_unlock(&ts->ts_lock); - - return status; -} - -celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts){ - celix_status_t status = CELIX_SUCCESS; - - //celixThreadMutex_lock(&ts->ts_lock); - - ts->running = false; - - pthread_kill(ts->recv_thread.thread,SIGUSR1); - - celixThread_join(ts->recv_thread,NULL); - - status = serviceTracker_close(ts->tracker); - - //celixThreadMutex_unlock(&ts->ts_lock); - - return status; -} - -celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts, char* pubURL){ - celix_status_t status = CELIX_SUCCESS; - celixThreadMutex_lock(&ts->socket_lock); - if(!zsock_is(ts->zmq_socket) || zsock_connect(ts->zmq_socket, "%s", pubURL) != 0){ - status = CELIX_SERVICE_EXCEPTION; - } - celixThreadMutex_unlock(&ts->socket_lock); - - return status; -} - -celix_status_t pubsub_topicSubscriptionAddConnectPublisherToPendingList(topic_subscription_pt ts, char* pubURL) { - celix_status_t status = CELIX_SUCCESS; - char *url = strdup(pubURL); - celixThreadMutex_lock(&ts->pendingConnections_lock); - arrayList_add(ts->pendingConnections, url); - celixThreadMutex_unlock(&ts->pendingConnections_lock); - return status; -} - -celix_status_t pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(topic_subscription_pt ts, char* pubURL) { - celix_status_t status = CELIX_SUCCESS; - char *url = strdup(pubURL); - celixThreadMutex_lock(&ts->pendingDisconnections_lock); - arrayList_add(ts->pendingDisconnections, url); - celixThreadMutex_unlock(&ts->pendingDisconnections_lock); - return status; -} - -celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt ts, char* pubURL){ - celix_status_t status = CELIX_SUCCESS; - - celixThreadMutex_lock(&ts->socket_lock); - if(!zsock_is(ts->zmq_socket) || zsock_disconnect(ts->zmq_socket, "%s", pubURL) != 0){ - status = CELIX_SERVICE_EXCEPTION; - } - celixThreadMutex_unlock(&ts->socket_lock); - - return status; -} - -celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP){ - celix_status_t status = CELIX_SUCCESS; - - celixThreadMutex_lock(&ts->ts_lock); - - arrayList_add(ts->sub_ep_list,subEP); - - celixThreadMutex_unlock(&ts->ts_lock); - - return status; - -} - -celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt ts) { - celix_status_t status = CELIX_SUCCESS; - - celixThreadMutex_lock(&ts->ts_lock); - - ts->nrSubscribers++; - - celixThreadMutex_unlock(&ts->ts_lock); - - return status; -} - -celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP){ - celix_status_t status = CELIX_SUCCESS; - - celixThreadMutex_lock(&ts->ts_lock); - - arrayList_removeElement(ts->sub_ep_list,subEP); - - celixThreadMutex_unlock(&ts->ts_lock); - - return status; - -} - -celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt ts) { - celix_status_t status = CELIX_SUCCESS; - - celixThreadMutex_lock(&ts->ts_lock); - - ts->nrSubscribers--; - - celixThreadMutex_unlock(&ts->ts_lock); - - return status; -} - -unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt ts) { - return ts->nrSubscribers; -} - -static celix_status_t topicsub_subscriberTracked(void * handle, service_reference_pt reference, void * service){ - celix_status_t status = CELIX_SUCCESS; - topic_subscription_pt ts = handle; - - celixThreadMutex_lock(&ts->ts_lock); - if (!hashMap_containsKey(ts->servicesMap, service)) { - hash_map_pt msgTypes = hashMap_create(uintHash, NULL, uintEquals, NULL); //key = msgId, value = pubsub_message_type - - bundle_pt bundle = NULL; - serviceReference_getBundle(reference, &bundle); - pubsubSerializer_fillMsgTypesMap(msgTypes,bundle); - - if(hashMap_size(msgTypes)==0){ //If the msgTypes hashMap is not filled, the service is an unsupported subscriber - hashMap_destroy(msgTypes,false,false); - printf("TS: Unsupported subscriber!\n"); - } - else{ - hashMap_put(ts->servicesMap, service, msgTypes); - } - - } - celixThreadMutex_unlock(&ts->ts_lock); - printf("TS: New subscriber registered.\n"); - return status; - -} - -static celix_status_t topicsub_subscriberUntracked(void * handle, service_reference_pt reference, void * service){ - celix_status_t status = CELIX_SUCCESS; - topic_subscription_pt ts = handle; - - celixThreadMutex_lock(&ts->ts_lock); - if (hashMap_containsKey(ts->servicesMap, service)) { - hash_map_pt msgTypes = hashMap_remove(ts->servicesMap, service); - if(msgTypes!=NULL){ - pubsubSerializer_emptyMsgTypesMap(msgTypes); - hashMap_destroy(msgTypes,false,false); - } - } - celixThreadMutex_unlock(&ts->ts_lock); - - printf("TS: Subscriber unregistered.\n"); - return status; -} - - -static void process_msg(topic_subscription_pt sub,array_list_pt msg_list){ - - pubsub_msg_header_pt first_msg_hdr = (pubsub_msg_header_pt)zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->header); - - hash_map_iterator_pt iter = hashMapIterator_create(sub->servicesMap); - while (hashMapIterator_hasNext(iter)) { - hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); - pubsub_subscriber_pt subsvc = hashMapEntry_getKey(entry); - hash_map_pt msgTypes = hashMapEntry_getValue(entry); - - pubsub_message_type *msgType = hashMap_get(msgTypes,&(first_msg_hdr->type)); - if (msgType == NULL) { - printf("TS: Primary message %d not supported. NOT sending any part of the whole message.\n",first_msg_hdr->type); - } - else{ - void *msgInst = NULL; - char *name = pubsubSerializer_getName(msgType); - version_pt msgVersion = pubsubSerializer_getVersion(msgType); - - bool validVersion = checkVersion(msgVersion,first_msg_hdr); - - if(validVersion){ - - celix_status_t status = pubsubSerializer_deserialize(msgType, (const void *) zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->payload), &msgInst); - - if (status == CELIX_SUCCESS) { - bool release = true; - - mp_handle_pt mp_handle = create_mp_handle(msgTypes,msg_list); - pubsub_multipart_callbacks_t mp_callbacks; - mp_callbacks.handle = mp_handle; - mp_callbacks.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForMsgType; - mp_callbacks.getMultipart = pubsub_getMultipart; - subsvc->receive(subsvc->handle, name, first_msg_hdr->type, msgInst, &mp_callbacks, &release); - - if(release){ - pubsubSerializer_freeMsg(msgType, msgInst); - } - if(mp_handle!=NULL){ - destroy_mp_handle(mp_handle); - } - } - else{ - printf("TS: Cannot deserialize msgType %s.\n",name); - } - - } - else{ - int major=0,minor=0; - version_getMajor(msgVersion,&major); - version_getMinor(msgVersion,&minor); - printf("TS: Version mismatch for primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole message.\n",name,major,minor,first_msg_hdr->major,first_msg_hdr->minor); - } - - } - } - hashMapIterator_destroy(iter); - - int i = 0; - for(;iheader)); - zframe_destroy(&(c_msg->payload)); - free(c_msg); - } - - arrayList_destroy(msg_list); - -} - -static void* zmq_recv_thread_func(void * arg) { - topic_subscription_pt sub = (topic_subscription_pt) arg; - - while (sub->running) { - - celixThreadMutex_lock(&sub->socket_lock); - - zframe_t* headerMsg = zframe_recv(sub->zmq_socket); - if (headerMsg == NULL) { - if (errno == EINTR) { - //It means we got a signal and we have to exit... - printf("TS: header_recv thread for topic got a signal and will exit.\n"); - } else { - perror("TS: header_recv thread"); - } - } else { - - pubsub_msg_header_pt hdr = (pubsub_msg_header_pt) zframe_data(headerMsg); - - if (zframe_more(headerMsg)) { - - zframe_t* payloadMsg = zframe_recv(sub->zmq_socket); - if (payloadMsg == NULL) { - if (errno == EINTR) { - //It means we got a signal and we have to exit... - printf("TS: payload_recv thread for topic got a signal and will exit.\n"); - } else { - perror("TS: payload_recv"); - } - zframe_destroy(&headerMsg); - } else { - - celixThreadMutex_lock(&sub->ts_lock); - - //Let's fetch all the messages from the socket - array_list_pt msg_list = NULL; - arrayList_create(&msg_list); - complete_zmq_msg_pt firstMsg = calloc(1, sizeof(struct complete_zmq_msg)); - firstMsg->header = headerMsg; - firstMsg->payload = payloadMsg; - arrayList_add(msg_list, firstMsg); - - bool more = zframe_more(payloadMsg); - while (more) { - - zframe_t* h_msg = zframe_recv(sub->zmq_socket); - if (h_msg == NULL) { - if (errno == EINTR) { - //It means we got a signal and we have to exit... - printf("TS: h_recv thread for topic got a signal and will exit.\n"); - } else { - perror("TS: h_recv"); - } - break; - } - - zframe_t* p_msg = zframe_recv(sub->zmq_socket); - if (p_msg == NULL) { - if (errno == EINTR) { - //It means we got a signal and we have to exit... - printf("TS: p_recv thread for topic got a signal and will exit.\n"); - } else { - perror("TS: p_recv"); - } - zframe_destroy(&h_msg); - break; - } - - complete_zmq_msg_pt c_msg = calloc(1, sizeof(struct complete_zmq_msg)); - c_msg->header = h_msg; - c_msg->payload = p_msg; - arrayList_add(msg_list, c_msg); - - if (!zframe_more(p_msg)) { - more = false; - } - } - - process_msg(sub, msg_list); - - celixThreadMutex_unlock(&sub->ts_lock); - - } - - } //zframe_more(headerMsg) - else { - free(headerMsg); - printf("TS: received message %u for topic %s without payload!\n", hdr->type, hdr->topic); - } - - } // headerMsg != NULL - celixThreadMutex_unlock(&sub->socket_lock); - connectPendingPublishers(sub); - disconnectPendingPublishers(sub); - } // while - - return NULL; -} - -static void connectPendingPublishers(topic_subscription_pt sub) { - celixThreadMutex_lock(&sub->pendingConnections_lock); - while(!arrayList_isEmpty(sub->pendingConnections)) { - char * pubEP = arrayList_remove(sub->pendingConnections, 0); - pubsub_topicSubscriptionConnectPublisher(sub, pubEP); - free(pubEP); - } - celixThreadMutex_unlock(&sub->pendingConnections_lock); -} - -static void disconnectPendingPublishers(topic_subscription_pt sub) { - celixThreadMutex_lock(&sub->pendingDisconnections_lock); - while(!arrayList_isEmpty(sub->pendingDisconnections)) { - char * pubEP = arrayList_remove(sub->pendingDisconnections, 0); - pubsub_topicSubscriptionDisconnectPublisher(sub, pubEP); - free(pubEP); - } - celixThreadMutex_unlock(&sub->pendingDisconnections_lock); -} - -static void sigusr1_sighandler(int signo){ - printf("TS: Topic subscription being shut down...\n"); - return; -} - -static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr){ - bool check=false; - int major=0,minor=0; - - if(msgVersion!=NULL){ - version_getMajor(msgVersion,&major); - version_getMinor(msgVersion,&minor); - if(hdr->major==((unsigned char)major)){ /* Different major means incompatible */ - check = (hdr->minor>=((unsigned char)minor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */ - } - } - - return check; -} - -static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId){ - *msgTypeId = pubsubSerializer_hashCode(msgType); - return 0; -} - -static int pubsub_getMultipart(void *handle, unsigned int msgTypeId, bool retain, void **part){ - - if(handle==NULL){ - *part = NULL; - return -1; - } - - mp_handle_pt mp_handle = (mp_handle_pt)handle; - msg_map_entry_pt entry = hashMap_get(mp_handle->rcv_msg_map,&msgTypeId); - if(entry!=NULL){ - entry->retain = retain; - *part = entry->msgInst; - } - else{ - printf("TP: getMultipart cannot find msg '%u'\n",msgTypeId); - *part=NULL; - return -2; - } - - return 0; - -} - -static mp_handle_pt create_mp_handle(hash_map_pt svc_msg_db,array_list_pt rcv_msg_list){ - - if(arrayList_size(rcv_msg_list)==1){ //Means it's not a multipart message - return NULL; - } - - mp_handle_pt mp_handle = calloc(1,sizeof(struct mp_handle)); - mp_handle->svc_msg_db = svc_msg_db; - mp_handle->rcv_msg_map = hashMap_create(uintHash, NULL, uintEquals, NULL); - - int i=1; //We skip the first message, it will be handle differently - for(;iheader); - - pubsub_message_type *msgType = hashMap_get(svc_msg_db,&(header->type)); - if (msgType != NULL) { - void *msgInst = NULL; - version_pt msgVersion = pubsubSerializer_getVersion(msgType); - - bool validVersion = checkVersion(msgVersion,header); - - if(validVersion){ - celix_status_t status = pubsubSerializer_deserialize(msgType, (const void *) zframe_data(c_msg->payload), &msgInst); - - if(status == CELIX_SUCCESS){ - unsigned int* msgId = calloc(1,sizeof(unsigned int)); - *msgId = header->type; - msg_map_entry_pt entry = calloc(1,sizeof(struct msg_map_entry)); - entry->msgInst = msgInst; - hashMap_put(mp_handle->rcv_msg_map,msgId,entry); - } - } - } - - } - - return mp_handle; - -} - -static void destroy_mp_handle(mp_handle_pt mp_handle){ - - hash_map_iterator_pt iter = hashMapIterator_create(mp_handle->rcv_msg_map); - while(hashMapIterator_hasNext(iter)){ - hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); - unsigned int* msgId = (unsigned int*)hashMapEntry_getKey(entry); - msg_map_entry_pt msgEntry = hashMapEntry_getValue(entry); - pubsub_message_type* msgType = hashMap_get(mp_handle->svc_msg_db,msgId); - if(msgType!=NULL){ - if(!msgEntry->retain){ - free(msgEntry->msgInst); - } - } - else{ - printf("TS: ERROR: Cannot find pubsub_message_type for msg %u, so cannot destroy it!\n",*msgId); - } - } - hashMapIterator_destroy(iter); - - hashMap_destroy(mp_handle->rcv_msg_map,true,true); - free(mp_handle); -} http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/zmq_crypto.c ---------------------------------------------------------------------- diff --git a/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/zmq_crypto.c b/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/zmq_crypto.c deleted file mode 100644 index fe444bd..0000000 --- a/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/zmq_crypto.c +++ /dev/null @@ -1,281 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. - */ -/* - * zmq_crypto.c - * - * \date Dec 2, 2016 - * \author Apache Celix Project Team - * \copyright Apache License, Version 2.0 - */ - -#include "zmq_crypto.h" - -#include -#include -#include -#include - -#include - -#define MAX_FILE_PATH_LENGTH 512 -#define ZMQ_KEY_LENGTH 40 -#define AES_KEY_LENGTH 32 -#define AES_IV_LENGTH 16 - -#define KEY_TO_GET "aes_key" -#define IV_TO_GET "aes_iv" - -static char* read_file_content(char* filePath, char* fileName); -static void parse_key_lines(char *keysBuffer, char **key, char **iv); -static void parse_key_line(char *line, char **key, char **iv); -static void extract_keys_from_buffer(unsigned char *input, int inputlen, char **publicKey, char **secretKey); - -/** - * Return a valid zcert_t from an encoded file - * Caller is responsible for freeing by calling zcert_destroy(zcert** cert); - */ -zcert_t* get_zcert_from_encoded_file(char* keysFilePath, char* keysFileName, char* file_path) -{ - - if (keysFilePath == NULL){ - keysFilePath = DEFAULT_KEYS_FILE_PATH; - } - - if (keysFileName == NULL){ - keysFileName = DEFAULT_KEYS_FILE_NAME; - } - - char* keys_data = read_file_content(keysFilePath, keysFileName); - if (keys_data == NULL){ - return NULL; - } - - char *key = NULL; - char *iv = NULL; - parse_key_lines(keys_data, &key, &iv); - free(keys_data); - - if (key == NULL || iv == NULL){ - free(key); - free(iv); - - printf("CRYPTO: Loading AES key and/or AES iv failed!\n"); - return NULL; - } - - //At this point, we know an aes key and iv are stored and loaded - - // generate sha256 hashes - unsigned char key_digest[EVP_MAX_MD_SIZE]; - unsigned char iv_digest[EVP_MAX_MD_SIZE]; - generate_sha256_hash((char*) key, key_digest); - generate_sha256_hash((char*) iv, iv_digest); - - zchunk_t* encoded_secret = zchunk_slurp (file_path, 0); - if (encoded_secret == NULL){ - free(key); - free(iv); - - return NULL; - } - - int encoded_secret_size = (int) zchunk_size (encoded_secret); - char* encoded_secret_data = zchunk_strdup(encoded_secret); - zchunk_destroy (&encoded_secret); - - // Decryption of data - int decryptedtext_len; - unsigned char decryptedtext[encoded_secret_size]; - decryptedtext_len = decrypt((unsigned char *) encoded_secret_data, encoded_secret_size, key_digest, iv_digest, decryptedtext); - decryptedtext[decryptedtext_len] = '\0'; - - EVP_cleanup(); - - free(encoded_secret_data); - free(key); - free(iv); - - // The public and private keys are retrieved - char* public_text = NULL; - char* secret_text = NULL; - - extract_keys_from_buffer(decryptedtext, decryptedtext_len, &public_text, &secret_text); - - byte public_key [32] = { 0 }; - byte secret_key [32] = { 0 }; - - zmq_z85_decode (public_key, public_text); - zmq_z85_decode (secret_key, secret_text); - - zcert_t* cert_loaded = zcert_new_from(public_key, secret_key); - - free(public_text); - free(secret_text); - - return cert_loaded; -} - -int generate_sha256_hash(char* text, unsigned char* digest) -{ - unsigned int digest_len; - - EVP_MD_CTX * mdctx = EVP_MD_CTX_new(); - EVP_DigestInit_ex(mdctx, EVP_sha256(), NULL); - EVP_DigestUpdate(mdctx, text, strlen(text)); - EVP_DigestFinal_ex(mdctx, digest, &digest_len); - EVP_MD_CTX_free(mdctx); - - return digest_len; -} - -int decrypt(unsigned char *ciphertext, int ciphertext_len, unsigned char *key, unsigned char *iv, unsigned char *plaintext) -{ - int len; - int plaintext_len; - - EVP_CIPHER_CTX* ctx = EVP_CIPHER_CTX_new(); - - EVP_DecryptInit_ex(ctx, EVP_aes_256_cbc(), NULL, key, iv); - EVP_DecryptUpdate(ctx, plaintext, &len, ciphertext, ciphertext_len); - plaintext_len = len; - EVP_DecryptFinal_ex(ctx, plaintext + len, &len); - plaintext_len += len; - - EVP_CIPHER_CTX_free(ctx); - - return plaintext_len; -} - -/** - * Caller is responsible for freeing the returned value - */ -static char* read_file_content(char* filePath, char* fileName){ - - char fileNameWithPath[MAX_FILE_PATH_LENGTH]; - snprintf(fileNameWithPath, MAX_FILE_PATH_LENGTH, "%s/%s", filePath, fileName); - int rc = 0; - - if (!zsys_file_exists(fileNameWithPath)){ - printf("CRYPTO: Keys file '%s' doesn't exist!\n", fileNameWithPath); - return NULL; - } - - zfile_t* keys_file = zfile_new (filePath, fileName); - rc = zfile_input (keys_file); - if (rc != 0){ - zfile_destroy(&keys_file); - printf("CRYPTO: Keys file '%s' not readable!\n", fileNameWithPath); - return NULL; - } - - ssize_t keys_file_size = zsys_file_size (fileNameWithPath); - zchunk_t* keys_chunk = zfile_read (keys_file, keys_file_size, 0); - if (keys_chunk == NULL){ - zfile_close(keys_file); - zfile_destroy(&keys_file); - printf("CRYPTO: Can't read file '%s'!\n", fileNameWithPath); - return NULL; - } - - char* keys_data = zchunk_strdup(keys_chunk); - zchunk_destroy(&keys_chunk); - zfile_close(keys_file); - zfile_destroy (&keys_file); - - return keys_data; -} - -static void parse_key_lines(char *keysBuffer, char **key, char **iv){ - char *line = NULL, *saveLinePointer = NULL; - - bool firstTime = true; - do { - if (firstTime){ - line = strtok_r(keysBuffer, "\n", &saveLinePointer); - firstTime = false; - }else { - line = strtok_r(NULL, "\n", &saveLinePointer); - } - - if (line == NULL){ - break; - } - - parse_key_line(line, key, iv); - - } while((*key == NULL || *iv == NULL) && line != NULL); - -} - -static void parse_key_line(char *line, char **key, char **iv){ - char *detectedKey = NULL, *detectedValue= NULL; - - char* sep_at = strchr(line, ':'); - if (sep_at == NULL){ - return; - } - - *sep_at = '\0'; // overwrite first separator, creating two strings. - detectedKey = line; - detectedValue = sep_at + 1; - - if (detectedKey == NULL || detectedValue == NULL){ - return; - } - if (detectedKey[0] == '\0' || detectedValue[0] == '\0'){ - return; - } - - if (*key == NULL && strcmp(detectedKey, KEY_TO_GET) == 0){ - *key = strndup(detectedValue, AES_KEY_LENGTH); - } else if (*iv == NULL && strcmp(detectedKey, IV_TO_GET) == 0){ - *iv = strndup(detectedValue, AES_IV_LENGTH); - } -} - -static void extract_keys_from_buffer(unsigned char *input, int inputlen, char **publicKey, char **secretKey) { - // Load decrypted text buffer - zchunk_t* secret_decrypted = zchunk_new(input, inputlen); - if (secret_decrypted == NULL){ - printf("CRYPTO: Failed to create zchunk\n"); - return; - } - - zconfig_t* secret_config = zconfig_chunk_load (secret_decrypted); - zchunk_destroy (&secret_decrypted); - if (secret_config == NULL){ - printf("CRYPTO: Failed to create zconfig\n"); - return; - } - - // Extract public and secret key from text buffer - char* public_text = zconfig_get (secret_config, "/curve/public-key", NULL); - char* secret_text = zconfig_get (secret_config, "/curve/secret-key", NULL); - - if (public_text == NULL || secret_text == NULL){ - zconfig_destroy(&secret_config); - printf("CRYPTO: Loading public / secret key from text-buffer failed!\n"); - return; - } - - *publicKey = strndup(public_text, ZMQ_KEY_LENGTH + 1); - *secretKey = strndup(secret_text, ZMQ_KEY_LENGTH + 1); - - zconfig_destroy(&secret_config); -} http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/pubsub_common/public/include/dyn_msg_utils.h ---------------------------------------------------------------------- diff --git a/celix-pubsub/pubsub/pubsub_common/public/include/dyn_msg_utils.h b/celix-pubsub/pubsub/pubsub_common/public/include/dyn_msg_utils.h deleted file mode 100644 index 71085ab..0000000 --- a/celix-pubsub/pubsub/pubsub_common/public/include/dyn_msg_utils.h +++ /dev/null @@ -1,39 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. - */ -/* - * dyn_msg_utils.h - * - * \date Nov 11, 2015 - * \author Apache Celix Project Team - * \copyright Apache License, Version 2.0 - */ - -#ifndef DYN_MSG_UTILS_H_ -#define DYN_MSG_UTILS_H_ - -#include "bundle.h" -#include "hash_map.h" - -unsigned int uintHash(const void * uintNum); -int uintEquals(const void * uintNum, const void * toCompare); - -void fillMsgTypesMap(hash_map_pt msgTypesMap,bundle_pt bundle); -void emptyMsgTypesMap(hash_map_pt msgTypesMap); - -#endif /* DYN_MSG_UTILS_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/pubsub_common/public/include/publisher_endpoint_announce.h ---------------------------------------------------------------------- diff --git a/celix-pubsub/pubsub/pubsub_common/public/include/publisher_endpoint_announce.h b/celix-pubsub/pubsub/pubsub_common/public/include/publisher_endpoint_announce.h deleted file mode 100644 index bd39fc0..0000000 --- a/celix-pubsub/pubsub/pubsub_common/public/include/publisher_endpoint_announce.h +++ /dev/null @@ -1,36 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. - */ - -#ifndef PUBLISHER_ENDPOINT_ANNOUNCE_H_ -#define PUBLISHER_ENDPOINT_ANNOUNCE_H_ - -#include "pubsub_endpoint.h" - -struct publisher_endpoint_announce { - void *handle; - celix_status_t (*announcePublisher)(void *handle, pubsub_endpoint_pt pubEP); - celix_status_t (*removePublisher)(void *handle, pubsub_endpoint_pt pubEP); - celix_status_t (*interestedInTopic)(void* handle, const char *scope, const char *topic); - celix_status_t (*uninterestedInTopic)(void* handle, const char *scope, const char *topic); -}; - -typedef struct publisher_endpoint_announce *publisher_endpoint_announce_pt; - - -#endif /* PUBLISHER_ENDPOINT_ANNOUNCE_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_admin.h ---------------------------------------------------------------------- diff --git a/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_admin.h b/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_admin.h deleted file mode 100644 index fc1cfbb..0000000 --- a/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_admin.h +++ /dev/null @@ -1,61 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. - */ -/* - * pubsub_admin.h - * - * \date Sep 30, 2011 - * \author Apache Celix Project Team - * \copyright Apache License, Version 2.0 - */ - -#ifndef PUBSUB_ADMIN_H_ -#define PUBSUB_ADMIN_H_ - -#include "service_reference.h" - -#include "pubsub_common.h" -#include "pubsub_endpoint.h" - -#define PSA_IP "PSA_IP" -#define PSA_ITF "PSA_INTERFACE" -#define PSA_MULTICAST_IP_PREFIX "PSA_MC_PREFIX" - -#define PSA_DEFAULT "zmq" - -typedef struct pubsub_admin *pubsub_admin_pt; - -struct pubsub_admin_service { - pubsub_admin_pt admin; - - celix_status_t (*addSubscription)(pubsub_admin_pt admin,pubsub_endpoint_pt subEP); - celix_status_t (*removeSubscription)(pubsub_admin_pt admin,pubsub_endpoint_pt subEP); - - celix_status_t (*addPublication)(pubsub_admin_pt admin,pubsub_endpoint_pt subEP); - celix_status_t (*removePublication)(pubsub_admin_pt admin,pubsub_endpoint_pt subEP); - - celix_status_t (*closeAllPublications)(pubsub_admin_pt admin,char* scope, char* topic); - celix_status_t (*closeAllSubscriptions)(pubsub_admin_pt admin,char* scope, char* topic); - - celix_status_t (*matchPublisher)(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP, double* score); - celix_status_t (*matchSubscriber)(pubsub_admin_pt admin, pubsub_endpoint_pt subEP, double* score); -}; - -typedef struct pubsub_admin_service *pubsub_admin_service_pt; - -#endif /* PUBSUB_ADMIN_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_common.h ---------------------------------------------------------------------- diff --git a/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_common.h b/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_common.h deleted file mode 100644 index d9c6f1d..0000000 --- a/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_common.h +++ /dev/null @@ -1,51 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. - */ -/* - * pubsub_common.h - * - * \date Sep 17, 2015 - * \author Apache Celix Project Team - * \copyright Apache License, Version 2.0 - */ - -#ifndef PUBSUB_COMMON_H_ -#define PUBSUB_COMMON_H_ - -#define PUBSUB_ADMIN_SERVICE "pubsub_admin" -#define PUBSUB_DISCOVERY_SERVICE "pubsub_discovery" -#define PUBSUB_TM_ANNOUNCE_PUBLISHER_SERVICE "pubsub_tm_announce_publisher" - -#define PUBSUB_ANY_SUB_TOPIC "any" - -#define PUBSUB_BUNDLE_ID "bundle.id" - -#define MAX_SCOPE_LEN 1024 -#define MAX_TOPIC_LEN 1024 - -struct pubsub_msg_header{ - char topic[MAX_TOPIC_LEN]; - unsigned int type; - unsigned char major; - unsigned char minor; -}; - -typedef struct pubsub_msg_header* pubsub_msg_header_pt; - - -#endif /* PUBSUB_COMMON_H_ */