Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 5893D200D0C for ; Wed, 20 Sep 2017 17:09:34 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 576371609E1; Wed, 20 Sep 2017 15:09:34 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 3A2091609E2 for ; Wed, 20 Sep 2017 17:09:32 +0200 (CEST) Received: (qmail 48485 invoked by uid 500); 20 Sep 2017 15:09:31 -0000 Mailing-List: contact commits-help@celix.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@celix.apache.org Delivered-To: mailing list commits@celix.apache.org Received: (qmail 48471 invoked by uid 99); 20 Sep 2017 15:09:31 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 20 Sep 2017 15:09:31 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 11CB7F5697; Wed, 20 Sep 2017 15:09:29 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: gricciardi@apache.org To: commits@celix.apache.org Date: Wed, 20 Sep 2017 15:09:31 -0000 Message-Id: <0c4376c4486b4cfcae8e1bb48a4c0de2@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [4/6] celix git commit: Refactored serializers management archived-at: Wed, 20 Sep 2017 15:09:34 -0000 http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c b/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c index bed5dfc..b85f0a9 100644 --- a/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c +++ b/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c @@ -7,7 +7,7 @@ *"License"); you may not use this file except in compliance *with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * htPSA_UDP_MC_TP://www.apache.org/licenses/LICENSE-2.0 * *Unless required by applicable law or agreed to in writing, *software distributed under the License is distributed on an @@ -41,11 +41,13 @@ #include "service_factory.h" #include "version.h" -#include "pubsub_publish_service_private.h" +#include "topic_publication.h" #include "pubsub_common.h" #include "publisher.h" #include "large_udp.h" +#include "pubsub_serializer.h" + #define EP_ADDRESS_LEN 32 #define FIRST_SEND_DELAY 2 @@ -57,28 +59,26 @@ struct topic_publication { array_list_pt pub_ep_list; //List hash_map_pt boundServices; // celix_thread_mutex_t tp_lock; + pubsub_serializer_service_t *serializer; struct sockaddr_in destAddr; - pubsub_serializer_service_t* serializerSvc; }; typedef struct publish_bundle_bound_service { topic_publication_pt parent; - pubsub_publisher_t pubSvc; + pubsub_publisher_t service; bundle_pt bundle; - char *scope; + char *scope; char *topic; + hash_map_pt msgTypes; unsigned short getCount; celix_thread_mutex_t mp_lock; - bool mp_send_in_progress; - array_list_pt mp_parts; largeUdp_pt largeUdpHandle; - pubsub_msg_serializer_map_t* map; -} publish_bundle_bound_service_t; +}* publish_bundle_bound_service_pt; typedef struct pubsub_msg{ pubsub_msg_header_pt header; char* payload; - int payloadSize; + size_t payloadSize; } pubsub_msg_t; static unsigned int rand_range(unsigned int min, unsigned int max); @@ -86,10 +86,10 @@ static unsigned int rand_range(unsigned int min, unsigned int max); static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service); static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service); -static publish_bundle_bound_service_t* pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle); -static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_t* boundSvc); +static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle); +static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc); -static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *msg); +static int pubsub_topicPublicationSend(void* handle,unsigned int msgTypeId, const void *msg); static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* msgTypeId); @@ -97,12 +97,12 @@ static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsig static void delay_first_send_for_late_joiners(void); -celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, char* bindIP, topic_publication_pt *out){ +celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, char* bindIP, topic_publication_pt *out){ - char* ep = malloc(EP_ADDRESS_LEN); - memset(ep,0,EP_ADDRESS_LEN); - unsigned int port = pubEP->serviceID + rand_range(UDP_BASE_PORT+pubEP->serviceID+3, UDP_MAX_PORT); - snprintf(ep,EP_ADDRESS_LEN,"udp://%s:%u",bindIP,port); + char* ep = malloc(EP_ADDRESS_LEN); + memset(ep,0,EP_ADDRESS_LEN); + unsigned int port = pubEP->serviceID + rand_range(UDP_BASE_PORT+pubEP->serviceID+3, UDP_MAX_PORT); + snprintf(ep,EP_ADDRESS_LEN,"udp://%s:%u",bindIP,port); topic_publication_pt pub = calloc(1,sizeof(*pub)); @@ -116,7 +116,8 @@ celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pub->destAddr.sin_family = AF_INET; pub->destAddr.sin_addr.s_addr = inet_addr(bindIP); pub->destAddr.sin_port = htons(port); - pub->serializerSvc = NULL; + + pub->serializer = best_serializer; pubsub_topicPublicationAddPublisherEP(pub,pubEP); @@ -127,6 +128,7 @@ celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub){ celix_status_t status = CELIX_SUCCESS; + celixThreadMutex_lock(&(pub->tp_lock)); free(pub->endpoint); @@ -134,14 +136,18 @@ celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub){ hash_map_iterator_pt iter = hashMapIterator_create(pub->boundServices); while(hashMapIterator_hasNext(iter)){ - publish_bundle_bound_service_t* bound = hashMapIterator_nextValue(iter); + publish_bundle_bound_service_pt bound = hashMapIterator_nextValue(iter); pubsub_destroyPublishBundleBoundService(bound); } hashMapIterator_destroy(iter); hashMap_destroy(pub->boundServices,false,false); pub->svcFactoryReg = NULL; - status = close(pub->sendSocket); + pub->serializer = NULL; + + if(close(pub->sendSocket) != 0){ + status = CELIX_FILE_IO_EXCEPTION; + } celixThreadMutex_unlock(&(pub->tp_lock)); @@ -156,7 +162,6 @@ celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,top celix_status_t status = CELIX_SUCCESS; /* Let's register the new service */ - //celixThreadMutex_lock(&(pub->tp_lock)); pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pub->pub_ep_list,0); @@ -167,39 +172,29 @@ celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,top factory->ungetService = pubsub_topicPublicationUngetService; properties_pt props = properties_create(); - properties_set(props,PUBSUB_PUBLISHER_SCOPE,pubEP->scope); + properties_set(props,PUBSUB_PUBLISHER_SCOPE,pubEP->scope); properties_set(props,PUBSUB_PUBLISHER_TOPIC,pubEP->topic); status = bundleContext_registerServiceFactory(bundle_context,PUBSUB_PUBLISHER_SERVICE_NAME,factory,props,&(pub->svcFactoryReg)); if(status != CELIX_SUCCESS){ properties_destroy(props); - printf("PSA: Cannot register ServiceFactory for topic %s, topic %s (bundle %ld).\n",pubEP->scope, pubEP->topic,pubEP->serviceID); + printf("PSA_UDP_MC_PSA_UDP_MC_TP: Cannot register ServiceFactory for topic %s, topic %s (bundle %ld).\n",pubEP->scope, pubEP->topic,pubEP->serviceID); } else{ *svcFactory = factory; } } else{ - printf("PSA: Cannot find pubsub_endpoint after adding it...Should never happen!\n"); + printf("PSA_UDP_MC_PSA_UDP_MC_TP: Cannot find pubsub_endpoint after adding it...Should never happen!\n"); status = CELIX_SERVICE_EXCEPTION; } - //celixThreadMutex_unlock(&(pub->tp_lock)); - return status; } celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub){ - celix_status_t status = CELIX_SUCCESS; - - //celixThreadMutex_lock(&(pub->tp_lock)); - - status = serviceRegistration_unregister(pub->svcFactoryReg); - - //celixThreadMutex_unlock(&(pub->tp_lock)); - - return status; + return serviceRegistration_unregister(pub->svcFactoryReg); } celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep){ @@ -221,66 +216,12 @@ celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub return CELIX_SUCCESS; } -celix_status_t pubsub_topicPublicationSetSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc){ - celix_status_t status = CELIX_SUCCESS; - +array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt pub){ + array_list_pt list = NULL; celixThreadMutex_lock(&(pub->tp_lock)); - - //clear old serializer - if (pub->serializerSvc != NULL) { - hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices); //key = bundle_pt, publish_bundle_bound_service_t* - while (hashMapIterator_hasNext(&iter)) { - publish_bundle_bound_service_t* bound = hashMapIterator_nextValue(&iter); - celixThreadMutex_lock(&bound->mp_lock); - pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, bound->map); - celixThreadMutex_unlock(&bound->mp_lock); - bound->map = NULL; - } - } - - //setup new serializer - pub->serializerSvc = serializerSvc; - if (pub->serializerSvc != NULL) { - hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices); - while (hashMapIterator_hasNext(&iter)) { - hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter); - bundle_pt bundle = hashMapEntry_getKey(entry); - publish_bundle_bound_service_t *bound = hashMapEntry_getValue(entry); - celixThreadMutex_lock(&bound->mp_lock); - pub->serializerSvc->createSerializerMap(pub->serializerSvc->handle, bundle, &bound->map); - celixThreadMutex_unlock(&bound->mp_lock); - } - } - + list = arrayList_clone(pub->pub_ep_list); celixThreadMutex_unlock(&(pub->tp_lock)); - - return status; -} - -celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_t* svc){ - celix_status_t status = CELIX_SUCCESS; - - celixThreadMutex_lock(&(pub->tp_lock)); - if (pub->serializerSvc == svc) { - hash_map_iterator_t iter = hashMapIterator_construct(pub->boundServices); - while (hashMapIterator_hasNext(&iter)) { - publish_bundle_bound_service_t* bound = hashMapIterator_nextValue(&iter); - celixThreadMutex_lock(&bound->mp_lock); - pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, bound->map); - celixThreadMutex_unlock(&bound->mp_lock); - bound->map = NULL; - } - - pub->serializerSvc = NULL; - } - - celixThreadMutex_unlock(&(pub->tp_lock)); - - return status; -} - -array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt pub){ - return pub->pub_ep_list; + return list; } @@ -291,19 +232,19 @@ static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt celixThreadMutex_lock(&(publish->tp_lock)); - publish_bundle_bound_service_t* bound = hashMap_get(publish->boundServices, bundle); - if (bound == NULL) { - bound = pubsub_createPublishBundleBoundService(publish, bundle); - if (bound != NULL) { - hashMap_put(publish->boundServices, bundle, bound); + publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle); + if(bound==NULL){ + bound = pubsub_createPublishBundleBoundService(publish,bundle); + if(bound!=NULL){ + hashMap_put(publish->boundServices,bundle,bound); } } - else { + else{ bound->getCount++; } if (bound != NULL) { - *service = &bound->pubSvc; + *service = &bound->service; } celixThreadMutex_unlock(&(publish->tp_lock)); @@ -317,19 +258,20 @@ static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_p celixThreadMutex_lock(&(publish->tp_lock)); - publish_bundle_bound_service_t* bound = hashMap_get(publish->boundServices, bundle); - if (bound != NULL) { + publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle); + if(bound!=NULL){ bound->getCount--; - if (bound->getCount == 0) { + if(bound->getCount==0){ pubsub_destroyPublishBundleBoundService(bound); hashMap_remove(publish->boundServices,bundle); } + } - else { + else{ long bundleId = -1; bundle_getBundleId(bundle,&bundleId); - printf("TP: Unexpected ungetService call for bundle %ld.\n", bundleId); + printf("PSA_UDP_MC_TP: Unexpected ungetService call for bundle %ld.\n", bundleId); } /* service should be never used for unget, so let's set the pointer to NULL */ @@ -340,7 +282,7 @@ static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_p return CELIX_SUCCESS; } -static bool send_pubsub_msg(publish_bundle_bound_service_t* bound, pubsub_msg_t* msg, bool last, pubsub_release_callback_t *releaseCallback){ +static bool send_pubsub_msg(publish_bundle_bound_service_pt bound, pubsub_msg_t* msg, bool last, pubsub_release_callback_t *releaseCallback){ const int iovec_len = 3; // header + size + payload bool ret = true; @@ -357,50 +299,36 @@ static bool send_pubsub_msg(publish_bundle_bound_service_t* bound, pubsub_msg_t* delay_first_send_for_late_joiners(); if(largeUdp_sendmsg(bound->largeUdpHandle, bound->parent->sendSocket, msg_iovec, iovec_len, 0, &bound->parent->destAddr, sizeof(bound->parent->destAddr)) == -1) { - fprintf(stderr, "Socket: %d, size: %i",bound->parent->sendSocket, compiledMsgSize); - perror("send_pubsub_msg:sendSocket"); - ret = false; + fprintf(stderr, "Socket: %d, size: %i",bound->parent->sendSocket, compiledMsgSize); + perror("send_pubsub_msg:sendSocket"); + ret = false; } if(releaseCallback) { - releaseCallback->release(msg->payload, bound); + releaseCallback->release(msg->payload, bound); } return ret; -} +} -static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *msg) { - int status = 0; - publish_bundle_bound_service_t* bound = handle; - celixThreadMutex_lock(&(bound->parent->tp_lock)); - celixThreadMutex_lock(&(bound->mp_lock)); +static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg) { + int status = 0; + publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt) handle; - pubsub_msg_serializer_t *msgSer = NULL; - if (bound->map != NULL) { - msgSer = hashMap_get(bound->map->serializers, (void *)(uintptr_t)msgTypeId); - } + celixThreadMutex_lock(&(bound->parent->tp_lock)); + celixThreadMutex_lock(&(bound->mp_lock)); - if (bound->map == NULL) { - printf("TP: Serializer is not set!\n"); - status = 1; - } else if (msgSer == NULL ){ - printf("TP: No msg serializer available for msg type id %d\n", msgTypeId); - hash_map_iterator_t iter = hashMapIterator_construct(bound->map->serializers); - printf("Note supported messages:\n"); - while (hashMapIterator_hasNext(&iter)) { - pubsub_msg_serializer_t *msgSer = hashMapIterator_nextValue(&iter); - printf("\tmsg %s with id %d\n", msgSer->msgName, msgSer->msgId); - } - status = 1; - } + pubsub_msg_serializer_t* msgSer = (pubsub_msg_serializer_t*)hashMap_get(bound->msgTypes, (void*)(uintptr_t)msgTypeId); - int major=0, minor=0; + if (msgSer != NULL) { + int major=0, minor=0; - if (status == 0 && msgSer != NULL) { pubsub_msg_header_pt msg_hdr = calloc(1,sizeof(struct pubsub_msg_header)); strncpy(msg_hdr->topic,bound->topic,MAX_TOPIC_LEN-1); msg_hdr->type = msgTypeId; + + if (msgSer->msgVersion != NULL){ version_getMajor(msgSer->msgVersion, &major); version_getMinor(msgSer->msgVersion, &minor); @@ -408,15 +336,16 @@ static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, con msg_hdr->minor = minor; } - char* serializedOutput = NULL; + void* serializedOutput = NULL; size_t serializedOutputLen = 0; - msgSer->serialize(msgSer->handle, msg, &serializedOutput, &serializedOutputLen); + msgSer->serialize(msgSer,inMsg,&serializedOutput, &serializedOutputLen); - pubsub_msg_t* msg = calloc(1,sizeof(struct pubsub_msg)); + pubsub_msg_t *msg = calloc(1,sizeof(pubsub_msg_t)); msg->header = msg_hdr; - msg->payload = serializedOutput; + msg->payload = (char *)serializedOutput; msg->payloadSize = serializedOutputLen; + if(send_pubsub_msg(bound, msg,true, NULL) == false) { status = -1; } @@ -424,38 +353,21 @@ static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, con free(msg); free(serializedOutput); - } - celixThreadMutex_unlock(&(bound->mp_lock)); + } else { + printf("PSA_UDP_MC_TP: No msg serializer available for msg type id %d\n", msgTypeId); + status=-1; + } + + celixThreadMutex_unlock(&(bound->mp_lock)); celixThreadMutex_unlock(&(bound->parent->tp_lock)); - return status; + return status; } -static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* out){ - publish_bundle_bound_service_t* bound = handle; - unsigned int msgTypeId = 0; - - celixThreadMutex_lock(&bound->mp_lock); - if (bound->map != NULL) { - hash_map_iterator_t iter = hashMapIterator_construct(bound->map->serializers); - while (hashMapIterator_hasNext(&iter)) { - pubsub_msg_serializer_t* msgSer = hashMapIterator_nextValue(&iter); - if (strncmp(msgType, msgSer->msgName, 1024*1024) == 0) { - msgTypeId = msgSer->msgId; - break; - } - } - } - celixThreadMutex_unlock(&bound->mp_lock); - - if (msgTypeId != 0) { - *out = msgTypeId; - return 0; - } else { - printf("TP: Cannot find msg type id for msg type %s\n", msgType); - return 1; - } +static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* msgTypeId){ + *msgTypeId = utils_stringHash(msgType); + return 0; } @@ -466,58 +378,49 @@ static unsigned int rand_range(unsigned int min, unsigned int max){ } -static publish_bundle_bound_service_t* pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle) { - //PRECOND lock on publish->tp_lock - publish_bundle_bound_service_t* bound = calloc(1, sizeof(*bound)); +static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle){ + + publish_bundle_bound_service_pt bound = calloc(1, sizeof(*bound)); if (bound != NULL) { + bound->parent = tp; bound->bundle = bundle; bound->getCount = 1; - bound->mp_send_in_progress = false; celixThreadMutex_create(&bound->mp_lock,NULL); - arrayList_create(&bound->mp_parts); + + if(tp->serializer != NULL){ + tp->serializer->createSerializerMap(tp->serializer->handle,bundle,&bound->msgTypes); + } pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(bound->parent->pub_ep_list,0); bound->scope=strdup(pubEP->scope); bound->topic=strdup(pubEP->topic); bound->largeUdpHandle = largeUdp_create(1); - bound->pubSvc.handle = bound; - bound->pubSvc.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForUUID; - bound->pubSvc.send = pubsub_topicPublicationSend; - bound->pubSvc.sendMultipart = NULL; //Multipart not supported (jet) for UDP - if (tp->serializerSvc != NULL) { - tp->serializerSvc->createSerializerMap(tp->serializerSvc->handle, bundle, &bound->map); - } - } - else - { - free(bound); - return NULL; + bound->service.handle = bound; + bound->service.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForUUID; + bound->service.send = pubsub_topicPublicationSend; + bound->service.sendMultipart = NULL; //Multipart not supported for UDP + } return bound; } -static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_t* boundSvc) { - //PRECOND lock on publish->tp_lock - celixThreadMutex_lock(&boundSvc->mp_lock); +static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc){ - if (boundSvc->map != NULL && boundSvc->parent->serializerSvc != NULL) { - boundSvc->parent->serializerSvc->destroySerializerMap(boundSvc->parent->serializerSvc->handle, boundSvc->map); - boundSvc->map = NULL; - } + celixThreadMutex_lock(&boundSvc->mp_lock); - if (boundSvc->mp_parts!=NULL) { - arrayList_destroy(boundSvc->mp_parts); + if(boundSvc->parent->serializer != NULL && boundSvc->msgTypes != NULL){ + boundSvc->parent->serializer->destroySerializerMap(boundSvc->parent->serializer->handle, boundSvc->msgTypes); } - if (boundSvc->scope!=NULL) { - free(boundSvc->scope); - } + if(boundSvc->scope!=NULL){ + free(boundSvc->scope); + } - if (boundSvc->topic!=NULL) { + if(boundSvc->topic!=NULL){ free(boundSvc->topic); } @@ -535,7 +438,7 @@ static void delay_first_send_for_late_joiners(){ static bool firstSend = true; if(firstSend){ - printf("TP: Delaying first send for late joiners...\n"); + printf("PSA_UDP_MC_TP: Delaying first send for late joiners...\n"); sleep(FIRST_SEND_DELAY); firstSend = false; } http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c b/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c index 7a3f5a9..5896264 100644 --- a/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c +++ b/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c @@ -29,35 +29,31 @@ #include #include +#include #include +#include #include #include -#if defined(__APPLE__) && defined(__MACH__) - #include - #include -#else - #include -#endif - #include "utils.h" #include "celix_errno.h" #include "constants.h" #include "version.h" #include "topic_subscription.h" +#include "topic_publication.h" #include "subscriber.h" #include "publisher.h" -#include "pubsub_publish_service_private.h" #include "large_udp.h" +#include "pubsub_serializer.h" + #define MAX_EPOLL_EVENTS 10 #define RECV_THREAD_TIMEOUT 5 #define UDP_BUFFER_SIZE 65535 #define MAX_UDP_SESSIONS 16 -struct topic_subscription { - +struct topic_subscription{ char* ifIpAddress; service_tracker_pt tracker; array_list_pt sub_ep_list; @@ -65,25 +61,24 @@ struct topic_subscription { bool running; celix_thread_mutex_t ts_lock; bundle_context_pt context; - int topicEpollFd; // EPOLL filedescriptor where the sockets are registered. - //NOTE. using a service ptr can be dangerous, because pointer can be reused. - //ensuring that pointer are removed before new (refurbish) pionter comes along is crucial! - hash_map_pt msgSerializerMapMap; // key = service ptr, value = pubsub_msg_serializer_map_t* - hash_map_pt bundleMap; //key = service ptr, value = bundle_pt + pubsub_serializer_service_t *serializer; + int topicEpollFd; // EPOLL filedescriptor where the sockets are registered. + hash_map_pt servicesMap; // key = service, value = msg types map hash_map_pt socketMap; // key = URL, value = listen-socket + + celix_thread_mutex_t pendingConnections_lock; + array_list_pt pendingConnections; + + array_list_pt pendingDisconnections; + celix_thread_mutex_t pendingDisconnections_lock; + + //array_list_pt rawServices; unsigned int nrSubscribers; largeUdp_pt largeUdpHandle; - pubsub_serializer_service_t* serializerSvc; - }; -typedef struct mp_handle{ - hash_map_pt svc_msg_db; - hash_map_pt rcv_msg_map; -}* mp_handle_pt; - typedef struct msg_map_entry{ bool retain; void* msgInst; @@ -95,9 +90,11 @@ static void* udp_recv_thread_func(void* arg); static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr); static void sigusr1_sighandler(int signo); static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId); +static void connectPendingPublishers(topic_subscription_pt sub); +static void disconnectPendingPublishers(topic_subscription_pt sub); -celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt bundle_context, pubsub_serializer_service_t* serializer, char* scope, char* topic,topic_subscription_pt* out){ +celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, char* ifIp,char* scope, char* topic ,pubsub_serializer_service_t *best_serializer, topic_subscription_pt* out){ celix_status_t status = CELIX_SUCCESS; topic_subscription_pt ts = (topic_subscription_pt) calloc(1,sizeof(*ts)); @@ -109,35 +106,39 @@ celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt bundl ts->topicEpollFd = epoll_create1(0); #endif if(ts->topicEpollFd == -1) { - status += CELIX_SERVICE_EXCEPTION; + status += CELIX_SERVICE_EXCEPTION; } ts->running = false; ts->nrSubscribers = 0; - ts->serializerSvc = NULL; + ts->serializer = best_serializer; celixThreadMutex_create(&ts->ts_lock,NULL); arrayList_create(&ts->sub_ep_list); - ts->msgSerializerMapMap = hashMap_create(NULL, NULL, NULL, NULL); - ts->bundleMap = hashMap_create(NULL, NULL, NULL, NULL); + ts->servicesMap = hashMap_create(NULL, NULL, NULL, NULL); ts->socketMap = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); + arrayList_create(&ts->pendingConnections); + arrayList_create(&ts->pendingDisconnections); + celixThreadMutex_create(&ts->pendingConnections_lock, NULL); + celixThreadMutex_create(&ts->pendingDisconnections_lock, NULL); + ts->largeUdpHandle = largeUdp_create(MAX_UDP_SESSIONS); char filter[128]; memset(filter,0,128); if(strncmp(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, scope, strlen(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT)) == 0) { - // default scope, means that subscriber has not defined a scope property - snprintf(filter, 128, "(&(%s=%s)(%s=%s))", - (char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_SUBSCRIBER_SERVICE_NAME, - PUBSUB_SUBSCRIBER_TOPIC,topic); - - } else { - snprintf(filter, 128, "(&(%s=%s)(%s=%s)(%s=%s))", - (char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_SUBSCRIBER_SERVICE_NAME, - PUBSUB_SUBSCRIBER_TOPIC,topic, - PUBSUB_SUBSCRIBER_SCOPE,scope); - } + // default scope, means that subscriber has not defined a scope property + snprintf(filter, 128, "(&(%s=%s)(%s=%s))", + (char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_SUBSCRIBER_SERVICE_NAME, + PUBSUB_SUBSCRIBER_TOPIC,topic); + + } else { + snprintf(filter, 128, "(&(%s=%s)(%s=%s)(%s=%s))", + (char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_SUBSCRIBER_SERVICE_NAME, + PUBSUB_SUBSCRIBER_TOPIC,topic, + PUBSUB_SUBSCRIBER_SCOPE,scope); + } service_tracker_customizer_pt customizer = NULL; status += serviceTrackerCustomizer_create(ts,NULL,topicsub_subscriberTracked,NULL,topicsub_subscriberUntracked,&customizer); @@ -151,10 +152,9 @@ celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt bundl sigaction(SIGUSR1,&actions,NULL); - if (status == CELIX_SUCCESS) { - *out=ts; - pubsub_topicSubscriptionSetSerializer(ts, serializer); - } + if (status == CELIX_SUCCESS) { + *out=ts; + } return status; } @@ -168,10 +168,20 @@ celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){ serviceTracker_destroy(ts->tracker); arrayList_clear(ts->sub_ep_list); arrayList_destroy(ts->sub_ep_list); - hashMap_destroy(ts->msgSerializerMapMap, false, false); - hashMap_destroy(ts->bundleMap, false, false); + hashMap_destroy(ts->servicesMap,false,false); + + hashMap_destroy(ts->socketMap,true,true); + + celixThreadMutex_lock(&ts->pendingConnections_lock); + arrayList_destroy(ts->pendingConnections); + celixThreadMutex_unlock(&ts->pendingConnections_lock); + celixThreadMutex_destroy(&ts->pendingConnections_lock); + + celixThreadMutex_lock(&ts->pendingDisconnections_lock); + arrayList_destroy(ts->pendingDisconnections); + celixThreadMutex_unlock(&ts->pendingDisconnections_lock); + celixThreadMutex_destroy(&ts->pendingDisconnections_lock); - hashMap_destroy(ts->socketMap,false,false); largeUdp_destroy(ts->largeUdpHandle); #if defined(__APPLE__) && defined(__MACH__) //TODO: Use kqueue for OSX @@ -211,15 +221,16 @@ celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts){ celixThread_join(ts->recv_thread,NULL); - status = serviceTracker_close(ts->tracker); + status = serviceTracker_close(ts->tracker); - hash_map_iterator_pt it = hashMapIterator_create(ts->socketMap); - while(hashMapIterator_hasNext(it)) { - hash_map_entry_pt entry = hashMapIterator_nextEntry(it); - char *url = hashMapEntry_getKey(entry); - pubsub_topicSubscriptionDisconnectPublisher(ts, url); - } - hashMapIterator_destroy(it); + hash_map_iterator_pt it = hashMapIterator_create(ts->socketMap); + while(hashMapIterator_hasNext(it)) { + hash_map_entry_pt entry = hashMapIterator_nextEntry(it); + char *url = hashMapEntry_getKey(entry); + pubsub_topicSubscriptionDisconnectPublisher(ts, url); + free(url); + } + hashMapIterator_destroy(it); return status; @@ -227,108 +238,126 @@ celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts){ celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts, char* pubURL) { - printf("pubsub_topicSubscriptionConnectPublisher : pubURL = %s\n", pubURL); + printf("pubsub_topicSubscriptionConnectPublisher : pubURL = %s\n", pubURL); - celix_status_t status = CELIX_SUCCESS; + celix_status_t status = CELIX_SUCCESS; + celixThreadMutex_lock(&ts->ts_lock); - if (!hashMap_containsKey(ts->socketMap, pubURL)){ + if(hashMap_containsKey(ts->socketMap, pubURL)){ + printf("PSA_UDM_MC_TS: PubURL %s already existing!\n",pubURL); + celixThreadMutex_unlock(&ts->ts_lock); + return CELIX_SERVICE_EXCEPTION; + } - celixThreadMutex_lock(&ts->ts_lock); + int *recvSocket = calloc(sizeof(int), 1); + *recvSocket = socket(AF_INET, SOCK_DGRAM, 0); + if (*recvSocket < 0) { + perror("pubsub_topicSubscriptionCreate:socket"); + status = CELIX_SERVICE_EXCEPTION; + } - int *recvSocket = calloc(sizeof(int), 1); - *recvSocket = socket(AF_INET, SOCK_DGRAM, 0); - if (*recvSocket < 0) { - perror("pubsub_topicSubscriptionCreate:socket"); + if (status == CELIX_SUCCESS){ + int reuse = 1; + if (setsockopt(*recvSocket, SOL_SOCKET, SO_REUSEADDR, (char*) &reuse, sizeof(reuse)) != 0) { + perror("setsockopt() SO_REUSEADDR"); + status = CELIX_SERVICE_EXCEPTION; + } + } + + if(status == CELIX_SUCCESS){ + // TODO Check if there is a better way to parse the URL to IP/Portnr + //replace ':' by spaces + char *url = strdup(pubURL); + char *pt = url; + while((pt=strchr(pt, ':')) != NULL) { + *pt = ' '; + } + char mcIp[100]; + unsigned short mcPort; + sscanf(url, "udp //%s %hu", mcIp, &mcPort); + free (url); + + printf("pubsub_topicSubscriptionConnectPublisher : IP = %s, Port = %hu\n", mcIp, mcPort); + + struct ip_mreq mc_addr; + mc_addr.imr_multiaddr.s_addr = inet_addr(mcIp); + mc_addr.imr_interface.s_addr = inet_addr(ts->ifIpAddress); + printf("Adding MC %s at interface %s\n", mcIp, ts->ifIpAddress); + if (setsockopt(*recvSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char*) &mc_addr, sizeof(mc_addr)) != 0) { + perror("setsockopt() IP_ADD_MEMBERSHIP"); status = CELIX_SERVICE_EXCEPTION; } if (status == CELIX_SUCCESS){ - int reuse = 1; - if (setsockopt(*recvSocket, SOL_SOCKET, SO_REUSEADDR, (char*) &reuse, sizeof(reuse)) != 0) { - perror("setsockopt() SO_REUSEADDR"); + struct sockaddr_in mcListenAddr; + mcListenAddr.sin_family = AF_INET; + mcListenAddr.sin_addr.s_addr = INADDR_ANY; + mcListenAddr.sin_port = htons(mcPort); + if(bind(*recvSocket, (struct sockaddr*)&mcListenAddr, sizeof(mcListenAddr)) != 0) { + perror("bind()"); status = CELIX_SERVICE_EXCEPTION; } } if (status == CELIX_SUCCESS){ - // TODO Check if there is a better way to parse the URL to IP/Portnr - //replace ':' by spaces - char *url = strdup(pubURL); - char *pt = url; - while((pt=strchr(pt, ':')) != NULL) { - *pt = ' '; - } - char mcIp[100]; - unsigned short mcPort; - sscanf(url, "udp //%s %hu", mcIp, &mcPort); - free (url); - - printf("pubsub_topicSubscriptionConnectPublisher : IP = %s, Port = %hu\n", mcIp, mcPort); - - struct ip_mreq mc_addr; - mc_addr.imr_multiaddr.s_addr = inet_addr(mcIp); - mc_addr.imr_interface.s_addr = inet_addr(ts->ifIpAddress); - printf("Adding MC %s at interface %s\n", mcIp, ts->ifIpAddress); - if (setsockopt(*recvSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char*) &mc_addr, sizeof(mc_addr)) != 0) { - perror("setsockopt() IP_ADD_MEMBERSHIP"); +#if defined(__APPLE__) && defined(__MACH__) + //TODO: Use kqueue for OSX +#else + struct epoll_event ev; + memset(&ev, 0, sizeof(ev)); + ev.events = EPOLLIN; + ev.data.fd = *recvSocket; + if(epoll_ctl(ts->topicEpollFd, EPOLL_CTL_ADD, *recvSocket, &ev) == -1) { + perror("epoll_ctl() EPOLL_CTL_ADD"); status = CELIX_SERVICE_EXCEPTION; } +#endif + } - if (status == CELIX_SUCCESS){ - struct sockaddr_in mcListenAddr; - mcListenAddr.sin_family = AF_INET; - mcListenAddr.sin_addr.s_addr = INADDR_ANY; - mcListenAddr.sin_port = htons(mcPort); - if(bind(*recvSocket, (struct sockaddr*)&mcListenAddr, sizeof(mcListenAddr)) != 0) { - perror("bind()"); - status = CELIX_SERVICE_EXCEPTION; - } - } - - if (status == CELIX_SUCCESS){ - #if defined(__APPLE__) && defined(__MACH__) - //TODO: Use kqueue for OSX - #else - struct epoll_event ev; - memset(&ev, 0, sizeof(ev)); - ev.events = EPOLLIN; - ev.data.fd = *recvSocket; - if(epoll_ctl(ts->topicEpollFd, EPOLL_CTL_ADD, *recvSocket, &ev) == -1) { - perror("epoll_ctl() EPOLL_CTL_ADD"); - status = CELIX_SERVICE_EXCEPTION; - } - #endif - } + } - } + if (status == CELIX_SUCCESS){ + hashMap_put(ts->socketMap, strdup(pubURL), (void*)recvSocket); + }else{ + free(recvSocket); + } - if (status == CELIX_SUCCESS){ - hashMap_put(ts->socketMap, pubURL, (void*)recvSocket); - }else{ - free(recvSocket); - } + celixThreadMutex_unlock(&ts->ts_lock); - celixThreadMutex_unlock(&ts->ts_lock); + return status; +} - } +celix_status_t pubsub_topicSubscriptionAddConnectPublisherToPendingList(topic_subscription_pt ts, char* pubURL) { + celix_status_t status = CELIX_SUCCESS; + char *url = strdup(pubURL); + celixThreadMutex_lock(&ts->pendingConnections_lock); + arrayList_add(ts->pendingConnections, url); + celixThreadMutex_unlock(&ts->pendingConnections_lock); + return status; +} - return status; +celix_status_t pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(topic_subscription_pt ts, char* pubURL) { + celix_status_t status = CELIX_SUCCESS; + char *url = strdup(pubURL); + celixThreadMutex_lock(&ts->pendingDisconnections_lock); + arrayList_add(ts->pendingDisconnections, url); + celixThreadMutex_unlock(&ts->pendingDisconnections_lock); + return status; } celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt ts, char* pubURL){ - printf("pubsub_topicSubscriptionDisconnectPublisher : pubURL = %s\n", pubURL); - celix_status_t status = CELIX_SUCCESS; + printf("pubsub_topicSubscriptionDisconnectPublisher : pubURL = %s\n", pubURL); + celix_status_t status = CELIX_SUCCESS; + struct epoll_event ev; + memset(&ev, 0, sizeof(ev)); - if (hashMap_containsKey(ts->socketMap, pubURL)){ + celixThreadMutex_lock(&ts->ts_lock); + + if (hashMap_containsKey(ts->socketMap, pubURL)){ #if defined(__APPLE__) && defined(__MACH__) - //TODO: Use kqueue for OSX + //TODO: Use kqueue for OSX #else - struct epoll_event ev; - memset(&ev, 0, sizeof(ev)); - - celixThreadMutex_lock(&ts->ts_lock); - int *s = hashMap_remove(ts->socketMap, pubURL); if(epoll_ctl(ts->topicEpollFd, EPOLL_CTL_DEL, *s, &ev) == -1) { printf("in if error()\n"); @@ -336,11 +365,11 @@ celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt status = CELIX_SERVICE_EXCEPTION; } free(s); - - celixThreadMutex_unlock(&ts->ts_lock); #endif - } + } + + celixThreadMutex_unlock(&ts->ts_lock); return status; } @@ -349,9 +378,7 @@ celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, p celix_status_t status = CELIX_SUCCESS; celixThreadMutex_lock(&ts->ts_lock); - arrayList_add(ts->sub_ep_list,subEP); - celixThreadMutex_unlock(&ts->ts_lock); return status; @@ -362,9 +389,7 @@ celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt ts) { celix_status_t status = CELIX_SUCCESS; celixThreadMutex_lock(&ts->ts_lock); - ts->nrSubscribers++; - celixThreadMutex_unlock(&ts->ts_lock); return status; @@ -374,22 +399,17 @@ celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts celix_status_t status = CELIX_SUCCESS; celixThreadMutex_lock(&ts->ts_lock); - arrayList_removeElement(ts->sub_ep_list,subEP); - celixThreadMutex_unlock(&ts->ts_lock); return status; - } celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt ts) { celix_status_t status = CELIX_SUCCESS; celixThreadMutex_lock(&ts->ts_lock); - ts->nrSubscribers--; - celixThreadMutex_unlock(&ts->ts_lock); return status; @@ -399,153 +419,118 @@ unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt ts) { return ts->nrSubscribers; } -celix_status_t pubsub_topicSubscriptionSetSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc) { - celix_status_t status = CELIX_SUCCESS; - - celixThreadMutex_lock(&ts->ts_lock); - //clear old - if (ts->serializerSvc != NULL) { - hash_map_iterator_t iter = hashMapIterator_construct(ts->msgSerializerMapMap); - while (hashMapIterator_hasNext(&iter)) { - hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter); - pubsub_subscriber_t* subsvc = hashMapEntry_getKey(entry); - pubsub_msg_serializer_map_t* map = hashMapEntry_getValue(entry); - ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map); - hashMap_put(ts->msgSerializerMapMap, subsvc, NULL); - - } - } - ts->serializerSvc = serializerSvc; - //init new - if (ts->serializerSvc != NULL) { - hash_map_iterator_t iter = hashMapIterator_construct(ts->msgSerializerMapMap); - while (hashMapIterator_hasNext(&iter)) { - pubsub_subscriber_t* subsvc = hashMapIterator_nextKey(&iter); - bundle_pt bundle = hashMap_get(ts->bundleMap, subsvc); - pubsub_msg_serializer_map_t* map = NULL; - ts->serializerSvc->createSerializerMap(ts->serializerSvc->handle, bundle, &map); - hashMap_put(ts->msgSerializerMapMap, subsvc, map); - } - } - celixThreadMutex_unlock(&ts->ts_lock); - - return status; +array_list_pt pubsub_topicSubscriptionGetSubscribersList(topic_subscription_pt sub){ + return sub->sub_ep_list; } -celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc){ - celix_status_t status = CELIX_SUCCESS; - - celixThreadMutex_lock(&ts->ts_lock); - if (ts->serializerSvc == serializerSvc) { //only act if svc removed is services used - hash_map_iterator_t iter = hashMapIterator_construct(ts->msgSerializerMapMap); - while (hashMapIterator_hasNext(&iter)) { - hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter); - pubsub_subscriber_t* subsvc = hashMapEntry_getKey(entry); - pubsub_msg_serializer_map_t* map = hashMapEntry_getValue(entry); - ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map); - hashMap_put(ts->msgSerializerMapMap, subsvc, NULL); - } - ts->serializerSvc = NULL; - } - celixThreadMutex_unlock(&ts->ts_lock); - - return status; -} -static celix_status_t topicsub_subscriberTracked(void * handle, service_reference_pt reference, void* svc){ +static celix_status_t topicsub_subscriberTracked(void * handle, service_reference_pt reference, void * service){ celix_status_t status = CELIX_SUCCESS; topic_subscription_pt ts = handle; celixThreadMutex_lock(&ts->ts_lock); - if (!hashMap_containsKey(ts->msgSerializerMapMap, svc)) { + if (!hashMap_containsKey(ts->servicesMap, service)) { bundle_pt bundle = NULL; + hash_map_pt msgTypes = NULL; + serviceReference_getBundle(reference, &bundle); - if (ts->serializerSvc != NULL) { - pubsub_msg_serializer_map_t* map = NULL; - ts->serializerSvc->createSerializerMap(ts->serializerSvc->handle, bundle, &map); - if (map != NULL) { - hashMap_put(ts->msgSerializerMapMap, svc, map); - hashMap_put(ts->bundleMap, svc, bundle); - } + if(ts->serializer != NULL && bundle!=NULL){ + ts->serializer->createSerializerMap(ts->serializer->handle,bundle,&msgTypes); + if(msgTypes != NULL){ + hashMap_put(ts->servicesMap, service, msgTypes); + printf("PSA_UDP_MC_TS: New subscriber registered.\n"); + } + } + else{ + printf("PSA_UDP_MC_TS: Cannot register new subscriber.\n"); + status = CELIX_SERVICE_EXCEPTION; } } celixThreadMutex_unlock(&ts->ts_lock); - printf("TS: New subscriber registered.\n"); + return status; } -static celix_status_t topicsub_subscriberUntracked(void * handle, service_reference_pt reference, void* svc){ +static celix_status_t topicsub_subscriberUntracked(void * handle, service_reference_pt reference, void * service){ celix_status_t status = CELIX_SUCCESS; topic_subscription_pt ts = handle; - - celixThreadMutex_lock(&ts->ts_lock); - if (hashMap_containsKey(ts->msgSerializerMapMap, svc)) { - pubsub_msg_serializer_map_t* map = hashMap_remove(ts->msgSerializerMapMap, svc); - if (ts->serializerSvc != NULL){ - ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map); - hashMap_remove(ts->bundleMap, svc); - hashMap_remove(ts->msgSerializerMapMap, svc); + celixThreadMutex_lock(&ts->ts_lock); + if (hashMap_containsKey(ts->servicesMap, service)) { + hash_map_pt msgTypes = hashMap_remove(ts->servicesMap, service); + if(msgTypes!=NULL && ts->serializer!=NULL){ + ts->serializer->destroySerializerMap(ts->serializer->handle,msgTypes); + printf("PSA_ZMQ_TS: Subscriber unregistered.\n"); + } + else{ + printf("PSA_ZMQ_TS: Cannot unregister subscriber.\n"); + status = CELIX_SERVICE_EXCEPTION; } } celixThreadMutex_unlock(&ts->ts_lock); - printf("TS: Subscriber unregistered.\n"); + printf("PSA_UDP_MC_TS: Subscriber unregistered.\n"); return status; } -static void process_msg(topic_subscription_pt sub, pubsub_udp_msg_t* msg){ +static void process_msg(topic_subscription_pt sub,pubsub_udp_msg_t *msg){ - hash_map_iterator_t iter = hashMapIterator_construct(sub->msgSerializerMapMap); celixThreadMutex_lock(&sub->ts_lock); - while (hashMapIterator_hasNext(&iter)) { - hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter); + hash_map_iterator_pt iter = hashMapIterator_create(sub->servicesMap); + while (hashMapIterator_hasNext(iter)) { + hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); pubsub_subscriber_pt subsvc = hashMapEntry_getKey(entry); - pubsub_msg_serializer_map_t* map = hashMapEntry_getValue(entry); - - pubsub_msg_serializer_t* msgSer = hashMap_get(map->serializers, (void *)(uintptr_t )msg->header.type); + hash_map_pt msgTypes = hashMapEntry_getValue(entry); + pubsub_msg_serializer_t *msgSer = hashMap_get(msgTypes,(void*)(uintptr_t )msg->header.type); if (msgSer == NULL) { - printf("TS: Primary message %d not supported. NOT receiving any part of the whole message.\n",msg->header.type); - } else { + printf("PSA_UDP_MC_TS: Serializer not available for message %d.\n",msg->header.type); + } + else{ void *msgInst = NULL; - bool validVersion = checkVersion(msgSer->msgVersion, &msg->header); + bool validVersion = checkVersion(msgSer->msgVersion,&msg->header); + if(validVersion){ - celix_status_t status = msgSer->deserialize(msgSer->handle, msg->payload, 0, &msgInst); + + celix_status_t status = msgSer->deserialize(msgSer, (const void *) msg->payload, 0, &msgInst); + if (status == CELIX_SUCCESS) { bool release = true; pubsub_multipart_callbacks_t mp_callbacks; - mp_callbacks.handle = map; + mp_callbacks.handle = sub; mp_callbacks.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForMsgType; mp_callbacks.getMultipart = NULL; subsvc->receive(subsvc->handle, msgSer->msgName, msg->header.type, msgInst, &mp_callbacks, &release); - if (release) { - msgSer->freeMsg(msgSer->handle, msgInst); + + if(release){ + msgSer->freeMsg(msgSer,msgInst); } } else{ - printf("TS: Cannot deserialize msgType %s.\n", msgSer->msgName); + printf("PSA_UDP_MC_TS: Cannot deserialize msgType %s.\n",msgSer->msgName); } } - else { + else{ int major=0,minor=0; - version_getMajor(msgSer->msgVersion, &major); - version_getMinor(msgSer->msgVersion, &minor); - printf("TS: Version mismatch for primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole message.\n", - msgSer->msgName, major, minor, msg->header.major, msg->header.minor); + version_getMajor(msgSer->msgVersion,&major); + version_getMinor(msgSer->msgVersion,&minor); + printf("PSA_UDP_MC_TS: Version mismatch for primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole message.\n", + msgSer->msgName,major,minor,msg->header.major,msg->header.minor); } + } } + hashMapIterator_destroy(iter); celixThreadMutex_unlock(&sub->ts_lock); } static void* udp_recv_thread_func(void * arg) { - topic_subscription_pt sub = (topic_subscription_pt) arg; + topic_subscription_pt sub = (topic_subscription_pt) arg; #if defined(__APPLE__) && defined(__MACH__) //TODO: use kqueue for OSX @@ -558,52 +543,68 @@ static void* udp_recv_thread_func(void * arg) { } } #else + struct epoll_event events[MAX_EPOLL_EVENTS]; + + while (sub->running) { + int nfds = epoll_wait(sub->topicEpollFd, events, MAX_EPOLL_EVENTS, RECV_THREAD_TIMEOUT * 1000); + int i; + for(i = 0; i < nfds; i++ ) { + unsigned int index; + unsigned int size; + if(largeUdp_dataAvailable(sub->largeUdpHandle, events[i].data.fd, &index, &size) == true) { + // Handle data + pubsub_udp_msg_t *udpMsg = NULL; + if(largeUdp_read(sub->largeUdpHandle, index, (void**)&udpMsg, size) != 0) { + printf("PSA_UDP_MC_TS: ERROR largeUdp_read with index %d\n", index); + continue; + } - struct epoll_event events[MAX_EPOLL_EVENTS]; + process_msg(sub, udpMsg); - while (sub->running) { - int nfds = epoll_wait(sub->topicEpollFd, events, MAX_EPOLL_EVENTS, RECV_THREAD_TIMEOUT * 1000); - int i; - for(i = 0; i < nfds; i++ ) { - unsigned int index; - unsigned int size; - if(largeUdp_dataAvailable(sub->largeUdpHandle, events[i].data.fd, &index, &size) == true) { - // Handle data - pubsub_udp_msg_t* udpMsg = NULL; - if(largeUdp_read(sub->largeUdpHandle, index, (void**)&udpMsg, size) != 0) { - printf("TS: ERROR largeUdp_read with index %d\n", index); - continue; - } - - if (udpMsg->header.type == 0){ - //Raw msg, since raw messages are not supported, don't do anything. - }else{ - process_msg(sub, udpMsg); - } - - free(udpMsg); - } - } - } + free(udpMsg); + } + } + connectPendingPublishers(sub); + disconnectPendingPublishers(sub); + } #endif - return NULL; + return NULL; } +static void connectPendingPublishers(topic_subscription_pt sub) { + celixThreadMutex_lock(&sub->pendingConnections_lock); + while(!arrayList_isEmpty(sub->pendingConnections)) { + char * pubEP = arrayList_remove(sub->pendingConnections, 0); + pubsub_topicSubscriptionConnectPublisher(sub, pubEP); + free(pubEP); + } + celixThreadMutex_unlock(&sub->pendingConnections_lock); +} -static void sigusr1_sighandler(int signo) { - printf("TS: Topic subscription being shut down...\n"); +static void disconnectPendingPublishers(topic_subscription_pt sub) { + celixThreadMutex_lock(&sub->pendingDisconnections_lock); + while(!arrayList_isEmpty(sub->pendingDisconnections)) { + char * pubEP = arrayList_remove(sub->pendingDisconnections, 0); + pubsub_topicSubscriptionDisconnectPublisher(sub, pubEP); + free(pubEP); + } + celixThreadMutex_unlock(&sub->pendingDisconnections_lock); +} + +static void sigusr1_sighandler(int signo){ + printf("PSA_UDP_MC_TS: Topic subscription being shut down...\n"); return; } -static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr) { +static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr){ bool check=false; int major=0,minor=0; - if (msgVersion!=NULL) { + if(msgVersion!=NULL){ version_getMajor(msgVersion,&major); version_getMinor(msgVersion,&minor); - if (hdr->major==((unsigned char)major)) { /* Different major means incompatible */ + if(hdr->major==((unsigned char)major)){ /* Different major means incompatible */ check = (hdr->minor>=((unsigned char)minor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */ } } @@ -611,24 +612,7 @@ static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr) { return check; } -static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* out) { - pubsub_msg_serializer_map_t* map = handle; - hash_map_iterator_t iter = hashMapIterator_construct(map->serializers); - unsigned int msgTypeId = 0; - while (hashMapIterator_hasNext(&iter)) { - pubsub_msg_serializer_t* msgSer = hashMapIterator_nextValue(&iter); - if (strncmp(msgSer->msgName, msgType, 1024 * 1024) == 0) { - msgTypeId = msgSer->msgId; - break; - } - } - - if (msgTypeId == 0) { - printf("Cannot find msg type id for msgType %s\n", msgType); - return -1; - } else { - *out = msgTypeId; - return 0; - } +static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId){ + *msgTypeId = utils_stringHash(msgType); + return 0; } - http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_admin_zmq/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_zmq/CMakeLists.txt b/pubsub/pubsub_admin_zmq/CMakeLists.txt index 49eba87..8c3c727 100644 --- a/pubsub/pubsub_admin_zmq/CMakeLists.txt +++ b/pubsub/pubsub_admin_zmq/CMakeLists.txt @@ -53,10 +53,11 @@ if (BUILD_PUBSUB_PSA_ZMQ) ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_endpoint.c ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/log_helper.c ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_utils.c + ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_admin_match.c ) set_target_properties(org.apache.celix.pubsub_admin.PubSubAdminZmq PROPERTIES INSTALL_RPATH "$ORIGIN") - target_link_libraries(org.apache.celix.pubsub_admin.PubSubAdminZmq celix_framework celix_utils celix_dfi ${ZMQ_LIBRARIES} ${CZMQ_LIBRARIES} ${JANSSON_LIBRARIES} ${OPENSSL_CRYPTO_LIBRARY}) + target_link_libraries(org.apache.celix.pubsub_admin.PubSubAdminZmq celix_framework celix_utils celix_dfi ${ZMQ_LIBRARIES} ${CZMQ_LIBRARIES} ${OPENSSL_CRYPTO_LIBRARY}) install_bundle(org.apache.celix.pubsub_admin.PubSubAdminZmq) endif() http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h b/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h index 7e7ac42..3a39a93 100644 --- a/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h +++ b/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h @@ -24,8 +24,8 @@ * \copyright Apache License, Version 2.0 */ -#ifndef PUBSUB_ADMIN_IMPL_H_ -#define PUBSUB_ADMIN_IMPL_H_ +#ifndef PUBSUB_ADMIN_ZMQ_IMPL_H_ +#define PUBSUB_ADMIN_ZMQ_IMPL_H_ #include /* The following undefs prevent the collision between: @@ -38,7 +38,7 @@ #undef LOG_WARNING #include "pubsub_admin.h" -#include "pubsub_serializer.h" +#include "pubsub_admin_match.h" #include "log_helper.h" #define PSA_ZMQ_BASE_PORT "PSA_ZMQ_BASE_PORT" @@ -47,13 +47,17 @@ #define PSA_ZMQ_DEFAULT_BASE_PORT 5501 #define PSA_ZMQ_DEFAULT_MAX_PORT 6000 -struct pubsub_admin { +#define PUBSUB_ADMIN_TYPE "zmq" - pubsub_serializer_service_t* serializerSvc; +struct pubsub_admin { bundle_context_pt bundle_context; log_helper_pt loghelper; + /* List of the available serializers */ + celix_thread_mutex_t serializerListLock; // List + array_list_pt serializerList; + celix_thread_mutex_t localPublicationsLock; hash_map_pt localPublications;// @@ -64,9 +68,17 @@ struct pubsub_admin { hash_map_pt subscriptions; // celix_thread_mutex_t pendingSubscriptionsLock; - celix_thread_mutexattr_t pendingSubscriptionsAttr; hash_map_pt pendingSubscriptions; //> + /* Those are used to keep track of valid subscriptions/publications that still have no valid serializer */ + celix_thread_mutex_t noSerializerPendingsLock; + array_list_pt noSerializerSubscriptions; // List + array_list_pt noSerializerPublications; // List + + celix_thread_mutex_t usedSerializersLock; + hash_map_pt topicSubscriptionsPerSerializer; // > + hash_map_pt topicPublicationsPerSerializer; // > + char* ipAddress; zactor_t* zmq_auth; @@ -75,11 +87,6 @@ struct pubsub_admin { unsigned int maxPort; }; -/* Note: correct locking order is - * 1. subscriptionsLock - * 2. publications locks - */ - celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *admin); celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin); @@ -92,10 +99,9 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoi celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin,char* scope, char* topic); celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* scope,char* topic); -celix_status_t pubsubAdmin_matchPublisher(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP, double* score); -celix_status_t pubsubAdmin_matchSubscriber(pubsub_admin_pt admin, pubsub_endpoint_pt subEP, double* score); +celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt reference, void * service); +celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt reference, void * service); -celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc); -celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, pubsub_serializer_service_t* serializerSvc); +celix_status_t pubsubAdmin_matchEndpoint(pubsub_admin_pt admin, pubsub_endpoint_pt endpoint, double* score); -#endif /* PUBSUB_ADMIN_IMPL_H_ */ +#endif /* PUBSUB_ADMIN_ZMQ_IMPL_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h b/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h deleted file mode 100644 index dbd2ff1..0000000 --- a/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h +++ /dev/null @@ -1,51 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. - */ -/* - * pubsub_publish_service_private.h - * - * \date Sep 24, 2015 - * \author Apache Celix Project Team - * \copyright Apache License, Version 2.0 - */ - -#ifndef PUBSUB_PUBLISH_SERVICE_PRIVATE_H_ -#define PUBSUB_PUBLISH_SERVICE_PRIVATE_H_ - -#include "publisher.h" -#include "pubsub_endpoint.h" -#include "pubsub_common.h" -#include "pubsub_serializer.h" - -typedef struct topic_publication *topic_publication_pt; - -celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context,pubsub_endpoint_pt pubEP, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out); -celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub); - -celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep); -celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep); - -celix_status_t pubsub_topicPublicationSetSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc); -celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub, pubsub_serializer_service_t* serializerSvc); - -celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory); -celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub); - -array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt pub); - -#endif /* PUBSUB_PUBLISH_SERVICE_PRIVATE_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_admin_zmq/private/include/topic_publication.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_zmq/private/include/topic_publication.h b/pubsub/pubsub_admin_zmq/private/include/topic_publication.h new file mode 100644 index 0000000..3457263 --- /dev/null +++ b/pubsub/pubsub_admin_zmq/private/include/topic_publication.h @@ -0,0 +1,49 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ +/* + * topic_publication.h + * + * \date Sep 24, 2015 + * \author Apache Celix Project Team + * \copyright Apache License, Version 2.0 + */ + +#ifndef TOPIC_PUBLICATION_H_ +#define TOPIC_PUBLICATION_H_ + +#include "publisher.h" +#include "pubsub_endpoint.h" +#include "pubsub_common.h" + +#include "pubsub_serializer.h" + +typedef struct topic_publication *topic_publication_pt; + +celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context,pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out); +celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub); + +celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep); +celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep); + +celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory); +celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub); + +array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt pub); + +#endif /* TOPIC_PUBLICATION_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h b/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h index c1e78c3..7267103 100644 --- a/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h +++ b/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h @@ -38,22 +38,21 @@ typedef struct topic_subscription* topic_subscription_pt; -celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt subEP, pubsub_serializer_service_t* serializer, char* scope, char* topic,topic_subscription_pt* out); +celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context,char* scope, char* topic, pubsub_serializer_service_t *best_serializer, topic_subscription_pt* out); celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts); celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts); celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts); celix_status_t pubsub_topicSubscriptionAddConnectPublisherToPendingList(topic_subscription_pt ts, char* pubURL); celix_status_t pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(topic_subscription_pt ts, char* pubURL); + celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts, char* pubURL); celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt ts, char* pubURL); celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP); celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP); -celix_status_t pubsub_topicSubscriptionSetSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc); -celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts, pubsub_serializer_service_t* serializerSvc); - +array_list_pt pubsub_topicSubscriptionGetSubscribersList(topic_subscription_pt sub); celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt subscription); celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt subscription); unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt subscription); http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_admin_zmq/private/src/psa_activator.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_admin_zmq/private/src/psa_activator.c b/pubsub/pubsub_admin_zmq/private/src/psa_activator.c index cfe2c2e..fd07310 100644 --- a/pubsub/pubsub_admin_zmq/private/src/psa_activator.c +++ b/pubsub/pubsub_admin_zmq/private/src/psa_activator.c @@ -28,6 +28,7 @@ #include "bundle_activator.h" #include "service_registration.h" +#include "service_tracker.h" #include "pubsub_admin_impl.h" @@ -36,6 +37,7 @@ struct activator { pubsub_admin_pt admin; pubsub_admin_service_pt adminService; service_registration_pt registration; + service_tracker_pt serializerTracker; }; celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) { @@ -48,7 +50,28 @@ celix_status_t bundleActivator_create(bundle_context_pt context, void **userData } else{ *userData = activator; + status = pubsubAdmin_create(context, &(activator->admin)); + + if(status == CELIX_SUCCESS){ + service_tracker_customizer_pt customizer = NULL; + status = serviceTrackerCustomizer_create(activator->admin, + NULL, + pubsubAdmin_serializerAdded, + NULL, + pubsubAdmin_serializerRemoved, + &customizer); + if(status == CELIX_SUCCESS){ + status = serviceTracker_create(context, PUBSUB_SERIALIZER_SERVICE, customizer, &(activator->serializerTracker)); + if(status != CELIX_SUCCESS){ + serviceTrackerCustomizer_destroy(customizer); + pubsubAdmin_destroy(activator->admin); + } + } + else{ + pubsubAdmin_destroy(activator->admin); + } + } } return status; @@ -74,16 +97,14 @@ celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) pubsubAdminSvc->closeAllPublications = pubsubAdmin_closeAllPublications; pubsubAdminSvc->closeAllSubscriptions = pubsubAdmin_closeAllSubscriptions; - pubsubAdminSvc->matchPublisher = pubsubAdmin_matchPublisher; - pubsubAdminSvc->matchSubscriber = pubsubAdmin_matchSubscriber; - - pubsubAdminSvc->setSerializer = pubsubAdmin_setSerializer; - pubsubAdminSvc->removeSerializer = pubsubAdmin_removeSerializer; + pubsubAdminSvc->matchEndpoint = pubsubAdmin_matchEndpoint; activator->adminService = pubsubAdminSvc; status = bundleContext_registerService(context, PUBSUB_ADMIN_SERVICE, pubsubAdminSvc, NULL, &activator->registration); + status += serviceTracker_open(activator->serializerTracker); + } @@ -94,7 +115,9 @@ celix_status_t bundleActivator_stop(void * userData, bundle_context_pt context) celix_status_t status = CELIX_SUCCESS; struct activator *activator = userData; - serviceRegistration_unregister(activator->registration); + status += serviceTracker_close(activator->serializerTracker); + status += serviceRegistration_unregister(activator->registration); + activator->registration = NULL; free(activator->adminService); @@ -107,6 +130,7 @@ celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt contex celix_status_t status = CELIX_SUCCESS; struct activator *activator = userData; + serviceTracker_destroy(activator->serializerTracker); pubsubAdmin_destroy(activator->admin); activator->admin = NULL;