Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id EBD46200C22 for ; Mon, 6 Feb 2017 15:23:05 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id EA6B7160B53; Mon, 6 Feb 2017 14:23:05 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 6D113160B6B for ; Mon, 6 Feb 2017 15:23:03 +0100 (CET) Received: (qmail 39354 invoked by uid 500); 6 Feb 2017 14:23:02 -0000 Mailing-List: contact commits-help@celix.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@celix.apache.org Delivered-To: mailing list commits@celix.apache.org Received: (qmail 39195 invoked by uid 99); 6 Feb 2017 14:23:01 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 06 Feb 2017 14:23:01 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 11FA4DFF1F; Mon, 6 Feb 2017 14:23:01 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: pnoltes@apache.org To: commits@celix.apache.org Date: Mon, 06 Feb 2017 14:23:02 -0000 Message-Id: <18063a1064f448ae9c22544cd261895e@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [3/6] celix git commit: CELIX-389: Adds Celix Publish Subscribe donation. 
archived-at: Mon, 06 Feb 2017 14:23:06 -0000 http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c ---------------------------------------------------------------------- diff --git a/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c b/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c new file mode 100644 index 0000000..47bf094 --- /dev/null +++ b/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c @@ -0,0 +1,699 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. 
+ */ +/* + * pubsub_admin_impl.c + * + * \date Sep 30, 2011 + * \author Apache Celix Project Team + * \copyright Apache License, Version 2.0 + */ + +#include "pubsub_admin_impl.h" +#include + +#include +#include + +#include +#include +#include + +#ifndef ANDROID +#include +#endif + +#include +#include +#include +#include + +#include "constants.h" +#include "utils.h" +#include "hash_map.h" +#include "array_list.h" +#include "bundle_context.h" +#include "bundle.h" +#include "service_reference.h" +#include "service_registration.h" +#include "log_helper.h" +#include "log_service.h" +#include "celix_threads.h" +#include "service_factory.h" + +#include "topic_subscription.h" +#include "pubsub_publish_service_private.h" +#include "pubsub_endpoint.h" +#include "pubsub_utils.h" +#include "subscriber.h" + +#define MAX_KEY_FOLDER_PATH_LENGTH 512 + +static const char *DEFAULT_IP = "127.0.0.1"; + +static celix_status_t pubsubAdmin_getIpAdress(const char* interface, char** ip); +static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP); +static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP); + +celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *admin) { + celix_status_t status = CELIX_SUCCESS; + +#ifdef USE_ZMQ_SECURITY + if (!zsys_has_curve()){ + printf("PSA: zeromq curve unsupported\n"); + return CELIX_SERVICE_EXCEPTION; + } +#endif + + *admin = calloc(1, sizeof(**admin)); + + if (!*admin) { + status = CELIX_ENOMEM; + } + else{ + + const char *ip = NULL; + char *detectedIp = NULL; + (*admin)->bundle_context= context; + (*admin)->localPublications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); + (*admin)->subscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); + (*admin)->pendingSubscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); + (*admin)->externalPublications = 
hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); + + celixThreadMutex_create(&(*admin)->localPublicationsLock, NULL); + celixThreadMutex_create(&(*admin)->subscriptionsLock, NULL); + celixThreadMutex_create(&(*admin)->pendingSubscriptionsLock, NULL); + celixThreadMutex_create(&(*admin)->externalPublicationsLock, NULL); + + if (logHelper_create(context, &(*admin)->loghelper) == CELIX_SUCCESS) { + logHelper_start((*admin)->loghelper); + } + + bundleContext_getProperty(context,PSA_IP , &ip); + +#ifndef ANDROID + if (ip == NULL) { + const char *interface = NULL; + + bundleContext_getProperty(context, PSA_ITF, &interface); + if (pubsubAdmin_getIpAdress(interface, &detectedIp) != CELIX_SUCCESS) { + logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA: Could not retrieve IP adress for interface %s", interface); + } + + ip = detectedIp; + } +#endif + + if (ip != NULL) { + logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA: Using %s for service annunciation", ip); + (*admin)->ipAddress = strdup(ip); + } + else { + logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA: No IP address for service annunciation set. 
Using %s", DEFAULT_IP); + (*admin)->ipAddress = strdup(DEFAULT_IP); + } + + if (detectedIp != NULL) { + free(detectedIp); + } + + const char* basePortStr = NULL; + const char* maxPortStr = NULL; + char* endptrBase = NULL; + char* endptrMax = NULL; + bundleContext_getPropertyWithDefault(context, PSA_ZMQ_BASE_PORT, "PSA_ZMQ_DEFAULT_BASE_PORT", &basePortStr); + bundleContext_getPropertyWithDefault(context, PSA_ZMQ_MAX_PORT, "PSA_ZMQ_DEFAULT_MAX_PORT", &maxPortStr); + (*admin)->basePort = strtol(basePortStr, &endptrBase, 10); + (*admin)->maxPort = strtol(maxPortStr, &endptrMax, 10); + if (*endptrBase != '\0') { + (*admin)->basePort = PSA_ZMQ_DEFAULT_BASE_PORT; + } + if (*endptrMax != '\0') { + (*admin)->maxPort = PSA_ZMQ_DEFAULT_MAX_PORT; + } + + printf("PSA Using base port %u to max port %u\n", (*admin)->basePort, (*admin)->maxPort); + + // Disable Signal Handling by CZMQ + setenv("ZSYS_SIGHANDLER", "false", true); + + const char *nrZmqThreads = NULL; + bundleContext_getProperty(context, "PSA_NR_ZMQ_THREADS", &nrZmqThreads); + + if(nrZmqThreads != NULL) { + char *endPtr = NULL; + unsigned int nrThreads = strtoul(nrZmqThreads, &endPtr, 10); + if(endPtr != nrZmqThreads && nrThreads > 0 && nrThreads < 50) { + zsys_set_io_threads(nrThreads); + logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA: Using %d threads for ZMQ", nrThreads); + printf("PSA: Using %d threads for ZMQ\n", nrThreads); + } + } + +#ifdef USE_ZMQ_SECURITY + // Setup authenticator + zactor_t* auth = zactor_new (zauth, NULL); + zstr_sendx(auth, "VERBOSE", NULL); + + // Load all public keys of subscribers into the application + // This step is done for authenticating subscribers + char curve_folder_path[MAX_KEY_FOLDER_PATH_LENGTH]; + char* keys_bundle_dir = pubsub_getKeysBundleDir(context); + snprintf(curve_folder_path, MAX_KEY_FOLDER_PATH_LENGTH, "%s/META-INF/keys/subscriber/public", keys_bundle_dir); + zstr_sendx (auth, "CURVE", curve_folder_path, NULL); + free(keys_bundle_dir); + + 
(*admin)->zmq_auth = auth; +#endif + + } + + return status; +} + + +celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin) +{ + celix_status_t status = CELIX_SUCCESS; + + free(admin->ipAddress); + + celixThreadMutex_lock(&admin->pendingSubscriptionsLock); + hash_map_iterator_pt iter = hashMapIterator_create(admin->pendingSubscriptions); + while(hashMapIterator_hasNext(iter)){ + hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); + free((char*)hashMapEntry_getKey(entry)); + arrayList_destroy((array_list_pt)hashMapEntry_getValue(entry)); + } + hashMapIterator_destroy(iter); + hashMap_destroy(admin->pendingSubscriptions,false,false); + celixThreadMutex_unlock(&admin->pendingSubscriptionsLock); + + celixThreadMutex_lock(&admin->subscriptionsLock); + hashMap_destroy(admin->subscriptions,false,false); + celixThreadMutex_unlock(&admin->subscriptionsLock); + + celixThreadMutex_lock(&admin->localPublicationsLock); + hashMap_destroy(admin->localPublications,true,false); + celixThreadMutex_unlock(&admin->localPublicationsLock); + + celixThreadMutex_lock(&admin->externalPublicationsLock); + iter = hashMapIterator_create(admin->externalPublications); + while(hashMapIterator_hasNext(iter)){ + hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); + free((char*)hashMapEntry_getKey(entry)); + arrayList_destroy((array_list_pt)hashMapEntry_getValue(entry)); + } + hashMapIterator_destroy(iter); + hashMap_destroy(admin->externalPublications,false,false); + celixThreadMutex_unlock(&admin->externalPublicationsLock); + + celixThreadMutex_destroy(&admin->pendingSubscriptionsLock); + celixThreadMutex_destroy(&admin->subscriptionsLock); + celixThreadMutex_destroy(&admin->localPublicationsLock); + celixThreadMutex_destroy(&admin->externalPublicationsLock); + + logHelper_stop(admin->loghelper); + + logHelper_destroy(&admin->loghelper); + +#ifdef USE_ZMQ_SECURITY + if (admin->zmq_auth != NULL){ + zactor_destroy(&(admin->zmq_auth)); + } +#endif + + free(admin); + + return status; 
+} + +static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){ + celix_status_t status = CELIX_SUCCESS; + + celixThreadMutex_lock(&admin->subscriptionsLock); + + topic_subscription_pt any_sub = hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC); + + if(any_sub==NULL){ + + int i; + + status += pubsub_topicSubscriptionCreate(admin->bundle_context, PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, PUBSUB_ANY_SUB_TOPIC, &any_sub); + + if (status == CELIX_SUCCESS){ + + /* Connect all internal publishers */ + celixThreadMutex_lock(&admin->localPublicationsLock); + hash_map_iterator_pt lp_iter =hashMapIterator_create(admin->localPublications); + while(hashMapIterator_hasNext(lp_iter)){ + service_factory_pt factory = (service_factory_pt)hashMapIterator_nextValue(lp_iter); + topic_publication_pt topic_pubs = (topic_publication_pt)factory->handle; + array_list_pt topic_publishers = pubsub_topicPublicationGetPublisherList(topic_pubs); + + if(topic_publishers!=NULL){ + for(i=0;iendpoint !=NULL){ + status += pubsub_topicSubscriptionConnectPublisher(any_sub,pubEP->endpoint); + } + } + } + } + hashMapIterator_destroy(lp_iter); + celixThreadMutex_unlock(&admin->localPublicationsLock); + + /* Connect also all external publishers */ + celixThreadMutex_lock(&admin->externalPublicationsLock); + hash_map_iterator_pt extp_iter =hashMapIterator_create(admin->externalPublications); + while(hashMapIterator_hasNext(extp_iter)){ + array_list_pt ext_pub_list = (array_list_pt)hashMapIterator_nextValue(extp_iter); + if(ext_pub_list!=NULL){ + for(i=0;iendpoint !=NULL){ + status += pubsub_topicSubscriptionConnectPublisher(any_sub,pubEP->endpoint); + } + } + } + } + hashMapIterator_destroy(extp_iter); + celixThreadMutex_unlock(&admin->externalPublicationsLock); + + + pubsub_topicSubscriptionAddSubscriber(any_sub,subEP); + + status += pubsub_topicSubscriptionStart(any_sub); + + } + + if (status == CELIX_SUCCESS){ + 
hashMap_put(admin->subscriptions,strdup(PUBSUB_ANY_SUB_TOPIC),any_sub); + } + + } + + celixThreadMutex_unlock(&admin->subscriptionsLock); + + return status; +} + +celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){ + celix_status_t status = CELIX_SUCCESS; + + printf("PSA: Received subscription [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n",subEP->frameworkUUID,subEP->serviceID,subEP->scope, subEP->topic); + + if(strcmp(subEP->topic,PUBSUB_ANY_SUB_TOPIC)==0){ + return pubsubAdmin_addAnySubscription(admin,subEP); + } + + /* Check if we already know some publisher about this topic, otherwise let's put the subscription in the pending hashmap */ + celixThreadMutex_lock(&admin->localPublicationsLock); + celixThreadMutex_lock(&admin->externalPublicationsLock); + + char* scope_topic = createScopeTopicKey(subEP->scope, subEP->topic); + + service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic); + array_list_pt ext_pub_list = (array_list_pt)hashMap_get(admin->externalPublications,scope_topic); + + if(factory==NULL && ext_pub_list==NULL){ //No (local or external) publishers yet for this topic + celixThreadMutex_lock(&admin->pendingSubscriptionsLock); + pubsubAdmin_addSubscriptionToPendingList(admin,subEP); + celixThreadMutex_unlock(&admin->pendingSubscriptionsLock); + } else { + int i; + topic_subscription_pt subscription = hashMap_get(admin->subscriptions, scope_topic); + + if(subscription == NULL) { + status += pubsub_topicSubscriptionCreate(admin->bundle_context,subEP->scope, subEP->topic,&subscription); + + if (status==CELIX_SUCCESS){ + + /* Try to connect internal publishers */ + if(factory!=NULL){ + topic_publication_pt topic_pubs = (topic_publication_pt)factory->handle; + array_list_pt topic_publishers = pubsub_topicPublicationGetPublisherList(topic_pubs); + + if(topic_publishers!=NULL){ + for(i=0;iendpoint !=NULL){ + status += 
pubsub_topicSubscriptionConnectPublisher(subscription,pubEP->endpoint); + } + } + } + + } + + /* Look also for external publishers */ + if(ext_pub_list!=NULL){ + for(i=0;iendpoint !=NULL){ + status += pubsub_topicSubscriptionConnectPublisher(subscription,pubEP->endpoint); + } + } + } + + pubsub_topicSubscriptionAddSubscriber(subscription,subEP); + + status += pubsub_topicSubscriptionStart(subscription); + + } + + if(status==CELIX_SUCCESS){ + celixThreadMutex_lock(&admin->subscriptionsLock); + hashMap_put(admin->subscriptions,strdup(scope_topic),subscription); + celixThreadMutex_unlock(&admin->subscriptionsLock); + } + } + + if (status == CELIX_SUCCESS){ + pubsub_topicIncreaseNrSubscribers(subscription); + } + } + + free(scope_topic); + celixThreadMutex_unlock(&admin->externalPublicationsLock); + celixThreadMutex_unlock(&admin->localPublicationsLock); + + return status; + +} + +celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){ + celix_status_t status = CELIX_SUCCESS; + + printf("PSA: Removing subscription [FWUUID=%s bundleID=%ld topic=%s]\n",subEP->frameworkUUID,subEP->serviceID,subEP->topic); + + celixThreadMutex_lock(&admin->subscriptionsLock); + + topic_subscription_pt sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,subEP->topic); + + if(sub!=NULL){ + pubsub_topicDecreaseNrSubscribers(sub); + if(pubsub_topicGetNrSubscribers(sub) == 0) { + status = pubsub_topicSubscriptionRemoveSubscriber(sub,subEP); + } + } + else{ + status = CELIX_ILLEGAL_STATE; + } + + celixThreadMutex_unlock(&admin->subscriptionsLock); + + return status; + +} + +celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP) { + celix_status_t status = CELIX_SUCCESS; + + printf("PSA: Received publication [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n", pubEP->frameworkUUID, pubEP->serviceID, pubEP->scope, pubEP->topic); + + const char* fwUUID = NULL; + + char *scope_topic = createScopeTopicKey(pubEP->scope, 
pubEP->topic); + bundleContext_getProperty(admin->bundle_context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID); + if (fwUUID == NULL) { + printf("PSA: Cannot retrieve fwUUID.\n"); + return CELIX_INVALID_BUNDLE_CONTEXT; + } + + if ((strcmp(pubEP->frameworkUUID, fwUUID) == 0) && (pubEP->endpoint == NULL)) { + + celixThreadMutex_lock(&admin->localPublicationsLock); + + service_factory_pt factory = (service_factory_pt) hashMap_get(admin->localPublications, scope_topic); + + if (factory == NULL) { + topic_publication_pt pub = NULL; + status = pubsub_topicPublicationCreate(admin->bundle_context, pubEP, admin->ipAddress, admin->basePort, admin->maxPort, &pub); + if (status == CELIX_SUCCESS) { + status = pubsub_topicPublicationStart(admin->bundle_context, pub, &factory); + if (status == CELIX_SUCCESS && factory != NULL) { + hashMap_put(admin->localPublications, strdup(scope_topic), factory); + } + } else { + printf("PSA: Cannot create a topicPublication for scope=%s, topic=%s (bundle %ld).\n", pubEP->scope, pubEP->topic, pubEP->serviceID); + } + } else { + //just add the new EP to the list + topic_publication_pt pub = (topic_publication_pt) factory->handle; + pubsub_topicPublicationAddPublisherEP(pub, pubEP); + } + + celixThreadMutex_unlock(&admin->localPublicationsLock); + } else { + + celixThreadMutex_lock(&admin->externalPublicationsLock); + array_list_pt ext_pub_list = (array_list_pt) hashMap_get(admin->externalPublications, scope_topic); + if (ext_pub_list == NULL) { + arrayList_create(&ext_pub_list); + hashMap_put(admin->externalPublications, strdup(scope_topic), ext_pub_list); + } + + arrayList_add(ext_pub_list, pubEP); + + celixThreadMutex_unlock(&admin->externalPublicationsLock); + } + + /* Re-evaluate the pending subscriptions */ + celixThreadMutex_lock(&admin->pendingSubscriptionsLock); + + hash_map_entry_pt pendingSub = hashMap_getEntry(admin->pendingSubscriptions, scope_topic); + if (pendingSub != NULL) { //There were pending subscription for the just published 
topic. Let's connect them. + char* topic = (char*) hashMapEntry_getKey(pendingSub); + array_list_pt pendingSubList = (array_list_pt) hashMapEntry_getValue(pendingSub); + int i; + for (i = 0; i < arrayList_size(pendingSubList); i++) { + pubsub_endpoint_pt subEP = (pubsub_endpoint_pt) arrayList_get(pendingSubList, i); + pubsubAdmin_addSubscription(admin, subEP); + } + hashMap_remove(admin->pendingSubscriptions, scope_topic); + arrayList_clear(pendingSubList); + arrayList_destroy(pendingSubList); + free(topic); + } + + celixThreadMutex_unlock(&admin->pendingSubscriptionsLock); + + /* Connect the new publisher to the subscription for his topic, if there is any */ + celixThreadMutex_lock(&admin->subscriptionsLock); + + topic_subscription_pt sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, scope_topic); + if (sub != NULL && pubEP->endpoint != NULL) { + pubsub_topicSubscriptionAddConnectPublisherToPendingList(sub, pubEP->endpoint); + } + + /* And check also for ANY subscription */ + topic_subscription_pt any_sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, PUBSUB_ANY_SUB_TOPIC); + if (any_sub != NULL && pubEP->endpoint != NULL) { + pubsub_topicSubscriptionAddConnectPublisherToPendingList(sub, pubEP->endpoint); + } + free(scope_topic); + + celixThreadMutex_unlock(&admin->subscriptionsLock); + + return status; + +} + +celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP){ + celix_status_t status = CELIX_SUCCESS; + int count = 0; + + printf("PSA: Removing publication [FWUUID=%s bundleID=%ld topic=%s]\n",pubEP->frameworkUUID,pubEP->serviceID,pubEP->topic); + + const char* fwUUID = NULL; + + bundleContext_getProperty(admin->bundle_context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID); + if(fwUUID==NULL){ + printf("PSA: Cannot retrieve fwUUID.\n"); + return CELIX_INVALID_BUNDLE_CONTEXT; + } + char *scope_topic = createScopeTopicKey(pubEP->scope, pubEP->topic); + if(strcmp(pubEP->frameworkUUID,fwUUID)==0){ + + 
celixThreadMutex_lock(&admin->localPublicationsLock); + + service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic); + if(factory!=NULL){ + topic_publication_pt pub = (topic_publication_pt)factory->handle; + pubsub_topicPublicationRemovePublisherEP(pub,pubEP); + } + else{ + status = CELIX_ILLEGAL_STATE; + } + + celixThreadMutex_unlock(&admin->localPublicationsLock); + } + else{ + + celixThreadMutex_lock(&admin->externalPublicationsLock); + array_list_pt ext_pub_list = (array_list_pt)hashMap_get(admin->externalPublications,scope_topic); + if(ext_pub_list!=NULL){ + int i; + bool found = false; + for(i=0;!found && iendpoint,p->endpoint) == 0) { + count++; + } + } + + } + if(arrayList_size(ext_pub_list)==0){ + hash_map_entry_pt entry = hashMap_getEntry(admin->externalPublications,scope_topic); + char* topic = (char*)hashMapEntry_getKey(entry); + array_list_pt list = (array_list_pt)hashMapEntry_getValue(entry); + hashMap_remove(admin->externalPublications,topic); + arrayList_destroy(list); + free(topic); + } + + celixThreadMutex_unlock(&admin->externalPublicationsLock); + } + + /* Check if this publisher was connected to one of our subscribers*/ + celixThreadMutex_lock(&admin->subscriptionsLock); + + topic_subscription_pt sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic); + if(sub!=NULL && pubEP->endpoint!=NULL && count == 0){ + pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub,pubEP->endpoint); + } + + /* And check also for ANY subscription */ + topic_subscription_pt any_sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC); + if(any_sub!=NULL && pubEP->endpoint!=NULL && count == 0){ + pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub,pubEP->endpoint); + } + free(scope_topic); + celixThreadMutex_unlock(&admin->subscriptionsLock); + + return status; + +} + +celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin, char *scope, char* 
topic){ + celix_status_t status = CELIX_SUCCESS; + + printf("PSA: Closing all publications\n"); + + celixThreadMutex_lock(&admin->localPublicationsLock); + char *scope_topic = createScopeTopicKey(scope, topic); + hash_map_entry_pt pubsvc_entry = (hash_map_entry_pt)hashMap_getEntry(admin->localPublications,scope_topic); + if(pubsvc_entry!=NULL){ + char* topic = (char*)hashMapEntry_getKey(pubsvc_entry); + service_factory_pt factory= (service_factory_pt)hashMapEntry_getValue(pubsvc_entry); + topic_publication_pt pub = (topic_publication_pt)factory->handle; + status += pubsub_topicPublicationStop(pub); + status += pubsub_topicPublicationDestroy(pub); + + hashMap_remove(admin->localPublications,scope_topic); + free(topic); + free(factory); + } + free(scope_topic); + celixThreadMutex_unlock(&admin->localPublicationsLock); + + return status; + +} + +celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* scope,char* topic){ + celix_status_t status = CELIX_SUCCESS; + + printf("PSA: Closing all subscriptions\n"); + + celixThreadMutex_lock(&admin->subscriptionsLock); + char *scope_topic = createScopeTopicKey(scope, topic); + hash_map_entry_pt sub_entry = (hash_map_entry_pt)hashMap_getEntry(admin->subscriptions,scope_topic); + if(sub_entry!=NULL){ + char* topic = (char*)hashMapEntry_getKey(sub_entry); + + topic_subscription_pt ts = (topic_subscription_pt)hashMapEntry_getValue(sub_entry); + status += pubsub_topicSubscriptionStop(ts); + status += pubsub_topicSubscriptionDestroy(ts); + + hashMap_remove(admin->subscriptions,scope_topic); + free(topic); + + } + free(scope_topic); + celixThreadMutex_unlock(&admin->subscriptionsLock); + + return status; + +} + + +#ifndef ANDROID +static celix_status_t pubsubAdmin_getIpAdress(const char* interface, char** ip) { + celix_status_t status = CELIX_BUNDLE_EXCEPTION; + + struct ifaddrs *ifaddr, *ifa; + char host[NI_MAXHOST]; + + if (getifaddrs(&ifaddr) != -1) + { + for (ifa = ifaddr; ifa != NULL && status != 
CELIX_SUCCESS; ifa = ifa->ifa_next) + { + if (ifa->ifa_addr == NULL) + continue; + + if ((getnameinfo(ifa->ifa_addr,sizeof(struct sockaddr_in), host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST) == 0) && (ifa->ifa_addr->sa_family == AF_INET)) { + if (interface == NULL) { + *ip = strdup(host); + status = CELIX_SUCCESS; + } + else if (strcmp(ifa->ifa_name, interface) == 0) { + *ip = strdup(host); + status = CELIX_SUCCESS; + } + } + } + + freeifaddrs(ifaddr); + } + + return status; +} +#endif + +static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){ + celix_status_t status = CELIX_SUCCESS; + char* scope_topic = createScopeTopicKey(subEP->scope, subEP->topic); + array_list_pt pendingListPerTopic = hashMap_get(admin->pendingSubscriptions,scope_topic); + if(pendingListPerTopic==NULL){ + arrayList_create(&pendingListPerTopic); + hashMap_put(admin->pendingSubscriptions,strdup(scope_topic),pendingListPerTopic); + } + arrayList_add(pendingListPerTopic,subEP); + free(scope_topic); + return status; +} http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_publication.c ---------------------------------------------------------------------- diff --git a/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_publication.c b/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_publication.c new file mode 100644 index 0000000..4943884 --- /dev/null +++ b/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_publication.c @@ -0,0 +1,605 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ +/* + * topic_publication.c + * + * \date Sep 24, 2015 + * \author Apache Celix Project Team + * \copyright Apache License, Version 2.0 + */ + +#include "pubsub_publish_service_private.h" +#include +/* The following undefs prevent the collision between: + * - sys/syslog.h (which is included within czmq) + * - celix/dfi/dfi_log_util.h + */ +#undef LOG_DEBUG +#undef LOG_WARNING +#undef LOG_INFO +#undef LOG_WARNING + +#include +#include +#include + +#include "array_list.h" +#include "celixbool.h" +#include "service_registration.h" +#include "utils.h" +#include "service_factory.h" +#include "version.h" + +#include "pubsub_common.h" +#include "dyn_msg_utils.h" +#include "pubsub_utils.h" +#include "publisher.h" + +#include "pubsub_serializer.h" + +#ifdef USE_ZMQ_SECURITY + #include "zmq_crypto.h" + + #define MAX_CERT_PATH_LENGTH 512 +#endif + +#define EP_ADDRESS_LEN 32 +#define ZMQ_BIND_MAX_RETRY 5 + +#define FIRST_SEND_DELAY 2 + +struct topic_publication { + zsock_t* zmq_socket; + zcert_t * zmq_cert; + char* endpoint; + service_registration_pt svcFactoryReg; + array_list_pt pub_ep_list; //List + hash_map_pt boundServices; // + celix_thread_mutex_t tp_lock; +}; + +typedef struct publish_bundle_bound_service { + topic_publication_pt parent; + pubsub_publisher_pt service; + bundle_pt bundle; + char *topic; + hash_map_pt msgTypes; + unsigned short getCount; + celix_thread_mutex_t mp_lock; + bool mp_send_in_progress; + array_list_pt mp_parts; +}* publish_bundle_bound_service_pt; + +typedef struct pubsub_msg{ + pubsub_msg_header_pt header; + char* payload; + 
int payloadSize; +}* pubsub_msg_pt; + +static unsigned int rand_range(unsigned int min, unsigned int max); + +static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service); +static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service); + +static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle); +static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc); + +static int pubsub_topicPublicationSend(void* handle,unsigned int msgTypeId, void *msg); +static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTypeId, void *msg, int flags); +static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* msgTypeId); + +static void delay_first_send_for_late_joiners(void); + +celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt pubEP,char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out){ + celix_status_t status = CELIX_SUCCESS; + +#ifdef USE_ZMQ_SECURITY + char* keys_bundle_dir = pubsub_getKeysBundleDir(bundle_context); + if (keys_bundle_dir == NULL){ + return CELIX_SERVICE_EXCEPTION; + } + + const char* keys_file_path = NULL; + const char* keys_file_name = NULL; + bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_PATH, &keys_file_path); + bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_NAME, &keys_file_name); + + char cert_path[MAX_CERT_PATH_LENGTH]; + + //certificate path ".cache/bundle{id}/version0.0/./META-INF/keys/publisher/private/pub_{topic}.key" + snprintf(cert_path, MAX_CERT_PATH_LENGTH, "%s/META-INF/keys/publisher/private/pub_%s.key.enc", keys_bundle_dir, pubEP->topic); + free(keys_bundle_dir); + printf("PSA: Loading key '%s'\n", cert_path); + + zcert_t* pub_cert = 
get_zcert_from_encoded_file((char *) keys_file_path, (char *) keys_file_name, cert_path); + if (pub_cert == NULL){ + printf("PSA: Cannot load key '%s'\n", cert_path); + return CELIX_SERVICE_EXCEPTION; + } +#endif + + zsock_t* socket = zsock_new (ZMQ_PUB); + if(socket==NULL){ + #ifdef USE_ZMQ_SECURITY + zcert_destroy(&pub_cert); + #endif + + perror("Error for zmq_socket"); + return CELIX_SERVICE_EXCEPTION; + } +#ifdef USE_ZMQ_SECURITY + zcert_apply (pub_cert, socket); // apply certificate to socket + zsock_set_curve_server (socket, true); // setup the publisher's socket to use the curve functions +#endif + + int rv = -1, retry=0; + char* ep = malloc(EP_ADDRESS_LEN); + char bindAddress[EP_ADDRESS_LEN]; + + while(rv==-1 && retrypub_ep_list)); + pub->boundServices = hashMap_create(NULL,NULL,NULL,NULL); + celixThreadMutex_create(&(pub->tp_lock),NULL); + + pub->endpoint = ep; + pub->zmq_socket = socket; + + #ifdef USE_ZMQ_SECURITY + pub->zmq_cert = pub_cert; + #endif + + pubsub_topicPublicationAddPublisherEP(pub,pubEP); + + *out = pub; + + return status; +} + +celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub){ + celix_status_t status = CELIX_SUCCESS; + + celixThreadMutex_lock(&(pub->tp_lock)); + + free(pub->endpoint); + arrayList_destroy(pub->pub_ep_list); + + hash_map_iterator_pt iter = hashMapIterator_create(pub->boundServices); + while(hashMapIterator_hasNext(iter)){ + publish_bundle_bound_service_pt bound = hashMapIterator_nextValue(iter); + pubsub_destroyPublishBundleBoundService(bound); + } + hashMapIterator_destroy(iter); + hashMap_destroy(pub->boundServices,false,false); + + pub->svcFactoryReg = NULL; + zsock_destroy(&(pub->zmq_socket)); + #ifdef USE_ZMQ_SECURITY + zcert_destroy(&(pub->zmq_cert)); + #endif + + celixThreadMutex_unlock(&(pub->tp_lock)); + + celixThreadMutex_destroy(&(pub->tp_lock)); + + free(pub); + + return status; +} + +celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt 
pub,service_factory_pt* svcFactory){ + celix_status_t status = CELIX_SUCCESS; + + /* Let's register the new service */ + //celixThreadMutex_lock(&(pub->tp_lock)); + + pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pub->pub_ep_list,0); + + if(pubEP!=NULL){ + service_factory_pt factory = calloc(1, sizeof(*factory)); + factory->handle = pub; + factory->getService = pubsub_topicPublicationGetService; + factory->ungetService = pubsub_topicPublicationUngetService; + + properties_pt props = properties_create(); + properties_set(props,PUBSUB_PUBLISHER_TOPIC,pubEP->topic); + properties_set(props,PUBSUB_PUBLISHER_SCOPE,pubEP->scope); + properties_set(props,"service.version", PUBSUB_PUBLISHER_SERVICE_VERSION); + + status = bundleContext_registerServiceFactory(bundle_context,PUBSUB_PUBLISHER_SERVICE_NAME,factory,props,&(pub->svcFactoryReg)); + + if(status != CELIX_SUCCESS){ + properties_destroy(props); + printf("PSA: Cannot register ServiceFactory for topic %s (bundle %ld).\n",pubEP->topic,pubEP->serviceID); + } + else{ + *svcFactory = factory; + } + } + else{ + printf("PSA: Cannot find pubsub_endpoint after adding it...Should never happen!\n"); + status = CELIX_SERVICE_EXCEPTION; + } + + //celixThreadMutex_unlock(&(pub->tp_lock)); + + return status; +} + +celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub){ + celix_status_t status = CELIX_SUCCESS; + + //celixThreadMutex_lock(&(pub->tp_lock)); + + status = serviceRegistration_unregister(pub->svcFactoryReg); + + //celixThreadMutex_unlock(&(pub->tp_lock)); + + return status; +} + +celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep){ + + celixThreadMutex_lock(&(pub->tp_lock)); + ep->endpoint = strdup(pub->endpoint); + arrayList_add(pub->pub_ep_list,ep); + celixThreadMutex_unlock(&(pub->tp_lock)); + + return CELIX_SUCCESS; +} + +celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep){ + + 
celixThreadMutex_lock(&(pub->tp_lock)); + for (int i = 0; i < arrayList_size(pub->pub_ep_list); i++) { + pubsub_endpoint_pt e = arrayList_get(pub->pub_ep_list, i); + if(pubsubEndpoint_equals(ep, e)) { + arrayList_removeElement(pub->pub_ep_list,ep); + break; + } + } + celixThreadMutex_unlock(&(pub->tp_lock)); + + return CELIX_SUCCESS; +} + +array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt pub){ + return pub->pub_ep_list; +} + + +static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service) { + celix_status_t status = CELIX_SUCCESS; + + topic_publication_pt publish = (topic_publication_pt)handle; + + celixThreadMutex_lock(&(publish->tp_lock)); + + publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle); + if(bound==NULL){ + bound = pubsub_createPublishBundleBoundService(publish,bundle); + if(bound!=NULL){ + hashMap_put(publish->boundServices,bundle,bound); + } + } + else{ + bound->getCount++; + } + + *service = bound->service; + + celixThreadMutex_unlock(&(publish->tp_lock)); + + return status; +} + +static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service) { + + topic_publication_pt publish = (topic_publication_pt)handle; + + celixThreadMutex_lock(&(publish->tp_lock)); + + publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle); + if(bound!=NULL){ + + bound->getCount--; + if(bound->getCount==0){ + pubsub_destroyPublishBundleBoundService(bound); + hashMap_remove(publish->boundServices,bundle); + } + + } + else{ + long bundleId = -1; + bundle_getBundleId(bundle,&bundleId); + printf("TP: Unexpected ungetService call for bundle %ld.\n", bundleId); + } + + /* service should be never used for unget, so let's set the pointer to NULL */ + *service = NULL; + + 
celixThreadMutex_unlock(&(publish->tp_lock)); + + return CELIX_SUCCESS; +} + +static bool send_pubsub_msg(zsock_t* zmq_socket, pubsub_msg_pt msg, bool last){ + + bool ret = true; + + zframe_t* headerMsg = zframe_new(msg->header, sizeof(struct pubsub_msg_header)); + if (headerMsg == NULL) ret=false; + zframe_t* payloadMsg = zframe_new(msg->payload, msg->payloadSize); + if (payloadMsg == NULL) ret=false; + + delay_first_send_for_late_joiners(); + + if( zframe_send(&headerMsg,zmq_socket, ZFRAME_MORE) == -1) ret=false; + + if(!last){ + if( zframe_send(&payloadMsg,zmq_socket, ZFRAME_MORE) == -1) ret=false; + } + else{ + if( zframe_send(&payloadMsg,zmq_socket, 0) == -1) ret=false; + } + + if (!ret){ + zframe_destroy(&headerMsg); + zframe_destroy(&payloadMsg); + } + + free(msg->header); + free(msg->payload); + free(msg); + + return ret; + +} + +static bool send_pubsub_mp_msg(zsock_t* zmq_socket, array_list_pt mp_msg_parts){ + + bool ret = true; + + unsigned int i = 0; + unsigned int mp_num = arrayList_size(mp_msg_parts); + for(;imp_lock)); + if( (flags & PUBSUB_PUBLISHER_FIRST_MSG) && !(flags & PUBSUB_PUBLISHER_LAST_MSG) && bound->mp_send_in_progress){ //means a real mp_msg + printf("TP: Multipart send already in progress. 
Cannot process a new one.\n"); + celixThreadMutex_unlock(&(bound->mp_lock)); + return -3; + } + + pubsub_message_type *msgType = hashMap_get(bound->msgTypes, &msgTypeId); + + int major=0, minor=0; + + if (msgType != NULL) { + + version_pt msgVersion = pubsubSerializer_getVersion(msgType); + + pubsub_msg_header_pt msg_hdr = calloc(1,sizeof(struct pubsub_msg_header)); + + strncpy(msg_hdr->topic,bound->topic,MAX_TOPIC_LEN-1); + + msg_hdr->type = msgTypeId; + if (msgVersion != NULL){ + version_getMajor(msgVersion, &major); + version_getMinor(msgVersion, &minor); + msg_hdr->major = major; + msg_hdr->minor = minor; + } + + void* serializedOutput = NULL; + int serializedOutputLen = 0; + pubsubSerializer_serialize(msgType, msg, &serializedOutput, &serializedOutputLen); + + pubsub_msg_pt msg = calloc(1,sizeof(struct pubsub_msg)); + msg->header = msg_hdr; + msg->payload = (char *) serializedOutput; + msg->payloadSize = serializedOutputLen; + + bool snd = true; + + switch(flags){ + case PUBSUB_PUBLISHER_FIRST_MSG: + bound->mp_send_in_progress = true; + arrayList_add(bound->mp_parts,msg); + break; + case PUBSUB_PUBLISHER_PART_MSG: + if(!bound->mp_send_in_progress){ + printf("TP: ERROR: received msg part without the first part.\n"); + status = -4; + } + else{ + arrayList_add(bound->mp_parts,msg); + } + break; + case PUBSUB_PUBLISHER_LAST_MSG: + if(!bound->mp_send_in_progress){ + printf("TP: ERROR: received end msg without the first part.\n"); + status = -4; + } + else{ + arrayList_add(bound->mp_parts,msg); + celixThreadMutex_lock(&(bound->parent->tp_lock)); + snd = send_pubsub_mp_msg(bound->parent->zmq_socket,bound->mp_parts); + bound->mp_send_in_progress = false; + celixThreadMutex_unlock(&(bound->parent->tp_lock)); + } + break; + case PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG: //Normal send case + celixThreadMutex_lock(&(bound->parent->tp_lock)); + snd = send_pubsub_msg(bound->parent->zmq_socket,msg,true); + celixThreadMutex_unlock(&(bound->parent->tp_lock)); + 
break; + default: + printf("TP: ERROR: Invalid MP flags combination\n"); + status = -4; + break; + } + + if(!snd){ + printf("TP: Failed to send %s message %u.\n",flags == (PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG) ? "single" : "multipart", msgTypeId); + } + + } else { + printf("TP: Message %u not supported.",msgTypeId); + status=-1; + } + + celixThreadMutex_unlock(&(bound->mp_lock)); + + return status; + +} + +static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* msgTypeId){ + *msgTypeId = pubsubSerializer_hashCode(msgType); + return 0; +} + +static unsigned int rand_range(unsigned int min, unsigned int max){ + + double scaled = (double)(((double)rand())/((double)RAND_MAX)); + return (max-min+1)*scaled + min; + +} + +static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle){ + + publish_bundle_bound_service_pt bound = calloc(1, sizeof(*bound)); + + if (bound != NULL) { + bound->service = calloc(1, sizeof(*bound->service)); + } + + if (bound != NULL && bound->service != NULL) { + + bound->parent = tp; + bound->bundle = bundle; + bound->getCount = 1; + bound->mp_send_in_progress = false; + celixThreadMutex_create(&bound->mp_lock,NULL); + bound->msgTypes = hashMap_create(uintHash, NULL, uintEquals, NULL); // + arrayList_create(&bound->mp_parts); + + pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(bound->parent->pub_ep_list,0); + bound->topic=strdup(pubEP->topic); + + bound->service->handle = bound; + bound->service->localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForUUID; + bound->service->send = pubsub_topicPublicationSend; + bound->service->sendMultipart = pubsub_topicPublicationSendMultipart; + + pubsubSerializer_fillMsgTypesMap(bound->msgTypes,bound->bundle); + + } + else + { + if (bound != NULL) { + free(bound->service); + } + free(bound); + return NULL; + } + + return bound; +} + +static void 
pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc){ + + celixThreadMutex_lock(&boundSvc->mp_lock); + + if(boundSvc->service != NULL){ + free(boundSvc->service); + } + + if(boundSvc->msgTypes != NULL){ + pubsubSerializer_emptyMsgTypesMap(boundSvc->msgTypes); + hashMap_destroy(boundSvc->msgTypes,false,false); + } + + if(boundSvc->mp_parts!=NULL){ + arrayList_destroy(boundSvc->mp_parts); + } + + if(boundSvc->topic!=NULL){ + free(boundSvc->topic); + } + + celixThreadMutex_unlock(&boundSvc->mp_lock); + celixThreadMutex_destroy(&boundSvc->mp_lock); + + free(boundSvc); + +} + +static void delay_first_send_for_late_joiners(){ + + static bool firstSend = true; + + if(firstSend){ + printf("TP: Delaying first send for late joiners...\n"); + sleep(FIRST_SEND_DELAY); + firstSend = false; + } +} http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c ---------------------------------------------------------------------- diff --git a/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c b/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c new file mode 100644 index 0000000..9e1a47d --- /dev/null +++ b/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c @@ -0,0 +1,741 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ +/* + * topic_subscription.c + * + * \date Oct 2, 2015 + * \author Apache Celix Project Team + * \copyright Apache License, Version 2.0 + */ + +#include "topic_subscription.h" +#include +/* The following undefs prevent the collision between: + * - sys/syslog.h (which is included within czmq) + * - celix/dfi/dfi_log_util.h + */ +#undef LOG_DEBUG +#undef LOG_WARNING +#undef LOG_INFO +#undef LOG_WARNING + +#include +#include +#include + +#include "utils.h" +#include "celix_errno.h" +#include "constants.h" +#include "version.h" + +#include "subscriber.h" +#include "publisher.h" +#include "dyn_msg_utils.h" +#include "pubsub_utils.h" + +#include "pubsub_serializer.h" + +#ifdef USE_ZMQ_SECURITY + #include "zmq_crypto.h" + + #define MAX_CERT_PATH_LENGTH 512 +#endif + +#define POLL_TIMEOUT 250 +#define ZMQ_POLL_TIMEOUT_MS_ENV "ZMQ_POLL_TIMEOUT_MS" + +struct topic_subscription{ + + zsock_t* zmq_socket; + zcert_t * zmq_cert; + zcert_t * zmq_pub_cert; + pthread_mutex_t socket_lock; + service_tracker_pt tracker; + array_list_pt sub_ep_list; + celix_thread_t recv_thread; + bool running; + celix_thread_mutex_t ts_lock; + bundle_context_pt context; + + hash_map_pt servicesMap; // key = service, value = msg types map + array_list_pt pendingConnections; + array_list_pt pendingDisconnections; + + celix_thread_mutex_t pendingConnections_lock; + celix_thread_mutex_t pendingDisconnections_lock; + unsigned int nrSubscribers; +}; + +typedef struct complete_zmq_msg{ + zframe_t* header; + zframe_t* payload; +}* complete_zmq_msg_pt; + +typedef struct mp_handle{ + hash_map_pt 
svc_msg_db; + hash_map_pt rcv_msg_map; +}* mp_handle_pt; + +typedef struct msg_map_entry{ + bool retain; + void* msgInst; +}* msg_map_entry_pt; + +static celix_status_t topicsub_subscriberTracked(void * handle, service_reference_pt reference, void * service); +static celix_status_t topicsub_subscriberUntracked(void * handle, service_reference_pt reference, void * service); +static void* zmq_recv_thread_func(void* arg); +static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr); +static void sigusr1_sighandler(int signo); +static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId); +static int pubsub_getMultipart(void *handle, unsigned int msgTypeId, bool retain, void **part); +static mp_handle_pt create_mp_handle(hash_map_pt svc_msg_db,array_list_pt rcv_msg_list); +static void destroy_mp_handle(mp_handle_pt mp_handle); +static void connectPendingPublishers(topic_subscription_pt sub); +static void disconnectPendingPublishers(topic_subscription_pt sub); + +celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, char* scope, char* topic,topic_subscription_pt* out){ + celix_status_t status = CELIX_SUCCESS; + +#ifdef USE_ZMQ_SECURITY + char* keys_bundle_dir = pubsub_getKeysBundleDir(bundle_context); + if (keys_bundle_dir == NULL){ + return CELIX_SERVICE_EXCEPTION; + } + + const char* keys_file_path = NULL; + const char* keys_file_name = NULL; + bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_PATH, &keys_file_path); + bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_NAME, &keys_file_name); + + char sub_cert_path[MAX_CERT_PATH_LENGTH]; + char pub_cert_path[MAX_CERT_PATH_LENGTH]; + + //certificate path ".cache/bundle{id}/version0.0/./META-INF/keys/subscriber/private/sub_{topic}.key.enc" + snprintf(sub_cert_path, MAX_CERT_PATH_LENGTH, "%s/META-INF/keys/subscriber/private/sub_%s.key.enc", keys_bundle_dir, topic); + snprintf(pub_cert_path, MAX_CERT_PATH_LENGTH, 
"%s/META-INF/keys/publisher/public/pub_%s.pub", keys_bundle_dir, topic); + free(keys_bundle_dir); + + printf("PSA: Loading subscriber key '%s'\n", sub_cert_path); + printf("PSA: Loading publisher key '%s'\n", pub_cert_path); + + zcert_t* sub_cert = get_zcert_from_encoded_file((char *) keys_file_path, (char *) keys_file_name, sub_cert_path); + if (sub_cert == NULL){ + printf("PSA: Cannot load key '%s'\n", sub_cert_path); + return CELIX_SERVICE_EXCEPTION; + } + + zcert_t* pub_cert = zcert_load(pub_cert_path); + if (pub_cert == NULL){ + zcert_destroy(&sub_cert); + printf("PSA: Cannot load key '%s'\n", pub_cert_path); + return CELIX_SERVICE_EXCEPTION; + } + + const char* pub_key = zcert_public_txt(pub_cert); +#endif + + zsock_t* zmq_s = zsock_new (ZMQ_SUB); + if(zmq_s==NULL){ + #ifdef USE_ZMQ_SECURITY + zcert_destroy(&sub_cert); + zcert_destroy(&pub_cert); + #endif + + return CELIX_SERVICE_EXCEPTION; + } + + #ifdef USE_ZMQ_SECURITY + zcert_apply (sub_cert, zmq_s); + zsock_set_curve_serverkey (zmq_s, pub_key); //apply key of publisher to socket of subscriber + #endif + + if(strcmp(topic,PUBSUB_ANY_SUB_TOPIC)==0){ + zsock_set_subscribe (zmq_s, ""); + } + else{ + zsock_set_subscribe (zmq_s, topic); + } + + topic_subscription_pt ts = (topic_subscription_pt) calloc(1,sizeof(*ts)); + ts->context = bundle_context; + ts->zmq_socket = zmq_s; + ts->running = false; + ts->nrSubscribers = 0; + + #ifdef USE_ZMQ_SECURITY + ts->zmq_cert = sub_cert; + ts->zmq_pub_cert = pub_cert; + #endif + + celixThreadMutex_create(&ts->socket_lock, NULL); + celixThreadMutex_create(&ts->ts_lock,NULL); + arrayList_create(&ts->sub_ep_list); + ts->servicesMap = hashMap_create(NULL, NULL, NULL, NULL); + arrayList_create(&ts->pendingConnections); + arrayList_create(&ts->pendingDisconnections); + celixThreadMutex_create(&ts->pendingConnections_lock, NULL); + celixThreadMutex_create(&ts->pendingDisconnections_lock, NULL); + + char filter[128]; + memset(filter,0,128); + 
if(strncmp(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT,scope,strlen(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT)) == 0) { + // default scope, means that subscriber has not defined a scope property + snprintf(filter, 128, "(&(%s=%s)(%s=%s))", + (char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_SUBSCRIBER_SERVICE_NAME, + PUBSUB_SUBSCRIBER_TOPIC,topic); + + } else { + snprintf(filter, 128, "(&(%s=%s)(%s=%s)(%s=%s))", + (char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_SUBSCRIBER_SERVICE_NAME, + PUBSUB_SUBSCRIBER_TOPIC,topic, + PUBSUB_SUBSCRIBER_SCOPE,scope); + } + service_tracker_customizer_pt customizer = NULL; + status += serviceTrackerCustomizer_create(ts,NULL,topicsub_subscriberTracked,NULL,topicsub_subscriberUntracked,&customizer); + status += serviceTracker_createWithFilter(bundle_context, filter, customizer, &ts->tracker); + + struct sigaction actions; + memset(&actions, 0, sizeof(actions)); + sigemptyset(&actions.sa_mask); + actions.sa_flags = 0; + actions.sa_handler = sigusr1_sighandler; + + sigaction(SIGUSR1,&actions,NULL); + + *out=ts; + + return status; +} + +celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){ + celix_status_t status = CELIX_SUCCESS; + + celixThreadMutex_lock(&ts->ts_lock); + + ts->running = false; + serviceTracker_destroy(ts->tracker); + arrayList_clear(ts->sub_ep_list); + arrayList_destroy(ts->sub_ep_list); + hashMap_destroy(ts->servicesMap,false,false); + + celixThreadMutex_lock(&ts->pendingConnections_lock); + arrayList_destroy(ts->pendingConnections); + celixThreadMutex_unlock(&ts->pendingConnections_lock); + celixThreadMutex_destroy(&ts->pendingConnections_lock); + + celixThreadMutex_lock(&ts->pendingDisconnections_lock); + arrayList_destroy(ts->pendingDisconnections); + celixThreadMutex_unlock(&ts->pendingDisconnections_lock); + celixThreadMutex_destroy(&ts->pendingDisconnections_lock); + + celixThreadMutex_lock(&ts->socket_lock); + zsock_destroy(&(ts->zmq_socket)); + #ifdef USE_ZMQ_SECURITY + zcert_destroy(&(ts->zmq_cert)); + 
zcert_destroy(&(ts->zmq_pub_cert)); + #endif + celixThreadMutex_unlock(&ts->socket_lock); + celixThreadMutex_destroy(&ts->socket_lock); + + celixThreadMutex_unlock(&ts->ts_lock); + + + free(ts); + + return status; +} + +celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts){ + celix_status_t status = CELIX_SUCCESS; + + //celixThreadMutex_lock(&ts->ts_lock); + + status = serviceTracker_open(ts->tracker); + + ts->running = true; + + if(status==CELIX_SUCCESS){ + status=celixThread_create(&ts->recv_thread,NULL,zmq_recv_thread_func,ts); + } + + //celixThreadMutex_unlock(&ts->ts_lock); + + return status; +} + +celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts){ + celix_status_t status = CELIX_SUCCESS; + + //celixThreadMutex_lock(&ts->ts_lock); + + ts->running = false; + + pthread_kill(ts->recv_thread.thread,SIGUSR1); + + celixThread_join(ts->recv_thread,NULL); + + status = serviceTracker_close(ts->tracker); + + //celixThreadMutex_unlock(&ts->ts_lock); + + return status; +} + +celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts, char* pubURL){ + celix_status_t status = CELIX_SUCCESS; + celixThreadMutex_lock(&ts->socket_lock); + if(!zsock_is(ts->zmq_socket) || zsock_connect(ts->zmq_socket,pubURL) != 0){ + status = CELIX_SERVICE_EXCEPTION; + } + celixThreadMutex_unlock(&ts->socket_lock); + + return status; +} + +celix_status_t pubsub_topicSubscriptionAddConnectPublisherToPendingList(topic_subscription_pt ts, char* pubURL) { + celix_status_t status = CELIX_SUCCESS; + char *url = strdup(pubURL); + celixThreadMutex_lock(&ts->pendingConnections_lock); + arrayList_add(ts->pendingConnections, url); + celixThreadMutex_unlock(&ts->pendingConnections_lock); + return status; +} + +celix_status_t pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(topic_subscription_pt ts, char* pubURL) { + celix_status_t status = CELIX_SUCCESS; + char *url = strdup(pubURL); + 
celixThreadMutex_lock(&ts->pendingDisconnections_lock); + arrayList_add(ts->pendingDisconnections, url); + celixThreadMutex_unlock(&ts->pendingDisconnections_lock); + return status; +} + +celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt ts, char* pubURL){ + celix_status_t status = CELIX_SUCCESS; + + celixThreadMutex_lock(&ts->socket_lock); + if(!zsock_is(ts->zmq_socket) || zsock_disconnect(ts->zmq_socket,pubURL) != 0){ + status = CELIX_SERVICE_EXCEPTION; + } + celixThreadMutex_unlock(&ts->socket_lock); + + return status; +} + +celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP){ + celix_status_t status = CELIX_SUCCESS; + + celixThreadMutex_lock(&ts->ts_lock); + + arrayList_add(ts->sub_ep_list,subEP); + + celixThreadMutex_unlock(&ts->ts_lock); + + return status; + +} + +celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt ts) { + celix_status_t status = CELIX_SUCCESS; + + celixThreadMutex_lock(&ts->ts_lock); + + ts->nrSubscribers++; + + celixThreadMutex_unlock(&ts->ts_lock); + + return status; +} + +celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP){ + celix_status_t status = CELIX_SUCCESS; + + celixThreadMutex_lock(&ts->ts_lock); + + arrayList_removeElement(ts->sub_ep_list,subEP); + + celixThreadMutex_unlock(&ts->ts_lock); + + return status; + +} + +celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt ts) { + celix_status_t status = CELIX_SUCCESS; + + celixThreadMutex_lock(&ts->ts_lock); + + ts->nrSubscribers--; + + celixThreadMutex_unlock(&ts->ts_lock); + + return status; +} + +unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt ts) { + return ts->nrSubscribers; +} + +static celix_status_t topicsub_subscriberTracked(void * handle, service_reference_pt reference, void * service){ + celix_status_t status = CELIX_SUCCESS; + topic_subscription_pt ts = handle; + + 
celixThreadMutex_lock(&ts->ts_lock); + if (!hashMap_containsKey(ts->servicesMap, service)) { + hash_map_pt msgTypes = hashMap_create(uintHash, NULL, uintEquals, NULL); //key = msgId, value = pubsub_message_type + + bundle_pt bundle = NULL; + serviceReference_getBundle(reference, &bundle); + pubsubSerializer_fillMsgTypesMap(msgTypes,bundle); + + if(hashMap_size(msgTypes)==0){ //If the msgTypes hashMap is not filled, the service is an unsupported subscriber + hashMap_destroy(msgTypes,false,false); + printf("TS: Unsupported subscriber!\n"); + } + else{ + hashMap_put(ts->servicesMap, service, msgTypes); + } + + } + celixThreadMutex_unlock(&ts->ts_lock); + printf("TS: New subscriber registered.\n"); + return status; + +} + +static celix_status_t topicsub_subscriberUntracked(void * handle, service_reference_pt reference, void * service){ + celix_status_t status = CELIX_SUCCESS; + topic_subscription_pt ts = handle; + + celixThreadMutex_lock(&ts->ts_lock); + if (hashMap_containsKey(ts->servicesMap, service)) { + hash_map_pt msgTypes = hashMap_remove(ts->servicesMap, service); + if(msgTypes!=NULL){ + pubsubSerializer_emptyMsgTypesMap(msgTypes); + hashMap_destroy(msgTypes,false,false); + } + } + celixThreadMutex_unlock(&ts->ts_lock); + + printf("TS: Subscriber unregistered.\n"); + return status; +} + + +static void process_msg(topic_subscription_pt sub,array_list_pt msg_list){ + + pubsub_msg_header_pt first_msg_hdr = (pubsub_msg_header_pt)zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->header); + + hash_map_iterator_pt iter = hashMapIterator_create(sub->servicesMap); + while (hashMapIterator_hasNext(iter)) { + hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); + pubsub_subscriber_pt subsvc = hashMapEntry_getKey(entry); + hash_map_pt msgTypes = hashMapEntry_getValue(entry); + + pubsub_message_type *msgType = hashMap_get(msgTypes,&(first_msg_hdr->type)); + if (msgType == NULL) { + printf("TS: Primary message %d not supported. 
NOT sending any part of the whole message.\n",first_msg_hdr->type); + } + else{ + void *msgInst = NULL; + char *name = pubsubSerializer_getName(msgType); + version_pt msgVersion = pubsubSerializer_getVersion(msgType); + + bool validVersion = checkVersion(msgVersion,first_msg_hdr); + + if(validVersion){ + + int rc = pubsubSerializer_deserialize(msgType, (const void *) zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->payload), &msgInst); + + if (rc != -1) { + bool release = true; + + mp_handle_pt mp_handle = create_mp_handle(msgTypes,msg_list); + pubsub_multipart_callbacks_t mp_callbacks; + mp_callbacks.handle = mp_handle; + mp_callbacks.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForMsgType; + mp_callbacks.getMultipart = pubsub_getMultipart; + subsvc->receive(subsvc->handle, name, first_msg_hdr->type, msgInst, &mp_callbacks, &release); + + if(release){ + pubsubSerializer_freeMsg(msgType, msgInst); + } + if(mp_handle!=NULL){ + destroy_mp_handle(mp_handle); + } + } + else{ + printf("TS: Cannot deserialize msgType %s.\n",name); + } + + } + else{ + int major=0,minor=0; + version_getMajor(msgVersion,&major); + version_getMinor(msgVersion,&minor); + printf("TS: Version mismatch for primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole message.\n",name,major,minor,first_msg_hdr->major,first_msg_hdr->minor); + } + + } + } + hashMapIterator_destroy(iter); + + int i = 0; + for(;iheader)); + zframe_destroy(&(c_msg->payload)); + free(c_msg); + } + + arrayList_destroy(msg_list); + +} + +static void* zmq_recv_thread_func(void * arg) { + topic_subscription_pt sub = (topic_subscription_pt) arg; + + while (sub->running) { + + celixThreadMutex_lock(&sub->socket_lock); + + zframe_t* headerMsg = zframe_recv(sub->zmq_socket); + if (headerMsg == NULL) { + if (errno == EINTR) { + //It means we got a signal and we have to exit... 
+ printf("TS: header_recv thread for topic got a signal and will exit.\n"); + } else { + perror("TS: header_recv thread"); + } + } else { + + pubsub_msg_header_pt hdr = (pubsub_msg_header_pt) zframe_data(headerMsg); + + if (zframe_more(headerMsg)) { + + zframe_t* payloadMsg = zframe_recv(sub->zmq_socket); + if (payloadMsg == NULL) { + if (errno == EINTR) { + //It means we got a signal and we have to exit... + printf("TS: payload_recv thread for topic got a signal and will exit.\n"); + } else { + perror("TS: payload_recv"); + } + zframe_destroy(&headerMsg); + } else { + + celixThreadMutex_lock(&sub->ts_lock); + + //Let's fetch all the messages from the socket + array_list_pt msg_list = NULL; + arrayList_create(&msg_list); + complete_zmq_msg_pt firstMsg = calloc(1, sizeof(struct complete_zmq_msg)); + firstMsg->header = headerMsg; + firstMsg->payload = payloadMsg; + arrayList_add(msg_list, firstMsg); + + bool more = zframe_more(payloadMsg); + while (more) { + + zframe_t* h_msg = zframe_recv(sub->zmq_socket); + if (h_msg == NULL) { + if (errno == EINTR) { + //It means we got a signal and we have to exit... + printf("TS: h_recv thread for topic got a signal and will exit.\n"); + } else { + perror("TS: h_recv"); + } + break; + } + + zframe_t* p_msg = zframe_recv(sub->zmq_socket); + if (p_msg == NULL) { + if (errno == EINTR) { + //It means we got a signal and we have to exit... 
+								printf("TS: p_recv thread for topic got a signal and will exit.\n");
+							} else {
+								perror("TS: p_recv");
+							}
+							zframe_destroy(&h_msg);
+							break;
+						}
+
+						complete_zmq_msg_pt c_msg = calloc(1, sizeof(struct complete_zmq_msg));
+						c_msg->header = h_msg;
+						c_msg->payload = p_msg;
+						arrayList_add(msg_list, c_msg);
+
+						if (!zframe_more(p_msg)) {
+							more = false;
+						}
+					}
+
+					process_msg(sub, msg_list);
+
+					celixThreadMutex_unlock(&sub->ts_lock);
+
+				}
+
+			} //zframe_more(headerMsg)
+			else {
+				/* hdr points into headerMsg's data, so log first and only then release
+				 * the frame; a zframe_t must be released with zframe_destroy(), not free(). */
+				printf("TS: received message %u for topic %s without payload!\n", hdr->type, hdr->topic);
+				zframe_destroy(&headerMsg);
+			}
+
+		} // headerMsg != NULL
+		celixThreadMutex_unlock(&sub->socket_lock);
+		connectPendingPublishers(sub);
+		disconnectPendingPublishers(sub);
+	} // while
+
+	return NULL;
+}
+
+/* Drains sub->pendingConnections while holding pendingConnections_lock: every queued
+ * endpoint string is handed to pubsub_topicSubscriptionConnectPublisher() and then freed. */
+static void connectPendingPublishers(topic_subscription_pt sub) {
+	celixThreadMutex_lock(&sub->pendingConnections_lock);
+	while(!arrayList_isEmpty(sub->pendingConnections)) {
+		char * pubEP = arrayList_remove(sub->pendingConnections, 0);
+		pubsub_topicSubscriptionConnectPublisher(sub, pubEP);
+		free(pubEP);
+	}
+	celixThreadMutex_unlock(&sub->pendingConnections_lock);
+}
+
+/* Drains sub->pendingDisconnections while holding pendingDisconnections_lock: every queued
+ * endpoint string is handed to pubsub_topicSubscriptionDisconnectPublisher() and then freed. */
+static void disconnectPendingPublishers(topic_subscription_pt sub) {
+	celixThreadMutex_lock(&sub->pendingDisconnections_lock);
+	while(!arrayList_isEmpty(sub->pendingDisconnections)) {
+		char * pubEP = arrayList_remove(sub->pendingDisconnections, 0);
+		pubsub_topicSubscriptionDisconnectPublisher(sub, pubEP);
+		free(pubEP);
+	}
+	celixThreadMutex_unlock(&sub->pendingDisconnections_lock);
+}
+
+/* Signal handler that only logs; signo is unused.
+ * NOTE(review): printf() is not async-signal-safe - consider write(2) or an empty handler. */
+static void sigusr1_sighandler(int signo){
+	printf("TS: Topic subscription being shut down...\n");
+	return;
+}
+
+/* Returns true when hdr->major equals the major of msgVersion and hdr->minor is at least its
+ * minor (both compared after truncation to unsigned char); returns false when msgVersion is NULL. */
+static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr){
+	bool check=false;
+	int major=0,minor=0;
+
+	if(msgVersion!=NULL){
+		version_getMajor(msgVersion,&major);
+		version_getMinor(msgVersion,&minor);
+		if(hdr->major==((unsigned char)major)){ /* Different major means incompatible */
+			check = (hdr->minor>=((unsigned char)minor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */
+		}
+	}
+
+	return check;
+}
+
+/* Stores pubsubSerializer_hashCode(msgType) in *msgTypeId. handle is unused; always returns 0. */
+static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId){
+	*msgTypeId = pubsubSerializer_hashCode(msgType);
+	return 0;
+}
+
+/* Looks up the received part with id msgTypeId in the mp_handle passed as handle.
+ * On success stores the message instance in *part, records the retain flag on the entry and
+ * returns 0. Returns -1 (handle is NULL) or -2 (no such part); *part is set to NULL in both cases. */
+static int pubsub_getMultipart(void *handle, unsigned int msgTypeId, bool retain, void **part){
+
+	if(handle==NULL){
+		*part = NULL;
+		return -1;
+	}
+
+	mp_handle_pt mp_handle = (mp_handle_pt)handle;
+	msg_map_entry_pt entry = hashMap_get(mp_handle->rcv_msg_map,&msgTypeId);
+	if(entry!=NULL){
+		entry->retain = retain;
+		*part = entry->msgInst;
+	}
+	else{
+		printf("TP: getMultipart cannot find msg '%u'\n",msgTypeId);
+		*part=NULL;
+		return -2;
+	}
+
+	return 0;
+
+}
+
+/* Builds the multipart handle for a received message list. Returns NULL when the list holds a
+ * single message. Otherwise every message after the first whose type is present in svc_msg_db,
+ * passes checkVersion() and deserializes successfully is stored in rcv_msg_map, keyed by a
+ * heap-allocated copy of its type id. */
+static mp_handle_pt create_mp_handle(hash_map_pt svc_msg_db,array_list_pt rcv_msg_list){
+
+	if(arrayList_size(rcv_msg_list)==1){ //Means it's not a multipart message
+		return NULL;
+	}
+
+	mp_handle_pt mp_handle = calloc(1,sizeof(struct mp_handle));
+	mp_handle->svc_msg_db = svc_msg_db;
+	mp_handle->rcv_msg_map = hashMap_create(uintHash, NULL, uintEquals, NULL);
+
+	int i=1; //We skip the first message, it will be handle differently
+	/* NOTE(review): this loop header was truncated to "for(;iheader);" in the archived copy and
+	 * has been reconstructed from the uses of c_msg and header below - confirm against the repository. */
+	for(;i<arrayList_size(rcv_msg_list);i++){
+		complete_zmq_msg_pt c_msg = (complete_zmq_msg_pt)arrayList_get(rcv_msg_list,i);
+		pubsub_msg_header_pt header = (pubsub_msg_header_pt)zframe_data(c_msg->header);
+
+		pubsub_message_type *msgType = hashMap_get(svc_msg_db,&(header->type));
+		if (msgType != NULL) {
+			void *msgInst = NULL;
+			version_pt msgVersion = pubsubSerializer_getVersion(msgType);
+
+			bool validVersion = checkVersion(msgVersion,header);
+
+			if(validVersion){
+				int rc = pubsubSerializer_deserialize(msgType, (const void *) zframe_data(c_msg->payload), &msgInst);
+
+				if(rc != -1){
+					unsigned int* msgId = calloc(1,sizeof(unsigned int));
+					*msgId = header->type;
+					msg_map_entry_pt entry = calloc(1,sizeof(struct msg_map_entry));
+					entry->msgInst = msgInst;
+					hashMap_put(mp_handle->rcv_msg_map,msgId,entry);
+				}
+			}
+		}
+
+	}
+
+	return mp_handle;
+
+}
+
+/* Releases a handle made by create_mp_handle(): frees every message instance that was not
+ * marked as retained, then destroys the map (presumably freeing keys and values, given the
+ * two true flags) and the handle itself. A NULL handle is ignored, because create_mp_handle()
+ * returns NULL for non-multipart messages. */
+static void destroy_mp_handle(mp_handle_pt mp_handle){
+
+	if(mp_handle==NULL){
+		return;
+	}
+
+	hash_map_iterator_pt iter = hashMapIterator_create(mp_handle->rcv_msg_map);
+	while(hashMapIterator_hasNext(iter)){
+		hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
+		unsigned int* msgId = (unsigned int*)hashMapEntry_getKey(entry);
+		msg_map_entry_pt msgEntry = hashMapEntry_getValue(entry);
+		pubsub_message_type* msgType = hashMap_get(mp_handle->svc_msg_db,msgId);
+		if(msgType!=NULL){
+			if(!msgEntry->retain){
+				free(msgEntry->msgInst);
+			}
+		}
+		else{
+			printf("TS: ERROR: Cannot find pubsub_message_type for msg %u, so cannot destroy it!\n",*msgId);
+		}
+	}
+	hashMapIterator_destroy(iter);
+
+	hashMap_destroy(mp_handle->rcv_msg_map,true,true);
+	free(mp_handle);
+}
http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/zmq_crypto.c
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/zmq_crypto.c b/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/zmq_crypto.c
new file mode 100644
index 0000000..fe444bd
--- /dev/null
+++ b/celix-pubsub/pubsub/pubsub_admin_zmq/private/src/zmq_crypto.c
@@ -0,0 +1,281 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * zmq_crypto.c
+ *
+ * \date Dec 2, 2016
+ * \author Apache Celix Project Team
+ * \copyright Apache License, Version 2.0
+ */
+
+#include "zmq_crypto.h"
+
+/* NOTE(review): the include targets were lost in the archived copy of this diff. zmq.h,
+ * openssl/evp.h and string.h follow from the symbols used below; openssl/conf.h and
+ * openssl/err.h are presumed - confirm against the repository. */
+#include <zmq.h>
+#include <openssl/conf.h>
+#include <openssl/evp.h>
+#include <openssl/err.h>
+
+#include <string.h>
+
+#define MAX_FILE_PATH_LENGTH 512
+#define ZMQ_KEY_LENGTH 40
+#define AES_KEY_LENGTH 32
+#define AES_IV_LENGTH 16
+
+#define KEY_TO_GET "aes_key"
+#define IV_TO_GET "aes_iv"
+
+static char* read_file_content(char* filePath, char* fileName);
+static void parse_key_lines(char *keysBuffer, char **key, char **iv);
+static void parse_key_line(char *line, char **key, char **iv);
+static void extract_keys_from_buffer(unsigned char *input, int inputlen, char **publicKey, char **secretKey);
+
+/**
+ * Return a valid zcert_t from an encoded file
+ * Caller is responsible for freeing by calling zcert_destroy(zcert** cert);
+ *
+ * keysFilePath / keysFileName locate the file holding the "aes_key" and "aes_iv" lines; NULL
+ * selects DEFAULT_KEYS_FILE_PATH / DEFAULT_KEYS_FILE_NAME. file_path is the AES-256-CBC
+ * encrypted certificate file; the cipher key and IV are the SHA-256 digests of the two values.
+ * Returns NULL when any step fails (keys file unreadable, key or iv missing, hashing or
+ * decryption failure, public / secret key missing or not ZMQ_KEY_LENGTH characters long).
+ */
+zcert_t* get_zcert_from_encoded_file(char* keysFilePath, char* keysFileName, char* file_path)
+{
+
+	if (keysFilePath == NULL){
+		keysFilePath = DEFAULT_KEYS_FILE_PATH;
+	}
+
+	if (keysFileName == NULL){
+		keysFileName = DEFAULT_KEYS_FILE_NAME;
+	}
+
+	char* keys_data = read_file_content(keysFilePath, keysFileName);
+	if (keys_data == NULL){
+		return NULL;
+	}
+
+	char *key = NULL;
+	char *iv = NULL;
+	parse_key_lines(keys_data, &key, &iv);
+	free(keys_data);
+
+	if (key == NULL || iv == NULL){
+		free(key);
+		free(iv);
+
+		printf("CRYPTO: Loading AES key and/or AES iv failed!\n");
+		return NULL;
+	}
+
+	//At this point, we know an aes key and iv are stored and loaded
+
+	// generate sha256 hashes
+	unsigned char key_digest[EVP_MAX_MD_SIZE];
+	unsigned char iv_digest[EVP_MAX_MD_SIZE];
+	if (generate_sha256_hash((char*) key, key_digest) <= 0 || generate_sha256_hash((char*) iv, iv_digest) <= 0){
+		free(key);
+		free(iv);
+
+		printf("CRYPTO: Hashing AES key and/or AES iv failed!\n");
+		return NULL;
+	}
+
+	zchunk_t* encoded_secret = zchunk_slurp (file_path, 0);
+	if (encoded_secret == NULL){
+		free(key);
+		free(iv);
+
+		return NULL;
+	}
+
+	int encoded_secret_size = (int) zchunk_size (encoded_secret);
+	char* encoded_secret_data = zchunk_strdup(encoded_secret);
+	zchunk_destroy (&encoded_secret);
+
+	// Decryption of data. The output buffer lives on the heap (the input size comes from a
+	// file) and has room for one extra cipher block plus the terminating NUL.
+	int decryptedtext_len = -1;
+	unsigned char* decryptedtext = malloc((size_t) encoded_secret_size + EVP_MAX_BLOCK_LENGTH + 1);
+	if (encoded_secret_data != NULL && decryptedtext != NULL){
+		decryptedtext_len = decrypt((unsigned char *) encoded_secret_data, encoded_secret_size, key_digest, iv_digest, decryptedtext);
+	}
+
+	EVP_cleanup();
+
+	free(encoded_secret_data);
+	free(key);
+	free(iv);
+
+	if (decryptedtext_len < 0){
+		free(decryptedtext);
+		printf("CRYPTO: Decrypting '%s' failed!\n", file_path);
+		return NULL;
+	}
+	decryptedtext[decryptedtext_len] = '\0';
+
+	// The public and private keys are retrieved
+	char* public_text = NULL;
+	char* secret_text = NULL;
+
+	extract_keys_from_buffer(decryptedtext, decryptedtext_len, &public_text, &secret_text);
+	free(decryptedtext);
+
+	byte public_key [32] = { 0 };
+	byte secret_key [32] = { 0 };
+	zcert_t* cert_loaded = NULL;
+
+	// Only Z85 texts of exactly ZMQ_KEY_LENGTH characters decode into the 32-byte buffers
+	if (public_text != NULL && secret_text != NULL
+			&& strlen(public_text) == ZMQ_KEY_LENGTH && strlen(secret_text) == ZMQ_KEY_LENGTH
+			&& zmq_z85_decode (public_key, public_text) != NULL
+			&& zmq_z85_decode (secret_key, secret_text) != NULL){
+		cert_loaded = zcert_new_from(public_key, secret_key);
+	} else {
+		printf("CRYPTO: Public / secret key missing or malformed!\n");
+	}
+
+	free(public_text);
+	free(secret_text);
+
+	return cert_loaded;
+}
+
+/**
+ * Writes the SHA-256 digest of the NUL-terminated text into digest.
+ * Returns the digest length in bytes, or 0 when hashing fails.
+ */
+int generate_sha256_hash(char* text, unsigned char* digest)
+{
+	unsigned int digest_len = 0;
+
+	EVP_MD_CTX * mdctx = EVP_MD_CTX_new();
+	if (mdctx == NULL){
+		return 0;
+	}
+
+	if (EVP_DigestInit_ex(mdctx, EVP_sha256(), NULL) != 1
+			|| EVP_DigestUpdate(mdctx, text, strlen(text)) != 1
+			|| EVP_DigestFinal_ex(mdctx, digest, &digest_len) != 1){
+		digest_len = 0;
+	}
+	EVP_MD_CTX_free(mdctx);
+
+	return digest_len;
+}
+
+/**
+ * AES-256-CBC decrypts ciphertext_len bytes of ciphertext into plaintext, which needs room for
+ * ciphertext_len plus one cipher block.
+ * Returns the plaintext length, or -1 when the context cannot be created or any step fails.
+ */
+int decrypt(unsigned char *ciphertext, int ciphertext_len, unsigned char *key, unsigned char *iv, unsigned char *plaintext)
+{
+	int len = 0;
+	int plaintext_len = -1;
+
+	EVP_CIPHER_CTX* ctx = EVP_CIPHER_CTX_new();
+	if (ctx == NULL){
+		return -1;
+	}
+
+	if (EVP_DecryptInit_ex(ctx, EVP_aes_256_cbc(), NULL, key, iv) == 1
+			&& EVP_DecryptUpdate(ctx, plaintext, &len, ciphertext, ciphertext_len) == 1){
+		plaintext_len = len;
+		if (EVP_DecryptFinal_ex(ctx, plaintext + len, &len) == 1){
+			plaintext_len += len;
+		} else {
+			plaintext_len = -1;
+		}
+	}
+
+	EVP_CIPHER_CTX_free(ctx);
+
+	return plaintext_len;
+}
+
+/**
+ * Reads the whole file filePath/fileName and returns its content as a string, or NULL when the
+ * path does not fit in MAX_FILE_PATH_LENGTH or the file is missing or unreadable.
+ * Caller is responsible for freeing the returned value
+ */
+static char* read_file_content(char* filePath, char* fileName){
+
+	char fileNameWithPath[MAX_FILE_PATH_LENGTH];
+	int written = snprintf(fileNameWithPath, MAX_FILE_PATH_LENGTH, "%s/%s", filePath, fileName);
+	int rc = 0;
+
+	if (written < 0 || written >= MAX_FILE_PATH_LENGTH){
+		printf("CRYPTO: Keys file path '%s/%s' is too long!\n", filePath, fileName);
+		return NULL;
+	}
+
+	if (!zsys_file_exists(fileNameWithPath)){
+		printf("CRYPTO: Keys file '%s' doesn't exist!\n", fileNameWithPath);
+		return NULL;
+	}
+
+	zfile_t* keys_file = zfile_new (filePath, fileName);
+	rc = zfile_input (keys_file);
+	if (rc != 0){
+		zfile_destroy(&keys_file);
+		printf("CRYPTO: Keys file '%s' not readable!\n", fileNameWithPath);
+		return NULL;
+	}
+
+	// A negative size must not reach zfile_read, where it would become a huge unsigned count
+	ssize_t keys_file_size = zsys_file_size (fileNameWithPath);
+	zchunk_t* keys_chunk = NULL;
+	if (keys_file_size >= 0){
+		keys_chunk = zfile_read (keys_file, keys_file_size, 0);
+	}
+	if (keys_chunk == NULL){
+		zfile_close(keys_file);
+		zfile_destroy(&keys_file);
+		printf("CRYPTO: Can't read file '%s'!\n", fileNameWithPath);
+		return NULL;
+	}
+
+	char* keys_data = zchunk_strdup(keys_chunk);
+	zchunk_destroy(&keys_chunk);
+	zfile_close(keys_file);
+	zfile_destroy (&keys_file);
+
+	return keys_data;
+}
+
+/* Splits keysBuffer (modified in place) into '\n'-separated lines and feeds each one to
+ * parse_key_line() until both *key and *iv are set or the buffer is exhausted. */
+static void parse_key_lines(char *keysBuffer, char **key, char **iv){
+	char *saveLinePointer = NULL;
+
+	for (char *line = strtok_r(keysBuffer, "\n", &saveLinePointer);
+			line != NULL;
+			line = strtok_r(NULL, "\n", &saveLinePointer)){
+
+		parse_key_line(line, key, iv);
+
+		if (*key != NULL && *iv != NULL){
+			break;
+		}
+	}
+
+}
+
+/* Parses one "name:value" line (modified in place). A value named KEY_TO_GET or IV_TO_GET is
+ * duplicated into *key or *iv, truncated to AES_KEY_LENGTH / AES_IV_LENGTH characters, unless
+ * that output is already set. Lines without ':' or with an empty name or value are ignored. */
+static void parse_key_line(char *line, char **key, char **iv){
+	char* sep_at = strchr(line, ':');
+	if (sep_at == NULL){
+		return;
+	}
+
+	*sep_at = '\0'; // overwrite first separator, creating two strings.
+	char *detectedKey = line;
+	char *detectedValue = sep_at + 1;
+
+	if (detectedKey[0] == '\0' || detectedValue[0] == '\0'){
+		return;
+	}
+
+	if (*key == NULL && strcmp(detectedKey, KEY_TO_GET) == 0){
+		*key = strndup(detectedValue, AES_KEY_LENGTH);
+	} else if (*iv == NULL && strcmp(detectedKey, IV_TO_GET) == 0){
+		*iv = strndup(detectedValue, AES_IV_LENGTH);
+	}
+}
+
+/* Loads the decrypted buffer as a zconfig tree and duplicates the values found at
+ * /curve/public-key and /curve/secret-key into *publicKey and *secretKey (to be freed by the
+ * caller). Both outputs are left untouched when the buffer cannot be loaded or a key is missing. */
+static void extract_keys_from_buffer(unsigned char *input, int inputlen, char **publicKey, char **secretKey) {
+	// Load decrypted text buffer
+	zchunk_t* secret_decrypted = zchunk_new(input, inputlen);
+	if (secret_decrypted == NULL){
+		printf("CRYPTO: Failed to create zchunk\n");
+		return;
+	}
+
+	zconfig_t* secret_config = zconfig_chunk_load (secret_decrypted);
+	zchunk_destroy (&secret_decrypted);
+	if (secret_config == NULL){
+		printf("CRYPTO: Failed to create zconfig\n");
+		return;
+	}
+
+	// Extract public and secret key from text buffer
+	char* public_text = zconfig_get (secret_config, "/curve/public-key", NULL);
+	char* secret_text = zconfig_get (secret_config, "/curve/secret-key", NULL);
+
+	if (public_text == NULL || secret_text == NULL){
+		zconfig_destroy(&secret_config);
+		printf("CRYPTO: Loading public / secret key from text-buffer failed!\n");
+		return;
+	}
+
+	*publicKey = strndup(public_text, ZMQ_KEY_LENGTH + 1);
+	*secretKey = strndup(secret_text, ZMQ_KEY_LENGTH + 1);
+
+	zconfig_destroy(&secret_config);
+}
http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/pubsub_common/public/include/dyn_msg_utils.h
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_common/public/include/dyn_msg_utils.h b/celix-pubsub/pubsub/pubsub_common/public/include/dyn_msg_utils.h
new file mode 100644
index 0000000..71085ab
--- /dev/null
+++ b/celix-pubsub/pubsub/pubsub_common/public/include/dyn_msg_utils.h
@@ -0,0 +1,39 @@
+/**
+ *Licensed to the Apache Software
Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ +/* + * dyn_msg_utils.h + * + * \date Nov 11, 2015 + * \author Apache Celix Project Team + * \copyright Apache License, Version 2.0 + */ + +#ifndef DYN_MSG_UTILS_H_ +#define DYN_MSG_UTILS_H_ + +#include "bundle.h" +#include "hash_map.h" + +unsigned int uintHash(const void * uintNum); +int uintEquals(const void * uintNum, const void * toCompare); + +void fillMsgTypesMap(hash_map_pt msgTypesMap,bundle_pt bundle); +void emptyMsgTypesMap(hash_map_pt msgTypesMap); + +#endif /* DYN_MSG_UTILS_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/pubsub_common/public/include/etcd.h ---------------------------------------------------------------------- diff --git a/celix-pubsub/pubsub/pubsub_common/public/include/etcd.h b/celix-pubsub/pubsub/pubsub_common/public/include/etcd.h new file mode 100644 index 0000000..a502df4 --- /dev/null +++ b/celix-pubsub/pubsub/pubsub_common/public/include/etcd.h @@ -0,0 +1,39 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. 
The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+#ifndef ETCD_H_
+#define ETCD_H_
+
+/* Needed for the bool parameters of etcd_set() and etcd_set_with_check().
+ * NOTE(review): the include target was lost in the archived copy - confirm against the repository. */
+#include <stdbool.h>
+
+/* Callback type taken by etcd_get_directory(); arg is the pointer passed to that call
+ * (presumably invoked once per key/value pair - confirm in the implementation). */
+typedef void (*etcd_key_value_callback) (const char *key, const char *value, void* arg);
+
+int etcd_init(const char* server, int port);
+
+int etcd_get(const char* key, char** value, int* modifiedIndex);
+int etcd_get_directory(const char* directory, etcd_key_value_callback callback, void *arg, long long* modifiedIndex);
+
+int etcd_set(const char* key, const char* value, int ttl, bool prevExist);
+int etcd_set_with_check(const char* key, const char* value, int ttl, bool always_write);
+
+int etcd_del(const char* key);
+
+int etcd_watch(const char* key, long long index, char** action, char** prevValue, char** value, char** rkey, long long* modifiedIndex);
+
+#endif /* ETCD_H_ */
http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/pubsub_common/public/include/publisher_endpoint_announce.h
----------------------------------------------------------------------
diff --git a/celix-pubsub/pubsub/pubsub_common/public/include/publisher_endpoint_announce.h b/celix-pubsub/pubsub/pubsub_common/public/include/publisher_endpoint_announce.h
new file mode 100644
index 0000000..bd39fc0
--- /dev/null
+++ b/celix-pubsub/pubsub/pubsub_common/public/include/publisher_endpoint_announce.h
@@ -0,0 +1,36 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.
See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ + +#ifndef PUBLISHER_ENDPOINT_ANNOUNCE_H_ +#define PUBLISHER_ENDPOINT_ANNOUNCE_H_ + +#include "pubsub_endpoint.h" + +struct publisher_endpoint_announce { + void *handle; + celix_status_t (*announcePublisher)(void *handle, pubsub_endpoint_pt pubEP); + celix_status_t (*removePublisher)(void *handle, pubsub_endpoint_pt pubEP); + celix_status_t (*interestedInTopic)(void* handle, const char *scope, const char *topic); + celix_status_t (*uninterestedInTopic)(void* handle, const char *scope, const char *topic); +}; + +typedef struct publisher_endpoint_announce *publisher_endpoint_announce_pt; + + +#endif /* PUBLISHER_ENDPOINT_ANNOUNCE_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_admin.h ---------------------------------------------------------------------- diff --git a/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_admin.h b/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_admin.h new file mode 100644 index 0000000..1670942 --- /dev/null +++ b/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_admin.h @@ -0,0 +1,56 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. 
See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ +/* + * pubsub_admin.h + * + * \date Sep 30, 2011 + * \author Apache Celix Project Team + * \copyright Apache License, Version 2.0 + */ + +#ifndef PUBSUB_ADMIN_H_ +#define PUBSUB_ADMIN_H_ + +#include "service_reference.h" + +#include "pubsub_common.h" +#include "pubsub_endpoint.h" + +#define PSA_IP "PSA_IP" +#define PSA_ITF "PSA_INTERFACE" +#define PSA_MULTICAST_IP_PREFIX "PSA_MC_PREFIX" + +typedef struct pubsub_admin *pubsub_admin_pt; + +struct pubsub_admin_service { + pubsub_admin_pt admin; + + celix_status_t (*addSubscription)(pubsub_admin_pt admin,pubsub_endpoint_pt subEP); + celix_status_t (*removeSubscription)(pubsub_admin_pt admin,pubsub_endpoint_pt subEP); + + celix_status_t (*addPublication)(pubsub_admin_pt admin,pubsub_endpoint_pt subEP); + celix_status_t (*removePublication)(pubsub_admin_pt admin,pubsub_endpoint_pt subEP); + + celix_status_t (*closeAllPublications)(pubsub_admin_pt admin,char* scope, char* topic); + celix_status_t (*closeAllSubscriptions)(pubsub_admin_pt admin,char* scope, char* topic); +}; + +typedef struct pubsub_admin_service *pubsub_admin_service_pt; + +#endif /* PUBSUB_ADMIN_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_common.h 
---------------------------------------------------------------------- diff --git a/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_common.h b/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_common.h new file mode 100644 index 0000000..d9c6f1d --- /dev/null +++ b/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_common.h @@ -0,0 +1,51 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. 
+ */ +/* + * pubsub_common.h + * + * \date Sep 17, 2015 + * \author Apache Celix Project Team + * \copyright Apache License, Version 2.0 + */ + +#ifndef PUBSUB_COMMON_H_ +#define PUBSUB_COMMON_H_ + +#define PUBSUB_ADMIN_SERVICE "pubsub_admin" +#define PUBSUB_DISCOVERY_SERVICE "pubsub_discovery" +#define PUBSUB_TM_ANNOUNCE_PUBLISHER_SERVICE "pubsub_tm_announce_publisher" + +#define PUBSUB_ANY_SUB_TOPIC "any" + +#define PUBSUB_BUNDLE_ID "bundle.id" + +#define MAX_SCOPE_LEN 1024 +#define MAX_TOPIC_LEN 1024 + +struct pubsub_msg_header{ + char topic[MAX_TOPIC_LEN]; + unsigned int type; + unsigned char major; + unsigned char minor; +}; + +typedef struct pubsub_msg_header* pubsub_msg_header_pt; + + +#endif /* PUBSUB_COMMON_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/f9a5fb11/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_endpoint.h ---------------------------------------------------------------------- diff --git a/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_endpoint.h b/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_endpoint.h new file mode 100644 index 0000000..ae6bcf8 --- /dev/null +++ b/celix-pubsub/pubsub/pubsub_common/public/include/pubsub_endpoint.h @@ -0,0 +1,49 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + *specific language governing permissions and limitations + *under the License. + */ +/* + * pubsub_endpoint.h + * + * \date Sep 21, 2015 + * \author Apache Celix Project Team + * \copyright Apache License, Version 2.0 + */ + +#ifndef PUBSUB_ENDPOINT_H_ +#define PUBSUB_ENDPOINT_H_ + +#include "service_reference.h" + +struct pubsub_endpoint { + char *frameworkUUID; + char *scope; + char *topic; + long serviceID; + char* endpoint; +}; + +typedef struct pubsub_endpoint *pubsub_endpoint_pt; + +celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, long serviceId,const char* endpoint,pubsub_endpoint_pt* psEp); +celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt reference,pubsub_endpoint_pt* psEp); +celix_status_t pubsubEndpoint_destroy(pubsub_endpoint_pt psEp); +bool pubsubEndpoint_equals(pubsub_endpoint_pt psEp1,pubsub_endpoint_pt psEp2); + +char *createScopeTopicKey(const char* scope, const char* topic); + +#endif /* PUBSUB_ENDPOINT_H_ */