Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 073CF200C22 for ; Mon, 6 Feb 2017 19:34:19 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 05F1D160B64; Mon, 6 Feb 2017 18:34:19 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 1D21B160B56 for ; Mon, 6 Feb 2017 19:34:15 +0100 (CET) Received: (qmail 59732 invoked by uid 500); 6 Feb 2017 18:34:15 -0000 Mailing-List: contact commits-help@celix.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@celix.apache.org Delivered-To: mailing list commits@celix.apache.org Received: (qmail 59330 invoked by uid 99); 6 Feb 2017 18:34:14 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 06 Feb 2017 18:34:14 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 31D60DFC15; Mon, 6 Feb 2017 18:34:14 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: pnoltes@apache.org To: commits@celix.apache.org Date: Mon, 06 Feb 2017 18:34:30 -0000 Message-Id: <5a98d8f77c4047b3b019d759f298b98c@git.apache.org> In-Reply-To: <42273219a2ea4445aec85a8bb0e8bab8@git.apache.org> References: <42273219a2ea4445aec85a8bb0e8bab8@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [18/19] celix git commit: CELIX-389: Refactors pubsub. 
archived-at: Mon, 06 Feb 2017 18:34:19 -0000 http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/examples/pubsub/publisher/private/include/pubsub_publisher_private.h ---------------------------------------------------------------------- diff --git a/celix-pubsub/pubsub/examples/pubsub/publisher/private/include/pubsub_publisher_private.h b/celix-pubsub/pubsub/examples/pubsub/publisher/private/include/pubsub_publisher_private.h deleted file mode 100644 index 834dada..0000000 --- a/celix-pubsub/pubsub/examples/pubsub/publisher/private/include/pubsub_publisher_private.h +++ /dev/null @@ -1,60 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. 
- */ -/* - * pubsub_publisher_private.h - * - * \date Sep 21, 2010 - * \author Apache Celix Project Team - * \copyright Apache License, Version 2.0 - */ - -#ifndef PUBSUB_PUBLISHER_PRIVATE_H_ -#define PUBSUB_PUBLISHER_PRIVATE_H_ - -#include -#include -#include "publisher.h" - -struct pubsub_sender { - array_list_pt trackers; - const char *ident; - hash_map_pt tid_map; //service -> tid - long bundleId; -}; - -typedef struct pubsub_sender * pubsub_sender_pt; - -typedef struct send_thread_struct{ - pubsub_publisher_pt service; - pubsub_sender_pt publisher; - const char *topic; -} *send_thread_struct_pt; - -pubsub_sender_pt publisher_create(array_list_pt trackers, const char* ident,long bundleId); - -void publisher_start(pubsub_sender_pt client); -void publisher_stop(pubsub_sender_pt client); - -void publisher_destroy(pubsub_sender_pt client); - -celix_status_t publisher_publishSvcAdded(void * handle, service_reference_pt reference, void * service); -celix_status_t publisher_publishSvcRemoved(void * handle, service_reference_pt reference, void * service); - - -#endif /* PUBSUB_PUBLISHER_PRIVATE_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/examples/pubsub/publisher/private/src/ps_pub_activator.c ---------------------------------------------------------------------- diff --git a/celix-pubsub/pubsub/examples/pubsub/publisher/private/src/ps_pub_activator.c b/celix-pubsub/pubsub/examples/pubsub/publisher/private/src/ps_pub_activator.c deleted file mode 100644 index e4a8ba8..0000000 --- a/celix-pubsub/pubsub/examples/pubsub/publisher/private/src/ps_pub_activator.c +++ /dev/null @@ -1,157 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. 
The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. - */ -/* - * ps_pub_activator.c - * - * \date Sep 21, 2010 - * \author Apache Celix Project Team - * \copyright Apache License, Version 2.0 - */ - -#include -#include -#include - -#include "bundle_activator.h" -#include "service_tracker.h" -#include "constants.h" - -#include "pubsub_common.h" -#include "pubsub_utils.h" -#include "publisher.h" -#include "pubsub_publisher_private.h" - -#define PUB_TOPIC "poi1;poi2" - -struct publisherActivator { - pubsub_sender_pt client; - array_list_pt trackerList;//List -}; - -celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) { - struct publisherActivator * act = malloc(sizeof(*act)); - - const char* fwUUID = NULL; - - bundleContext_getProperty(context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID); - if(fwUUID==NULL){ - printf("PUBLISHER: Cannot retrieve fwUUID.\n"); - return CELIX_INVALID_BUNDLE_CONTEXT; - } - - bundle_pt bundle = NULL; - long bundleId = 0; - bundleContext_getBundle(context,&bundle); - bundle_getBundleId(bundle,&bundleId); - - arrayList_create(&(act->trackerList)); - act->client = publisher_create(act->trackerList,fwUUID,bundleId); - *userData = act; - - return CELIX_SUCCESS; -} - -celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) { - - struct publisherActivator * act = (struct publisherActivator *) userData; - - int i; - array_list_pt topic_list = pubsub_getTopicsFromString(PUB_TOPIC); 
- - if(topic_list !=NULL){ - - char filter[128]; - for(i=0; iclient,NULL,publisher_publishSvcAdded,NULL,publisher_publishSvcRemoved,&customizer); - serviceTracker_createWithFilter(context, filter, customizer, &tracker); - - arrayList_add(act->trackerList,tracker); - } - else{ - printf("Topic %s too long. Skipping publication.\n",topic); - } - free(topic); - } - arrayList_destroy(topic_list); - - } - - publisher_start(act->client); - - for(i=0;itrackerList);i++){ - service_tracker_pt tracker = arrayList_get(act->trackerList,i); - serviceTracker_open(tracker); - } - - return CELIX_SUCCESS; -} - -celix_status_t bundleActivator_stop(void * userData, bundle_context_pt __attribute__((unused)) context) { - struct publisherActivator * act = (struct publisherActivator *) userData; - int i; - - for(i=0;itrackerList);i++){ - service_tracker_pt tracker = arrayList_get(act->trackerList,i); - serviceTracker_close(tracker); - } - publisher_stop(act->client); - - return CELIX_SUCCESS; -} - -celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt __attribute__((unused)) context) { - struct publisherActivator * act = (struct publisherActivator *) userData; - int i; - - for(i=0;itrackerList);i++){ - service_tracker_pt tracker = arrayList_get(act->trackerList,i); - serviceTracker_destroy(tracker); - } - - publisher_destroy(act->client); - arrayList_destroy(act->trackerList); - - free(act); - printf("PUBLISHER: bundleActivator_destroy\n"); - return CELIX_SUCCESS; -} http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c ---------------------------------------------------------------------- diff --git a/celix-pubsub/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c b/celix-pubsub/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c deleted file mode 100644 index 66454a0..0000000 --- a/celix-pubsub/pubsub/examples/pubsub/publisher/private/src/pubsub_publisher.c +++ 
/dev/null @@ -1,172 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. - */ -/* - * pubsub_publisher.c - * - * \date Sep 21, 2010 - * \author Apache Celix Project Team - * \copyright Apache License, Version 2.0 - */ - -#include -#include -#include -#include - -#include "service_tracker.h" -#include "celix_threads.h" - -#include "pubsub_common.h" -#include "poi.h" - -#include "pubsub_publisher_private.h" - -static bool stop=false; - -static double randCoordinate(double min, double max){ - - double ret = min + (((double)rand()) / (((double)RAND_MAX)/(max-min))) ; - - return ret; - -} - -static void* send_thread(void* arg){ - - send_thread_struct_pt st_struct = (send_thread_struct_pt)arg; - - pubsub_publisher_pt publish_svc = (pubsub_publisher_pt)st_struct->service; - pubsub_sender_pt publisher = (pubsub_sender_pt)st_struct->publisher; - - char fwUUID[9]; - memset(fwUUID,0,9); - memcpy(fwUUID,publisher->ident,8); - - //poi_t point = calloc(1,sizeof(*point)); - location_t place = calloc(1,sizeof(*place)); - - char* desc = calloc(64,sizeof(char)); - snprintf(desc,64,"fw-%s [TID=%lu]", fwUUID, (unsigned long)pthread_self()); - - char* name = calloc(64,sizeof(char)); - snprintf(name,64,"Bundle#%ld",publisher->bundleId); - - 
place->name = name; - place->description = desc; - place->extra = "DONT PANIC"; - printf("TOPIC : %s\n",st_struct->topic); - unsigned int msgId = 0; - if( publish_svc->localMsgTypeIdForMsgType(publish_svc->handle,st_struct->topic,&msgId) == 0 ){ - - while(stop==false){ - place->position.lat = randCoordinate(MIN_LAT,MAX_LAT); - place->position.lon = randCoordinate(MIN_LON,MAX_LON); - int nr_char = (int)randCoordinate(5,100000); - //int nr_char = 25; - place->data = calloc(nr_char, 1); - for(int i = 0; i < (nr_char-1); i++) { - place->data[i] = i%10 + '0'; - } - if(publish_svc->send) { - publish_svc->send(publish_svc->handle,msgId,place); - } else { - printf("No send for %s\n", st_struct->topic); - } - printf("Sent %s [%f, %f] (%s, %s) data len = %d\n",st_struct->topic, place->position.lat, place->position.lon,place->name,place->description, nr_char); - free(place->data); - sleep(2); - } - } - else{ - printf("PUBLISHER: Cannot retrieve msgId for message '%s'\n",MSG_POI_NAME); - } - - free(place->description); - free(place->name); - free(place); - - free(st_struct); - - - return NULL; - -} - -pubsub_sender_pt publisher_create(array_list_pt trackers,const char* ident,long bundleId) { - pubsub_sender_pt publisher = malloc(sizeof(*publisher)); - - publisher->trackers = trackers; - publisher->ident = ident; - publisher->bundleId = bundleId; - publisher->tid_map = hashMap_create(NULL, NULL, NULL, NULL); - - return publisher; -} - -void publisher_start(pubsub_sender_pt client) { - printf("PUBLISHER: starting up...\n"); -} - -void publisher_stop(pubsub_sender_pt client) { - printf("PUBLISHER: stopping...\n"); -} - -void publisher_destroy(pubsub_sender_pt client) { - hashMap_destroy(client->tid_map, false, false); - client->trackers = NULL; - client->ident = NULL; - free(client); -} - -celix_status_t publisher_publishSvcAdded(void * handle, service_reference_pt reference, void * service){ - pubsub_publisher_pt publish_svc = (pubsub_publisher_pt)service; - pubsub_sender_pt 
manager = (pubsub_sender_pt)handle; - - printf("PUBLISHER: new publish service exported (%s).\n",manager->ident); - - send_thread_struct_pt data = calloc(1,sizeof(struct send_thread_struct)); - const char *value = NULL; - serviceReference_getProperty(reference, PUBSUB_PUBLISHER_TOPIC, &value); - data->service = publish_svc; - data->publisher = manager; - data->topic = value; - celix_thread_t *tid = malloc(sizeof(*tid)); - stop=false; - celixThread_create(tid,NULL,send_thread,(void*)data); - hashMap_put(manager->tid_map, publish_svc, tid); - return CELIX_SUCCESS; -} - -celix_status_t publisher_publishSvcRemoved(void * handle, service_reference_pt reference, void * service){ - pubsub_sender_pt manager = (pubsub_sender_pt)handle; - celix_thread_t *tid = hashMap_get(manager->tid_map, service); - -#if defined(__APPLE__) && defined(__MACH__) - uint64_t threadid; - pthread_threadid_np(tid->thread, &threadid); - printf("PUBLISHER: publish service unexporting (%s) %llu!\n",manager->ident, threadid); -#else - printf("PUBLISHER: publish service unexporting (%s) %li!\n",manager->ident, tid->thread); -#endif - - stop=true; - celixThread_join(*tid,NULL); - free(tid); - return CELIX_SUCCESS; -} http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/examples/pubsub/publisher2/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/celix-pubsub/pubsub/examples/pubsub/publisher2/CMakeLists.txt b/celix-pubsub/pubsub/examples/pubsub/publisher2/CMakeLists.txt deleted file mode 100644 index b83f7dd..0000000 --- a/celix-pubsub/pubsub/examples/pubsub/publisher2/CMakeLists.txt +++ /dev/null @@ -1,54 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. 
The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -include_directories("../publisher/private/include") -include_directories("${PROJECT_SOURCE_DIR}/framework/public/include") -include_directories("${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/include") -include_directories("${PROJECT_SOURCE_DIR}/pubsub/api/pubsub") - -add_bundle(org.apache.celix.pubsub_publisher.PoiPublisher2 - SYMBOLIC_NAME "apache_celix_pubsub_poi_publisher2" - VERSION "1.0.0" - SOURCES - ../publisher/private/src/ps_pub_activator.c - ../publisher/private/src/pubsub_publisher.c - ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_utils.c -) - -bundle_files(org.apache.celix.pubsub_publisher.PoiPublisher2 - ${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/msg_descriptors/msg_poi1.descriptor - ${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/msg_descriptors/msg_poi2.descriptor - DESTINATION "META-INF/descriptors" -) - -bundle_files(org.apache.celix.pubsub_publisher.PoiPublisher2 - ${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/msg_descriptors/poi1.properties - ${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/msg_descriptors/poi2.properties - DESTINATION "META-INF/topics/pub" -) - -bundle_files(org.apache.celix.pubsub_publisher.PoiPublisher2 - ${PROJECT_SOURCE_DIR}/pubsub/examples/keys/publisher - DESTINATION "META-INF/keys" -) - -bundle_files(org.apache.celix.pubsub_publisher.PoiPublisher2 - ${PROJECT_SOURCE_DIR}/pubsub/examples/keys/subscriber/public - DESTINATION 
"META-INF/keys/subscriber" -) - -target_link_libraries(org.apache.celix.pubsub_publisher.PoiPublisher2 celix_framework celix_utils) http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/examples/pubsub/subscriber/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/celix-pubsub/pubsub/examples/pubsub/subscriber/CMakeLists.txt b/celix-pubsub/pubsub/examples/pubsub/subscriber/CMakeLists.txt deleted file mode 100644 index 7fd9fae..0000000 --- a/celix-pubsub/pubsub/examples/pubsub/subscriber/CMakeLists.txt +++ /dev/null @@ -1,55 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
- -include_directories("private/include") -include_directories("${PROJECT_SOURCE_DIR}/framework/public/include") -include_directories("${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/include") -include_directories("${PROJECT_SOURCE_DIR}/pubsub/api/pubsub") -include_directories("${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/common/include") - -add_bundle(org.apache.celix.pubsub_subscriber.PoiSubscriber - SYMBOLIC_NAME "apache_celix_pubsub_poi_subscriber" - VERSION "1.0.0" - SOURCES - private/src/ps_sub_activator.c - private/src/pubsub_subscriber.c - ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_utils.c -) - -bundle_files(org.apache.celix.pubsub_subscriber.PoiSubscriber - ${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/msg_descriptors/msg_poi1.descriptor - ${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/msg_descriptors/msg_poi2.descriptor - DESTINATION "META-INF/descriptors" -) - -bundle_files(org.apache.celix.pubsub_subscriber.PoiSubscriber - ${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/msg_descriptors/poi1.properties - ${PROJECT_SOURCE_DIR}/pubsub/examples/pubsub/msg_descriptors/poi2.properties - DESTINATION "META-INF/topics/sub" -) - -bundle_files(org.apache.celix.pubsub_subscriber.PoiSubscriber - ${PROJECT_SOURCE_DIR}/pubsub/examples/keys/subscriber - DESTINATION "META-INF/keys" -) - -bundle_files(org.apache.celix.pubsub_subscriber.PoiSubscriber - ${PROJECT_SOURCE_DIR}/pubsub/examples/keys/publisher/public - DESTINATION "META-INF/keys/publisher" -) - -target_link_libraries(org.apache.celix.pubsub_subscriber.PoiSubscriber celix_framework celix_utils) http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h ---------------------------------------------------------------------- diff --git a/celix-pubsub/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h 
b/celix-pubsub/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h deleted file mode 100644 index c6072df..0000000 --- a/celix-pubsub/pubsub/examples/pubsub/subscriber/private/include/pubsub_subscriber_private.h +++ /dev/null @@ -1,52 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. 
- */ -/* - * pubsub_subscriber_private.h - * - * \date Sep 21, 2010 - * \author Apache Celix Project Team - * \copyright Apache License, Version 2.0 - */ - -#ifndef PUBSUB_SUBSCRIBER_PRIVATE_H_ -#define PUBSUB_SUBSCRIBER_PRIVATE_H_ - - -#include - -#include "celixbool.h" - -#include "pubsub_common.h" -#include "subscriber.h" - -struct pubsub_receiver { - char * name; -}; - -typedef struct pubsub_receiver* pubsub_receiver_pt; - -pubsub_receiver_pt subscriber_create(char* topics); -void subscriber_start(pubsub_receiver_pt client); -void subscriber_stop(pubsub_receiver_pt client); -void subscriber_destroy(pubsub_receiver_pt client); - -int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg,pubsub_multipart_callbacks_t *callbacks, bool* release); - - -#endif /* PUBSUB_SUBSCRIBER_PRIVATE_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/examples/pubsub/subscriber/private/src/ps_sub_activator.c ---------------------------------------------------------------------- diff --git a/celix-pubsub/pubsub/examples/pubsub/subscriber/private/src/ps_sub_activator.c b/celix-pubsub/pubsub/examples/pubsub/subscriber/private/src/ps_sub_activator.c deleted file mode 100644 index efd34c9..0000000 --- a/celix-pubsub/pubsub/examples/pubsub/subscriber/private/src/ps_sub_activator.c +++ /dev/null @@ -1,123 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. - */ -/* - * ps_sub_activator.c - * - * \date Sep 21, 2010 - * \author Apache Celix Project Team - * \copyright Apache License, Version 2.0 - */ - -#include - -#include "bundle_activator.h" - -#include "pubsub_common.h" -#include "pubsub_utils.h" -#include "pubsub_subscriber_private.h" - -#define SUB_TOPIC "poi1;poi2" - -struct subscriberActivator { - array_list_pt registrationList; //List - pubsub_subscriber_pt subsvc; -}; - -celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) { - struct subscriberActivator * act = calloc(1,sizeof(struct subscriberActivator)); - *userData = act; - arrayList_create(&(act->registrationList)); - return CELIX_SUCCESS; -} - -celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) { - struct subscriberActivator * act = (struct subscriberActivator *) userData; - - - pubsub_subscriber_pt subsvc = calloc(1,sizeof(*subsvc)); - pubsub_receiver_pt sub = subscriber_create(SUB_TOPIC); - subsvc->handle = sub; - subsvc->receive = pubsub_subscriber_recv; - - act->subsvc = subsvc; - - array_list_pt topic_list = pubsub_getTopicsFromString(SUB_TOPIC); - - if(topic_list !=NULL){ - - int i; - for(i=0; iregistrationList,reg); - } - else{ - printf("Topic %s too long. 
Skipping subscription.\n",topic); - } - free(topic); - } - arrayList_destroy(topic_list); - - } - - subscriber_start((pubsub_receiver_pt)act->subsvc->handle); - - return CELIX_SUCCESS; -} - -celix_status_t bundleActivator_stop(void * userData, bundle_context_pt context) { - struct subscriberActivator * act = (struct subscriberActivator *) userData; - - int i; - for(i=0; iregistrationList);i++){ - service_registration_pt reg = arrayList_get(act->registrationList,i); - serviceRegistration_unregister(reg); - - } - - subscriber_stop((pubsub_receiver_pt)act->subsvc->handle); - - return CELIX_SUCCESS; -} - -celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt context) { - - struct subscriberActivator * act = (struct subscriberActivator *) userData; - - act->subsvc->receive = NULL; - subscriber_destroy((pubsub_receiver_pt)act->subsvc->handle); - act->subsvc->handle = NULL; - free(act->subsvc); - act->subsvc = NULL; - - arrayList_destroy(act->registrationList); - free(act); - - return CELIX_SUCCESS; -} http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c ---------------------------------------------------------------------- diff --git a/celix-pubsub/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c b/celix-pubsub/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c deleted file mode 100644 index a137253..0000000 --- a/celix-pubsub/pubsub/examples/pubsub/subscriber/private/src/pubsub_subscriber.c +++ /dev/null @@ -1,64 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. - */ -/* - * pubsub_subscriber.c - * - * \date Sep 21, 2010 - * \author Apache Celix Project Team - * \copyright Apache License, Version 2.0 - */ - -#include -#include - -#include "poi.h" -#include "pubsub_subscriber_private.h" - -pubsub_receiver_pt subscriber_create(char* topics) { - pubsub_receiver_pt sub = calloc(1,sizeof(*sub)); - sub->name = strdup(topics); - return sub; -} - - -void subscriber_start(pubsub_receiver_pt subscriber){ - printf("Subscriber started...\n"); -} - -void subscriber_stop(pubsub_receiver_pt subscriber){ - printf("Subscriber stopped...\n"); -} - -void subscriber_destroy(pubsub_receiver_pt subscriber){ - if(subscriber->name!=NULL){ - free(subscriber->name); - } - subscriber->name=NULL; - free(subscriber); -} - -int pubsub_subscriber_recv(void* handle, const char* msgType, unsigned int msgTypeId, void* msg,pubsub_multipart_callbacks_t *callbacks, bool* release){ - - location_t place = (location_t)msg; - int nrchars = 25; - printf("Recv (%s): [%f, %f] (%s, %s) data_len = %ld data =%*.*s\n",msgType, place->position.lat, place->position.lon,place->name,place->description, strlen(place->data) + 1, nrchars, nrchars, place->data); - - return 0; - -} http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/keygen/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/celix-pubsub/pubsub/keygen/CMakeLists.txt b/celix-pubsub/pubsub/keygen/CMakeLists.txt deleted file mode 100644 index bc42173..0000000 --- a/celix-pubsub/pubsub/keygen/CMakeLists.txt +++ /dev/null @@ -1,34 
+0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -if (BUILD_ZMQ_SECURITY) - - find_package(ZMQ REQUIRED) - find_package(CZMQ REQUIRED) - find_package(OpenSSL 1.1.0 REQUIRED) - - include_directories("${ZMQ_INCLUDE_DIR}") - include_directories("${CZMQ_INCLUDE_DIR}") - include_directories("${OPENSSL_INCLUDE_DIR}") - - add_executable(makecert makecert.c) - target_link_libraries(makecert ${CZMQ_LIBRARIES}) - - add_executable(ed_file ed_file.c) - target_link_libraries(ed_file ${ZMQ_LIBRARIES} ${CZMQ_LIBRARIES} ${OPENSSL_CRYPTO_LIBRARY}) - -endif() http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/keygen/ed_file.c ---------------------------------------------------------------------- diff --git a/celix-pubsub/pubsub/keygen/ed_file.c b/celix-pubsub/pubsub/keygen/ed_file.c deleted file mode 100644 index a0fc7e2..0000000 --- a/celix-pubsub/pubsub/keygen/ed_file.c +++ /dev/null @@ -1,309 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. 
The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. - */ -/* - * ed_file.c - * - * \date Dec 2, 2016 - * \author Apache Celix Project Team - * \copyright Apache License, Version 2.0 - */ - -#include -#include - -#include -#include -#include - -#define MAX_KEY_FILE_LENGTH 256 -#define MAX_LINE_LENGTH 64 -#define AES_KEY_LENGTH 32 -#define AES_IV_LENGTH 16 - -#define KEY_TO_GET "aes_key" -#define IV_TO_GET "aes_iv" - -int generate_sha256_hash(char* text, unsigned char* digest); -int encrypt_aes(unsigned char *plaintext, int plaintext_len, unsigned char *key, unsigned char *iv, unsigned char *ciphertext); -int decrypt_aes(unsigned char *ciphertext, int ciphertext_len, unsigned char *key, unsigned char *iv, unsigned char *plaintext); - -static char* read_keys_file_content(const char *filePath); -static void parse_key_lines(char *keysBuffer, char **key, char **iv); -static void parse_key_line(char *line, char **key, char **iv); - -int main(int argc, const char* argv[]) -{ - if (argc < 4){ - printf("Usage: %s [options]\n", argv[0]); - printf("Default behavior: encrypting a file\n"); - printf("Options:\n"); - printf("\t-d\tSpecify to decrypt a file\n"); - printf("\n"); - return EXIT_FAILURE; - } - - int rc = 0; - - const char* keys_file_path = argv[1]; - const char* input_file_path = argv[2]; - const char* output_file_path = argv[3]; - - bool decryptParam = false; - if (argc > 4 && strcmp(argv[4], "-d") == 0){ - decryptParam = true; - } - - if 
(!zsys_file_exists(keys_file_path)){ - printf("Keys file '%s' doesn't exist!\n", keys_file_path); - return EXIT_FAILURE; - } - - if (!zsys_file_exists(input_file_path)){ - printf("Input file does not exist!\n"); - return EXIT_FAILURE; - } - - char* keys_data = read_keys_file_content(keys_file_path); - if (keys_data == NULL){ - return EXIT_FAILURE; - } - - char* key = NULL; - char* iv = NULL; - parse_key_lines(keys_data, &key, &iv); - free(keys_data); - - if (key == NULL || iv == NULL){ - printf("Loading AES key and/or AES iv failed!\n"); - free(key); - free(iv); - return EXIT_FAILURE; - } - - printf("Using AES Key \t\t'%s'\n", key); - printf("Using AES IV \t\t'%s'\n", iv); - printf("Input file path \t'%s'\n", input_file_path); - printf("Output file path \t'%s'\n", output_file_path); - printf("Decrypting \t\t'%s'\n\n", (decryptParam) ? "true" : "false"); - - unsigned char key_digest[EVP_MAX_MD_SIZE]; - unsigned char iv_digest[EVP_MAX_MD_SIZE]; - generate_sha256_hash((char*) key, key_digest); - generate_sha256_hash((char*) iv, iv_digest); - - zchunk_t* input_chunk = zchunk_slurp (input_file_path, 0); - if (input_chunk == NULL){ - printf("Input file not correct!\n"); - free(key); - free(iv); - return EXIT_FAILURE; - } - - //Load input data from file - int input_file_size = (int) zchunk_size (input_chunk); - char* input_file_data = zchunk_strdup(input_chunk); - zchunk_destroy (&input_chunk); - - int output_len; - unsigned char output[input_file_size]; - if (decryptParam){ - output_len = decrypt_aes((unsigned char*) input_file_data, input_file_size, key_digest, iv_digest, output); - output[output_len] = '\0'; - }else{ - output_len = encrypt_aes((unsigned char*) input_file_data, input_file_size, key_digest, iv_digest, output); - } - - //Write output data to file - zfile_t* output_file = zfile_new (".", output_file_path); - zchunk_t* output_chunk = zchunk_new(output, output_len); - rc = zfile_output (output_file); - if (rc != 0){ - printf("Problem with opening file for 
writing!\n"); - zchunk_destroy (&output_chunk); - zfile_close (output_file); - zfile_destroy (&output_file); - free(input_file_data); - free(key); - free(iv); - - return EXIT_FAILURE; - } - - rc = zfile_write (output_file, output_chunk, 0); - if (rc != 0){ - printf("Problem with writing output to file!\n"); - } - printf("Output written to file:\n"); - if (decryptParam){ - printf("%s\n", output); - }else{ - BIO_dump_fp (stdout, (const char *) output, output_len); - } - - zchunk_destroy (&output_chunk); - zfile_close (output_file); - zfile_destroy (&output_file); - free(input_file_data); - free(key); - free(iv); - - return EXIT_SUCCESS; -} - -int generate_sha256_hash(char* text, unsigned char* digest) -{ - unsigned int digest_len; - - EVP_MD_CTX * mdctx = EVP_MD_CTX_new(); - EVP_DigestInit_ex(mdctx, EVP_sha256(), NULL); - EVP_DigestUpdate(mdctx, text, strlen(text)); - EVP_DigestFinal_ex(mdctx, digest, &digest_len); - EVP_MD_CTX_free(mdctx); - - return digest_len; -} - -int encrypt_aes(unsigned char *plaintext, int plaintext_len, unsigned char *key, unsigned char *iv, unsigned char *ciphertext) -{ - int len; - int ciphertext_len; - - EVP_CIPHER_CTX* ctx = EVP_CIPHER_CTX_new(); - - EVP_EncryptInit_ex(ctx, EVP_aes_256_cbc(), NULL, key, iv); - EVP_EncryptUpdate(ctx, ciphertext, &len, plaintext, plaintext_len); - ciphertext_len = len; - EVP_EncryptFinal_ex(ctx, ciphertext + len, &len); - ciphertext_len += len; - - EVP_CIPHER_CTX_free(ctx); - - return ciphertext_len; -} - -int decrypt_aes(unsigned char *ciphertext, int ciphertext_len, unsigned char *key, unsigned char *iv, unsigned char *plaintext) -{ - int len; - int plaintext_len; - - EVP_CIPHER_CTX* ctx = EVP_CIPHER_CTX_new(); - - EVP_DecryptInit_ex(ctx, EVP_aes_256_cbc(), NULL, key, iv); - EVP_DecryptUpdate(ctx, plaintext, &len, ciphertext, ciphertext_len); - plaintext_len = len; - EVP_DecryptFinal_ex(ctx, plaintext + len, &len); - plaintext_len += len; - - EVP_CIPHER_CTX_free(ctx); - - return plaintext_len; -} - 
-static char* read_keys_file_content(const char *keys_file_path){ - char* keys_file_full_path = strndup(keys_file_path, MAX_KEY_FILE_LENGTH); - char* keys_file_name = NULL; - - char* sep_kf_at = strrchr(keys_file_path, '/'); - if (sep_kf_at != NULL){ - *sep_kf_at = '\0'; - keys_file_name = sep_kf_at + 1; - }else{ - keys_file_name = (char*) keys_file_path; - keys_file_path = (const char*) "."; - } - - printf("Keys file path: %s\n", keys_file_full_path); - - int rc = 0; - - zfile_t* keys_file = zfile_new (keys_file_path, keys_file_name); - rc = zfile_input (keys_file); - if (rc != 0){ - printf("Keys file '%s' not readable!\n", keys_file_full_path); - zfile_destroy(&keys_file); - free(keys_file_full_path); - return NULL; - } - - ssize_t keys_file_size = zsys_file_size (keys_file_full_path); - zchunk_t* keys_chunk = zfile_read (keys_file, keys_file_size, 0); - if (keys_chunk == NULL){ - printf("Can't read file '%s'!\n", keys_file_full_path); - zfile_close(keys_file); - zfile_destroy(&keys_file); - free(keys_file_full_path); - return NULL; - } - - char* keys_data = zchunk_strdup(keys_chunk); - zchunk_destroy(&keys_chunk); - zfile_close(keys_file); - zfile_destroy (&keys_file); - - return keys_data; -} - -static void parse_key_lines(char *keysBuffer, char **key, char **iv){ - char *line = NULL, *saveLinePointer = NULL; - - bool firstTime = true; - do { - if (firstTime){ - line = strtok_r(keysBuffer, "\n", &saveLinePointer); - firstTime = false; - }else { - line = strtok_r(NULL, "\n", &saveLinePointer); - } - - if (line == NULL){ - break; - } - - parse_key_line(line, key, iv); - - } while((*key == NULL || *iv == NULL) && line != NULL); - -} - -static void parse_key_line(char *line, char **key, char **iv){ - char *detectedKey = NULL, *detectedValue= NULL; - - char* sep_at = strchr(line, ':'); - if (sep_at == NULL){ - return; - } - - *sep_at = '\0'; // overwrite first separator, creating two strings. 
- detectedKey = line; - detectedValue = sep_at + 1; - - if (detectedKey == NULL || detectedValue == NULL){ - return; - } - if (detectedKey[0] == '\0' || detectedValue[0] == '\0'){ - return; - } - - if (*key == NULL && strcmp(detectedKey, KEY_TO_GET) == 0){ - *key = strndup(detectedValue, AES_KEY_LENGTH); - } else if (*iv == NULL && strcmp(detectedKey, IV_TO_GET) == 0){ - *iv = strndup(detectedValue, AES_IV_LENGTH); - } -} - http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/keygen/makecert.c ---------------------------------------------------------------------- diff --git a/celix-pubsub/pubsub/keygen/makecert.c b/celix-pubsub/pubsub/keygen/makecert.c deleted file mode 100644 index 166111e..0000000 --- a/celix-pubsub/pubsub/keygen/makecert.c +++ /dev/null @@ -1,55 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. 
- */ -/* - * makecert.c - * - * \date Dec 2, 2016 - * \author Apache Celix Project Team - * \copyright Apache License, Version 2.0 - */ - -#include - -#include "czmq.h" - -int main (int argc, const char * argv[]) -{ - - const char * cert_name_public = "certificate.pub"; - const char * cert_name_secret = "certificate.key"; - if (argc == 3 && strcmp(argv[1], argv[2]) != 0){ - cert_name_public = argv[1]; - cert_name_secret = argv[2]; - } - - zcert_t * cert = zcert_new(); - - char *timestr = zclock_timestr (); - zcert_set_meta (cert, "date-created", timestr); - free (timestr); - - zcert_save_public(cert, cert_name_public); - zcert_save_secret(cert, cert_name_secret); - zcert_print (cert); - printf("\n"); - printf("I: CURVE certificate created in %s and %s\n", cert_name_public, cert_name_secret); - zcert_destroy (&cert); - - return 0; -} http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/pubsub_admin_udp_mc/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/celix-pubsub/pubsub/pubsub_admin_udp_mc/CMakeLists.txt b/celix-pubsub/pubsub/pubsub_admin_udp_mc/CMakeLists.txt deleted file mode 100644 index dd25b19..0000000 --- a/celix-pubsub/pubsub/pubsub_admin_udp_mc/CMakeLists.txt +++ /dev/null @@ -1,57 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. 
See the License for the -# specific language governing permissions and limitations -# under the License. - -find_package(Jansson REQUIRED) - -include_directories("${PROJECT_SOURCE_DIR}/utils/public/include") -include_directories("${PROJECT_SOURCE_DIR}/log_service/public/include") -include_directories("${PROJECT_SOURCE_DIR}/dfi/public/include") -include_directories("${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/include") -include_directories("${PROJECT_SOURCE_DIR}/pubsub/api/pubsub") -include_directories("private/include") -include_directories("public/include") -include_directories("${JANSSON_INCLUDE_DIR}") -if (SERIALIZER_PATH) - include_directories("${SERIALIZER_PATH}/include") -endif() -if (SERIALIZER_LIB_INCLUDE_DIR) - include_directories("${SERIALIZER_LIB_INCLUDE_DIR}") -endif() -if (SERIALIZER_LIB_DIR) - link_directories("${SERIALIZER_LIB_DIR}") -endif() - -add_bundle(org.apache.celix.pubsub_admin.PubSubAdminUdpMc - BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_udp_multicast" - VERSION "1.0.0" - SOURCES - private/src/psa_activator.c - private/src/pubsub_admin_impl.c - private/src/topic_subscription.c - private/src/topic_publication.c - private/src/large_udp.c - ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/dyn_msg_utils.c - ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_endpoint.c - ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/log_helper.c - ${PUBSUB_SERIALIZER_SRC} -) - -set_target_properties(org.apache.celix.pubsub_admin.PubSubAdminUdpMc PROPERTIES INSTALL_RPATH "$ORIGIN") -target_link_libraries(org.apache.celix.pubsub_admin.PubSubAdminUdpMc celix_framework celix_utils celix_dfi ${JANSSON_LIBRARIES} ${SERIALIZER_LIBRARY}) - -install_bundle(org.apache.celix.pubsub_admin.PubSubAdminUdpMc) - http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/pubsub_admin_udp_mc/README.md ---------------------------------------------------------------------- diff --git 
a/celix-pubsub/pubsub/pubsub_admin_udp_mc/README.md b/celix-pubsub/pubsub/pubsub_admin_udp_mc/README.md deleted file mode 100644 index 19c7b86..0000000 --- a/celix-pubsub/pubsub/pubsub_admin_udp_mc/README.md +++ /dev/null @@ -1,62 +0,0 @@ -#PUBSUB-Admin UDP Multicast - ---- - -##Description - -This description is particular for the UDP-Multicast PUB-SUB. - -The UDP multicast pubsub admin is used to transfer user data transparently via UDP multicast. UDP packets can contain approximately -64kB. To overcome this limit the admin has a protocol on top of UDP which fragments the data to be sent and these -fragments are reassembled at the reception side. - -### IP Addresses - -To use UDP-multicast 2 IP addresses are needed: - -1. IP address which is bound to an (ethernet) interface -2. The multicast address (in the range 224.X.X.X - 239.X.X.X) - -When the PubSubAdmin starts it determines the bound IP address. This is done in the order: - -1. The first IP number bound to the interface which is set by the "PSA_INTERFACE" property -2. The interfaces are iterated and the first IP number found is used (typically this is 127.0.0.1 (localhost)). - -The multicast IP address is determined in the order: - -1. If the `PSA_IP` property is defined, this IP will be used as multicast. -2. If the `PSA_MC_PREFIX` property is defined, this property is used as the first 2 numbers of the multicast address extended with the last 2 numbers of the bound IP. -3. If the `PSA_MC_PREFIX` property is not defined `224.100` is used. - -### Discovery - -When a publisher requests a topic a TopicSender is created by a ServiceFactory. This TopicSender uses the multicast address as described above with a randomly chosen port number. The combination of the multicast-IP address with the port number and protocol (udp) is the endpoint. -This endpoint is published by the PubSubDiscovery within its topic in ETCD (i.e. udp://224.100.10.20:40123).
- -A subscriber, interested in the topic, is informed by the TopologyManager that there is a new endpoint. The TopicReceiver at the subscriber side creates a listening socket based on this endpoint. - -Now a data-connection is created and data sent by the publisher will be received by the subscriber. - ---- - -##Properties -
| Property | Description |
| --- | --- |
| PSA_INTERFACE | Interface which has to be used for multicast communication |
| PSA_IP | Multicast IP address used by the bundle |
| PSA_MC_PREFIX | First 2 digits of the MC IP address |
- ---- - -##Shortcomings - -1. Per topic a random port number is used for creating an endpoint. It is theoretically possible that for 2 topics the same endpoint is created. -2. For every message a 32 bit random message ID is generated to discriminate segments of different messages which could be sent at the same time. It is theoretically possible that there are 2 equal message IDs at the same time. But since the message ID is valid only during the transmission of a message (maximum some milliseconds with large messages) this is not very plausible. -3. When sending large messages, these messages are segmented and sent one after another. This could cause UDP-buffer overflows in the kernel. A solution could be to add a delay between sending of the segments but this will introduce extra latency. -4. A hash is created, using the message definition, to identify the message type. When 2 messages generate the same hash something will go terribly wrong. A check should be added to prevent this (or another way to identify the message type). This problem is also valid for the other admins. - - - - http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/large_udp.h ---------------------------------------------------------------------- diff --git a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/large_udp.h b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/large_udp.h deleted file mode 100644 index a21e654..0000000 --- a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/large_udp.h +++ /dev/null @@ -1,45 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. - */ -/* - * large_udp.h - * - * \date Mar 1, 2016 - * \author Apache Celix Project Team - * \copyright Apache License, Version 2.0 - */ - -#ifndef _LARGE_UDP_H_ -#define _LARGE_UDP_H_ -#include -#include -#include -#include -#include - -typedef struct largeUdp *largeUdp_pt; - -largeUdp_pt largeUdp_create(unsigned int maxNrUdpReceptions); -void largeUdp_destroy(largeUdp_pt handle); - -int largeUdp_sendto(largeUdp_pt handle, int fd, void *buf, size_t count, int flags, struct sockaddr_in *dest_addr, size_t addrlen); -int largeUdp_sendmsg(largeUdp_pt handle, int fd, struct iovec *largeMsg_iovec, int len, int flags, struct sockaddr_in *dest_addr, size_t addrlen); -bool largeUdp_dataAvailable(largeUdp_pt handle, int fd, unsigned int *index, unsigned int *size); -int largeUdp_read(largeUdp_pt handle, unsigned int index, void ** buffer, unsigned int size); - -#endif /* _LARGE_UDP_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h ---------------------------------------------------------------------- diff --git a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h deleted file mode 100644 index 35fc164..0000000 --- a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h +++ /dev/null @@ -1,73 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. 
See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. - */ -/* - * pubsub_admin_impl.h - * - * \date Dec 5, 2013 - * \author Apache Celix Project Team - * \copyright Apache License, Version 2.0 - */ - -#ifndef PUBSUB_ADMIN_IMPL_H_ -#define PUBSUB_ADMIN_IMPL_H_ - -#include "pubsub_admin.h" -#include "log_helper.h" - -struct pubsub_admin { - - bundle_context_pt bundle_context; - log_helper_pt loghelper; - - celix_thread_mutex_t localPublicationsLock; - hash_map_pt localPublications;// - - celix_thread_mutex_t externalPublicationsLock; - hash_map_pt externalPublications;//> - - celix_thread_mutex_t subscriptionsLock; - hash_map_pt subscriptions; // - - celix_thread_mutex_t pendingSubscriptionsLock; - hash_map_pt pendingSubscriptions; //> - - char* ifIpAddress; // The local interface which is used for multicast communication - char* mcIpAddress; // The multicast IP address - - int sendSocket; - -}; - -celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *admin); -celix_status_t pubsubAdmin_stop(pubsub_admin_pt admin); -celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin); - -celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP); -celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP); - -celix_status_t 
pubsubAdmin_addPublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP); -celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP); - -celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin,char* scope, char* topic); -celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* scope, char* topic); - -celix_status_t pubsubAdmin_matchPublisher(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP, double* score); -celix_status_t pubsubAdmin_matchSubscriber(pubsub_admin_pt admin, pubsub_endpoint_pt subEP, double* score); - -#endif /* PUBSUB_ADMIN_IMPL_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h ---------------------------------------------------------------------- diff --git a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h deleted file mode 100644 index 57c7963..0000000 --- a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h +++ /dev/null @@ -1,55 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - *specific language governing permissions and limitations - *under the License. - */ -/* - * pubsub_publish_service_private.h - * - * \date Sep 24, 2015 - * \author Apache Celix Project Team - * \copyright Apache License, Version 2.0 - */ - -#ifndef PUBSUB_PUBLISH_SERVICE_PRIVATE_H_ -#define PUBSUB_PUBLISH_SERVICE_PRIVATE_H_ - -#include "publisher.h" -#include "pubsub_endpoint.h" -#include "pubsub_common.h" - -#define UDP_BASE_PORT 49152 -#define UDP_MAX_PORT 65000 - -typedef struct pubsub_udp_msg { - struct pubsub_msg_header header; - unsigned int payloadSize; - char payload[]; -} *pubsub_udp_msg_pt; - -typedef struct topic_publication *topic_publication_pt; -celix_status_t pubsub_topicPublicationCreate(int sendSocket, pubsub_endpoint_pt pubEP,char* bindIP, topic_publication_pt *out); -celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub); - -celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep); -celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep); - -celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory); -celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub); - -array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt pub); - -#endif /* PUBSUB_PUBLISH_SERVICE_PRIVATE_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h ---------------------------------------------------------------------- diff --git a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h deleted file mode 100644 index 4ec705b..0000000 --- a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h +++ /dev/null @@ -1,55 +0,0 @@ -/** - 
*Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. - */ -/* - * topic_subscription.h - * - * \date Sep 22, 2015 - * \author Apache Celix Project Team - * \copyright Apache License, Version 2.0 - */ - -#ifndef TOPIC_SUBSCRIPTION_H_ -#define TOPIC_SUBSCRIPTION_H_ - -#include "celix_threads.h" -#include "array_list.h" -#include "celixbool.h" -#include "service_tracker.h" - -#include "pubsub_endpoint.h" -#include "pubsub_common.h" - -typedef struct topic_subscription* topic_subscription_pt; - -celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt bundle_context, char* scope, char* topic,topic_subscription_pt* out); -celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts); -celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts); -celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts); - -celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts, char* pubURL); -celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt ts, char* pubURL); - -celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP); -celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts, 
pubsub_endpoint_pt subEP); - -celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt subscription); -celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt subscription); -unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt subscription); - -#endif /*TOPIC_SUBSCRIPTION_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/large_udp.c ---------------------------------------------------------------------- diff --git a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/large_udp.c b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/large_udp.c deleted file mode 100644 index e5cd5b5..0000000 --- a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/large_udp.c +++ /dev/null @@ -1,362 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. 
- */ -/* - * large_udp.c - * - * \date Mar 1, 2016 - * \author Apache Celix Project Team - * \copyright Apache License, Version 2.0 - */ - -#include "large_udp.h" - -#include -#include -#include -#include -#include -#include -#include - -#define MAX_UDP_MSG_SIZE 65535 /* 2^16 -1 */ -#define IP_HEADER_SIZE 20 -#define UDP_HEADER_SIZE 8 -//#define MTU_SIZE 1500 -#define MTU_SIZE 8000 -#define MAX_MSG_VECTOR_LEN 64 - -//#define NO_IP_FRAGMENTATION - -struct largeUdp { - unsigned int maxNrLists; - array_list_pt udpPartLists; - pthread_mutex_t dbLock; -}; - -typedef struct udpPartList { - unsigned int msg_ident; - unsigned int msg_size; - unsigned int nrPartsRemaining; - char *data; -} *udpPartList_pt; - - -typedef struct msg_part_header { - unsigned int msg_ident; - unsigned int total_msg_size; - unsigned int part_msg_size; - unsigned int offset; -} msg_part_header_t; - -#ifdef NO_IP_FRAGMENTATION - #define MAX_PART_SIZE (MTU_SIZE - (IP_HEADER_SIZE + UDP_HEADER_SIZE + sizeof(struct msg_part_header) )) -#else - #define MAX_PART_SIZE (MAX_UDP_MSG_SIZE - (IP_HEADER_SIZE + UDP_HEADER_SIZE + sizeof(struct msg_part_header) )) -#endif - -typedef struct msg_part { - msg_part_header_t header; - char data[MAX_PART_SIZE]; -} msg_part_t; - -// -// Create a handle -// -largeUdp_pt largeUdp_create(unsigned int maxNrUdpReceptions) -{ - printf("## Creating large UDP\n"); - largeUdp_pt handle = calloc(sizeof(*handle), 1); - if(handle != NULL) { - handle->maxNrLists = maxNrUdpReceptions; - if(arrayList_create(&handle->udpPartLists) != CELIX_SUCCESS) { - free(handle); - handle = NULL; - } - pthread_mutex_init(&handle->dbLock, 0); - } - - return handle; -} - -// -// Destroys the handle -// -void largeUdp_destroy(largeUdp_pt handle) -{ - printf("### Destroying large UDP\n"); - if(handle != NULL) { - pthread_mutex_lock(&handle->dbLock); - int nrUdpLists = arrayList_size(handle->udpPartLists); - int i; - for(i=0; i < nrUdpLists; i++) { - udpPartList_pt udpPartList = 
arrayList_remove(handle->udpPartLists, i); - if(udpPartList) { - if(udpPartList->data) { - free(udpPartList->data); - udpPartList->data = NULL; - } - free(udpPartList); - } - } - arrayList_destroy(handle->udpPartLists); - handle->udpPartLists = NULL; - pthread_mutex_unlock(&handle->dbLock); - pthread_mutex_destroy(&handle->dbLock); - free(handle); - } -} - -// -// Write large data to UDP. This function splits the data in chunks and sends these chunks with a header over UDP. -// -int largeUdp_sendmsg(largeUdp_pt handle, int fd, struct iovec *largeMsg_iovec, int len, int flags, struct sockaddr_in *dest_addr, size_t addrlen) -{ - int n; - int result = 0; - msg_part_header_t header; - - int written = 0; - header.msg_ident = rand(); - header.total_msg_size = 0; - for(n = 0; n < len ;n++) { - header.total_msg_size += largeMsg_iovec[n].iov_len; - } - int nr_buffers = (header.total_msg_size / MAX_PART_SIZE) + 1; - - struct iovec msg_iovec[MAX_MSG_VECTOR_LEN]; - struct msghdr msg; - msg.msg_name = dest_addr; - msg.msg_namelen = addrlen; - msg.msg_flags = 0; - msg.msg_iov = msg_iovec; - msg.msg_iovlen = 2; // header and payload; - msg.msg_control = NULL; - msg.msg_controllen = 0; - - msg.msg_iov[0].iov_base = &header; - msg.msg_iov[0].iov_len = sizeof(header); - - for(n = 0; n < nr_buffers; n++) { - - header.part_msg_size = (((header.total_msg_size - n * MAX_PART_SIZE) > MAX_PART_SIZE) ? MAX_PART_SIZE : (header.total_msg_size - n * MAX_PART_SIZE)); - header.offset = n * MAX_PART_SIZE; - int remainingOffset = header.offset; - int recvPart = 0; - // find the start of the part - while(remainingOffset > largeMsg_iovec[recvPart].iov_len) { - remainingOffset -= largeMsg_iovec[recvPart].iov_len; - recvPart++; - } - int remainingData = header.part_msg_size; - int sendPart = 1; - msg.msg_iovlen = 1; - - // fill in the output iovec from the input iovec in such a way that all UDP frames are filled maximal. 
- while(remainingData > 0) { - int partLen = ( (largeMsg_iovec[recvPart].iov_len - remainingOffset) <= remainingData ? (largeMsg_iovec[recvPart].iov_len -remainingOffset) : remainingData); - msg.msg_iov[sendPart].iov_base = largeMsg_iovec[recvPart].iov_base + remainingOffset; - msg.msg_iov[sendPart].iov_len = partLen; - remainingData -= partLen; - remainingOffset = 0; - sendPart++; - recvPart++; - msg.msg_iovlen++; - } - int tmp, tmptot; - for(tmp = 0, tmptot=0; tmp < msg.msg_iovlen; tmp++) { - tmptot += msg.msg_iov[tmp].iov_len; - } - - int w = sendmsg(fd, &msg, 0); - if(w == -1) { - perror("send()"); - result = -1; - break; - } - written += w; - } - - return (result == 0 ? written : result); -} - -// -// Write large data to UDP. This function splits the data in chunks and sends these chunks with a header over UDP. -// -int largeUdp_sendto(largeUdp_pt handle, int fd, void *buf, size_t count, int flags, struct sockaddr_in *dest_addr, size_t addrlen) -{ - int n; - int nr_buffers = (count / MAX_PART_SIZE) + 1; - int result = 0; - msg_part_header_t header; - - int written = 0; - header.msg_ident = rand(); - header.total_msg_size = count; - char *databuf = buf; - - struct iovec msg_iovec[2]; - struct msghdr msg; - msg.msg_name = dest_addr; - msg.msg_namelen = addrlen; - msg.msg_flags = 0; - msg.msg_iov = msg_iovec; - msg.msg_iovlen = 2; // header and payload; - msg.msg_control = NULL; - msg.msg_controllen = 0; - - msg.msg_iov[0].iov_base = &header; - msg.msg_iov[0].iov_len = sizeof(header); - - for(n = 0; n < nr_buffers; n++) { - - header.part_msg_size = (((header.total_msg_size - n * MAX_PART_SIZE) > MAX_PART_SIZE) ? 
MAX_PART_SIZE : (header.total_msg_size - n * MAX_PART_SIZE)); - header.offset = n * MAX_PART_SIZE; - msg.msg_iov[1].iov_base = &databuf[header.offset]; - msg.msg_iov[1].iov_len = header.part_msg_size; - int w = sendmsg(fd, &msg, 0); - if(w == -1) { - perror("send()"); - result = -1; - break; - } - written += w; - //usleep(1000); // TODO: If not slept a UDP buffer overflow occurs and parts are missing at the reception side (at least via localhost) - } - - return (result == 0 ? written : result); -} - -// -// Reads data from the filedescriptor which has date (determined by epoll()) and stores it in the internal structure -// If the message is completely reassembled true is returned and the index and size have valid values -// -bool largeUdp_dataAvailable(largeUdp_pt handle, int fd, unsigned int *index, unsigned int *size) { - msg_part_header_t header; - int result = false; - // Only read the header, we don't know yet where to store the payload - if(recv(fd, &header, sizeof(header), MSG_PEEK) < 0) { - perror("read()"); - return false; - } - - struct iovec msg_vec[2]; - struct msghdr msg; - msg.msg_name = NULL; - msg.msg_namelen = 0; - msg.msg_flags = 0; - msg.msg_iov = msg_vec; - msg.msg_iovlen = 2; // header and payload; - msg.msg_control = NULL; - msg.msg_controllen = 0; - - msg.msg_iov[0].iov_base = &header; - msg.msg_iov[0].iov_len = sizeof(header); - - pthread_mutex_lock(&handle->dbLock); - - int nrUdpLists = arrayList_size(handle->udpPartLists); - int i; - bool found = false; - for(i = 0; i < nrUdpLists; i++) { - udpPartList_pt udpPartList = arrayList_get(handle->udpPartLists, i); - if(udpPartList->msg_ident == header.msg_ident) { - found = true; - - //sanity check - if(udpPartList->msg_size != header.total_msg_size) { - // Corruption occurred. Remove the existing administration and build up a new one. 
- arrayList_remove(handle->udpPartLists, i); - free(udpPartList->data); - free(udpPartList); - found = false; - break; - } - - msg.msg_iov[1].iov_base = &udpPartList->data[header.offset]; - msg.msg_iov[1].iov_len = header.part_msg_size; - recvmsg(fd, &msg, 0); - - udpPartList->nrPartsRemaining--; - if(udpPartList->nrPartsRemaining == 0) { - *index = i; - *size = udpPartList->msg_size; - result = true; - break; - } else { - result = false; // not complete - break; - } - } - } - - if(found == false) { - udpPartList_pt udpPartList = NULL; - if(nrUdpLists == handle->maxNrLists) { - // remove list at index 0 - udpPartList = arrayList_remove(handle->udpPartLists, 0); - fprintf(stderr, "ERROR: Removing entry for id %d: %d parts not received\n",udpPartList->msg_ident, udpPartList->nrPartsRemaining ); - free(udpPartList->data); - free(udpPartList); - nrUdpLists--; - } - udpPartList = calloc(sizeof(*udpPartList), 1); - udpPartList->msg_ident = header.msg_ident; - udpPartList->msg_size = header.total_msg_size; - udpPartList->nrPartsRemaining = header.total_msg_size / MAX_PART_SIZE; - udpPartList->data = calloc(sizeof(char), header.total_msg_size); - - msg.msg_iov[1].iov_base = &udpPartList->data[header.offset]; - msg.msg_iov[1].iov_len = header.part_msg_size; - recvmsg(fd, &msg, 0); - - arrayList_add(handle->udpPartLists, udpPartList); - - if(udpPartList->nrPartsRemaining == 0) { - *index = nrUdpLists; - *size = udpPartList->msg_size; - result = true; - } else { - result = false; - } - - } - pthread_mutex_unlock(&handle->dbLock); - - return result; -} - -// -// Read out the message which is indicated available by the largeUdp_dataAvailable function -// -int largeUdp_read(largeUdp_pt handle, unsigned int index, void ** buffer, unsigned int size) -{ - int result = 0; - pthread_mutex_lock(&handle->dbLock); - - udpPartList_pt udpPartList = arrayList_remove(handle->udpPartLists, index); - if(udpPartList) { - *buffer = udpPartList->data; - free(udpPartList); - } else { - result = 
-1; - } - pthread_mutex_unlock(&handle->dbLock); - - return result; -} http://git-wip-us.apache.org/repos/asf/celix/blob/2d0923ea/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/psa_activator.c ---------------------------------------------------------------------- diff --git a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/psa_activator.c b/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/psa_activator.c deleted file mode 100644 index 24202dd..0000000 --- a/celix-pubsub/pubsub/pubsub_admin_udp_mc/private/src/psa_activator.c +++ /dev/null @@ -1,116 +0,0 @@ -/** - *Licensed to the Apache Software Foundation (ASF) under one - *or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - *regarding copyright ownership. The ASF licenses this file - *to you under the Apache License, Version 2.0 (the - *"License"); you may not use this file except in compliance - *with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - *Unless required by applicable law or agreed to in writing, - *software distributed under the License is distributed on an - *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - *specific language governing permissions and limitations - *under the License. 
- */ -/* - * psa_activator.c - * - * \date Sep 30, 2011 - * \author Apache Celix Project Team - * \copyright Apache License, Version 2.0 - */ - -#include - -#include "bundle_activator.h" -#include "service_registration.h" - -#include "pubsub_admin_impl.h" - -struct activator { - pubsub_admin_pt admin; - pubsub_admin_service_pt adminService; - service_registration_pt registration; -}; - -celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) { - celix_status_t status = CELIX_SUCCESS; - struct activator *activator; - - activator = calloc(1, sizeof(*activator)); - if (!activator) { - status = CELIX_ENOMEM; - } - else{ - *userData = activator; - status = pubsubAdmin_create(context, &(activator->admin)); - } - - return status; -} - -celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) { - celix_status_t status = CELIX_SUCCESS; - struct activator *activator = userData; - pubsub_admin_service_pt pubsubAdminSvc = calloc(1, sizeof(*pubsubAdminSvc)); - - if (!pubsubAdminSvc) { - status = CELIX_ENOMEM; - } - else{ - pubsubAdminSvc->admin = activator->admin; - - pubsubAdminSvc->addPublication = pubsubAdmin_addPublication; - pubsubAdminSvc->removePublication = pubsubAdmin_removePublication; - - pubsubAdminSvc->addSubscription = pubsubAdmin_addSubscription; - pubsubAdminSvc->removeSubscription = pubsubAdmin_removeSubscription; - - pubsubAdminSvc->closeAllPublications = pubsubAdmin_closeAllPublications; - pubsubAdminSvc->closeAllSubscriptions = pubsubAdmin_closeAllSubscriptions; - - pubsubAdminSvc->matchPublisher = pubsubAdmin_matchPublisher; - pubsubAdminSvc->matchSubscriber = pubsubAdmin_matchSubscriber; - - activator->adminService = pubsubAdminSvc; - - status = bundleContext_registerService(context, PUBSUB_ADMIN_SERVICE, pubsubAdminSvc, NULL, &activator->registration); - - } - - - return status; -} - -celix_status_t bundleActivator_stop(void * userData, bundle_context_pt context) { - celix_status_t status = CELIX_SUCCESS; 
- struct activator *activator = userData; - - serviceRegistration_unregister(activator->registration); - activator->registration = NULL; - - pubsubAdmin_stop(activator->admin); - - free(activator->adminService); - activator->adminService = NULL; - - return status; -} - -celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt context) { - celix_status_t status = CELIX_SUCCESS; - struct activator *activator = userData; - - pubsubAdmin_destroy(activator->admin); - activator->admin = NULL; - - free(activator); - - return status; -} - -