Return-Path: X-Original-To: apmail-celix-commits-archive@www.apache.org Delivered-To: apmail-celix-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5192B18638 for ; Fri, 13 Nov 2015 14:02:42 +0000 (UTC) Received: (qmail 2765 invoked by uid 500); 13 Nov 2015 14:02:42 -0000 Delivered-To: apmail-celix-commits-archive@celix.apache.org Received: (qmail 2735 invoked by uid 500); 13 Nov 2015 14:02:42 -0000 Mailing-List: contact commits-help@celix.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@celix.apache.org Delivered-To: mailing list commits@celix.apache.org Received: (qmail 2689 invoked by uid 99); 13 Nov 2015 14:02:42 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 13 Nov 2015 14:02:42 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0C3C2E03E8; Fri, 13 Nov 2015 14:02:42 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: bpetri@apache.org To: commits@celix.apache.org Date: Fri, 13 Nov 2015 14:02:43 -0000 Message-Id: In-Reply-To: <732b2d50c2ad4878a0b7c14e09d83bd3@git.apache.org> References: <732b2d50c2ad4878a0b7c14e09d83bd3@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] celix git commit: CELIX-293: removing of fw_log calls, pthread_*, minor bugfixing, added test CELIX-293: removing of fw_log calls, pthread_*, minor bugfixing, added test Project: http://git-wip-us.apache.org/repos/asf/celix/repo Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/c003b31d Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/c003b31d Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/c003b31d Branch: refs/heads/develop Commit: c003b31d86ad8e25e7bea21bf60cb60f9744c809 Parents: 3e0b7f8 Author: Bjoern 
Petri Authored: Fri Nov 13 15:01:56 2015 +0100 Committer: Bjoern Petri Committed: Fri Nov 13 15:01:56 2015 +0100 ---------------------------------------------------------------------- .../private/include/endpoint_discovery_server.h | 2 +- .../private/src/endpoint_discovery_poller.c | 2 - remote_services/discovery_shm/CMakeLists.txt | 4 +- .../private/include/discovery_impl.h | 2 +- .../private/include/discovery_shm.h | 56 ++++ .../private/include/discovery_shmWatcher.h | 40 +++ .../discovery_shm/private/include/shm.h | 56 ---- .../discovery_shm/private/include/shm_watcher.h | 40 --- .../discovery_shm/private/src/discovery_impl.c | 81 +++--- .../discovery_shm/private/src/discovery_shm.c | 279 +++++++++++++++++++ .../private/src/discovery_shmWatcher.c | 249 +++++++++++++++++ remote_services/discovery_shm/private/src/shm.c | 264 ------------------ .../discovery_shm/private/src/shm_watcher.c | 230 --------------- .../remote_service_admin_shm/CMakeLists.txt | 7 + .../src/remote_service_admin_activator.c | 114 ++++---- .../private/src/remote_service_admin_impl.c | 199 ++++++------- .../private/test/CMakeLists.txt | 58 ++++ .../private/test/client.properties.in | 25 ++ .../private/test/rsa_client_server_tests.cpp | 126 +++++++++ .../private/test/run_tests.cpp | 24 ++ .../private/test/server.properties.in | 25 ++ 21 files changed, 1087 insertions(+), 796 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/celix/blob/c003b31d/remote_services/discovery/private/include/endpoint_discovery_server.h ---------------------------------------------------------------------- diff --git a/remote_services/discovery/private/include/endpoint_discovery_server.h b/remote_services/discovery/private/include/endpoint_discovery_server.h index 6226b70..51082b5 100644 --- a/remote_services/discovery/private/include/endpoint_discovery_server.h +++ b/remote_services/discovery/private/include/endpoint_discovery_server.h @@ -69,7 
+69,7 @@ celix_status_t endpointDiscoveryServer_addEndpoint(endpoint_discovery_server_pt celix_status_t endpointDiscoveryServer_removeEndpoint( endpoint_discovery_server_pt server, endpoint_description_pt endpoint); /** - * Removes the url, which is used by the discovery server to announce the endpoints + * Returns the url, which is used by the discovery server to announce the endpoints * * @param server [in] the endpoint discovery server to retrieve the url from * @param url [out] url which is used to announce the endpoints. http://git-wip-us.apache.org/repos/asf/celix/blob/c003b31d/remote_services/discovery/private/src/endpoint_discovery_poller.c ---------------------------------------------------------------------- diff --git a/remote_services/discovery/private/src/endpoint_discovery_poller.c b/remote_services/discovery/private/src/endpoint_discovery_poller.c index ec0a1b3..d2f347c 100644 --- a/remote_services/discovery/private/src/endpoint_discovery_poller.c +++ b/remote_services/discovery/private/src/endpoint_discovery_poller.c @@ -346,7 +346,6 @@ static size_t endpointDiscoveryPoller_writeMemory(void *contents, size_t size, s static celix_status_t endpointDiscoveryPoller_getEndpoints(endpoint_discovery_poller_pt poller, char *url, array_list_pt *updatedEndpoints) { celix_status_t status = CELIX_SUCCESS; - logHelper_log(*poller->loghelper, OSGI_LOGSERVICE_DEBUG, "Polling url '%s'", url); CURL *curl = NULL; CURLcode res = CURLE_OK; @@ -375,7 +374,6 @@ static celix_status_t endpointDiscoveryPoller_getEndpoints(endpoint_discovery_po status = endpointDescriptorReader_create(poller, &reader); if (status == CELIX_SUCCESS) { - logHelper_log(*poller->loghelper, OSGI_LOGSERVICE_DEBUG, "Polled data '%s'", chunk.memory); status = endpointDescriptorReader_parseDocument(reader, chunk.memory, updatedEndpoints); } http://git-wip-us.apache.org/repos/asf/celix/blob/c003b31d/remote_services/discovery_shm/CMakeLists.txt 
---------------------------------------------------------------------- diff --git a/remote_services/discovery_shm/CMakeLists.txt b/remote_services/discovery_shm/CMakeLists.txt index 2e7c14d..1e6ed00 100644 --- a/remote_services/discovery_shm/CMakeLists.txt +++ b/remote_services/discovery_shm/CMakeLists.txt @@ -37,8 +37,8 @@ find_package(CURL REQUIRED) SET_HEADERS("Bundle-Name: Apache Celix RSA Discovery SHM") bundle(discovery_shm SOURCES - private/src/shm - private/src/shm_watcher + private/src/discovery_shm + private/src/discovery_shmWatcher private/src/discovery_impl ${PROJECT_SOURCE_DIR}/remote_services/discovery/private/src/discovery_activator.c ${PROJECT_SOURCE_DIR}/remote_services/discovery/private/src/discovery.c http://git-wip-us.apache.org/repos/asf/celix/blob/c003b31d/remote_services/discovery_shm/private/include/discovery_impl.h ---------------------------------------------------------------------- diff --git a/remote_services/discovery_shm/private/include/discovery_impl.h b/remote_services/discovery_shm/private/include/discovery_impl.h index fa6d7bf..c7206bd 100644 --- a/remote_services/discovery_shm/private/include/discovery_impl.h +++ b/remote_services/discovery_shm/private/include/discovery_impl.h @@ -35,7 +35,7 @@ #include "endpoint_discovery_poller.h" #include "endpoint_discovery_server.h" -#include "shm_watcher.h" +#include "discovery_shmWatcher.h" #define DEFAULT_SERVER_IP "127.0.0.1" http://git-wip-us.apache.org/repos/asf/celix/blob/c003b31d/remote_services/discovery_shm/private/include/discovery_shm.h ---------------------------------------------------------------------- diff --git a/remote_services/discovery_shm/private/include/discovery_shm.h b/remote_services/discovery_shm/private/include/discovery_shm.h new file mode 100644 index 0000000..9c4593b --- /dev/null +++ b/remote_services/discovery_shm/private/include/discovery_shm.h @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * shm.h + * + * \date 26 Jul 2014 + * \author Apache Celix Project Team + * \copyright Apache License, Version 2.0 + */ + + + +#ifndef _DISCOVERY_SHM_H_ +#define _DISCOVERY_SHM_H_ + +#include + +#define SHM_ENTRY_MAX_KEY_LENGTH 256 +#define SHM_ENTRY_MAX_VALUE_LENGTH 256 + +// defines the time-to-live in seconds +#define SHM_ENTRY_DEFAULT_TTL 60 + +// we currently support 64 separate discovery instances +#define SHM_DATA_MAX_ENTRIES 64 + +typedef struct shmData* shmData_pt; + +/* creates a new shared memory block */ +celix_status_t discoveryShm_create(shmData_pt* data); +celix_status_t discoveryShm_attach(shmData_pt* data); +celix_status_t discoveryShm_set(shmData_pt data, char *key, char* value); +celix_status_t discoveryShm_get(shmData_pt data, char* key, char* value); +celix_status_t discoveryShm_getKeys(shmData_pt data, char** keys, int* size); +celix_status_t discoveryShm_remove(shmData_pt data, char* key); +celix_status_t discoveryShm_detach(shmData_pt data); +celix_status_t discoveryShm_destroy(shmData_pt data); + +#endif http://git-wip-us.apache.org/repos/asf/celix/blob/c003b31d/remote_services/discovery_shm/private/include/discovery_shmWatcher.h ---------------------------------------------------------------------- diff --git 
a/remote_services/discovery_shm/private/include/discovery_shmWatcher.h b/remote_services/discovery_shm/private/include/discovery_shmWatcher.h new file mode 100644 index 0000000..ff70f72 --- /dev/null +++ b/remote_services/discovery_shm/private/include/discovery_shmWatcher.h @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +/* + * shm_watcher.h + * + * \date 30 Sep 2014 + * \author Apache Celix Project Team + * \copyright Apache License, Version 2.0 + */ + +#ifndef DISCOVERY_SHM_WATCHER_H_ +#define DISCOVERY_SHM_WATCHER_H_ + +#include "celix_errno.h" +#include "discovery.h" +#include "endpoint_discovery_poller.h" + +typedef struct shm_watcher *shm_watcher_pt; + +celix_status_t discoveryShmWatcher_create(discovery_pt discovery); +celix_status_t discoveryShmWatcher_destroy(discovery_pt discovery); + + +#endif /* DISCOVERY_SHM_WATCHER_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/c003b31d/remote_services/discovery_shm/private/include/shm.h ---------------------------------------------------------------------- diff --git a/remote_services/discovery_shm/private/include/shm.h b/remote_services/discovery_shm/private/include/shm.h deleted file mode 100644 index f7b4160..0000000 --- a/remote_services/discovery_shm/private/include/shm.h +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -/* - * shm.h - * - * \date 26 Jul 2014 - * \author Apache Celix Project Team - * \copyright Apache License, Version 2.0 - */ - - - -#ifndef _SHM_H_ -#define _SHM_H_ - -#include - -#define SHM_ENTRY_MAX_KEY_LENGTH 256 -#define SHM_ENTRY_MAX_VALUE_LENGTH 256 - -// defines the time-to-live in seconds -#define SHM_ENTRY_DEFAULT_TTL 60 - -// we currently support 64 separate discovery instances -#define SHM_DATA_MAX_ENTRIES 64 - -typedef struct shmData* shmData_pt; - -/* creates a new shared memory block */ -celix_status_t discovery_shmCreate(shmData_pt* data); -celix_status_t discovery_shmAttach(shmData_pt* data); -celix_status_t discovery_shmSet(shmData_pt data, char *key, char* value); -celix_status_t discovery_shmGet(shmData_pt data, char* key, char* value); -celix_status_t discovery_shmGetKeys(shmData_pt data, char** keys, int* size); -celix_status_t discovery_shmRemove(shmData_pt data, char* key); -celix_status_t discovery_shmDetach(shmData_pt data); -celix_status_t discovery_shmDestroy(shmData_pt data); - -#endif http://git-wip-us.apache.org/repos/asf/celix/blob/c003b31d/remote_services/discovery_shm/private/include/shm_watcher.h ---------------------------------------------------------------------- diff --git a/remote_services/discovery_shm/private/include/shm_watcher.h b/remote_services/discovery_shm/private/include/shm_watcher.h deleted file mode 100644 index f99007f..0000000 --- a/remote_services/discovery_shm/private/include/shm_watcher.h +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -/* - * shm_watcher.h - * - * \date 30 Sep 2014 - * \author Apache Celix Project Team - * \copyright Apache License, Version 2.0 - */ - -#ifndef SHM_WATCHER_H_ -#define SHM_WATCHER_H_ - -#include "celix_errno.h" -#include "discovery.h" -#include "endpoint_discovery_poller.h" - -typedef struct shm_watcher *shm_watcher_pt; - -celix_status_t shmWatcher_create(endpoint_discovery_poller_pt poller, bundle_context_pt context, shm_watcher_pt *watcher); -celix_status_t shmWatcher_destroy(shm_watcher_pt watcher); - - -#endif /* SHM_WATCHER_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/c003b31d/remote_services/discovery_shm/private/src/discovery_impl.c ---------------------------------------------------------------------- diff --git a/remote_services/discovery_shm/private/src/discovery_impl.c b/remote_services/discovery_shm/private/src/discovery_impl.c index 1451c41..2befad3 100644 --- a/remote_services/discovery_shm/private/src/discovery_impl.c +++ b/remote_services/discovery_shm/private/src/discovery_impl.c @@ -44,7 +44,7 @@ #include "discovery.h" #include "discovery_impl.h" -#include "shm_watcher.h" +#include "discovery_shmWatcher.h" #include "endpoint_discovery_poller.h" #include "endpoint_discovery_server.h" @@ -64,8 +64,8 @@ celix_status_t discovery_create(bundle_context_pt context, discovery_pt *discove (*discovery)->listenerReferences = hashMap_create(serviceReference_hashCode, NULL, serviceReference_equals2, NULL); (*discovery)->discoveredServices = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); - status = 
celixThreadMutex_create(&(*discovery)->listenerReferencesMutex, NULL); - status = celixThreadMutex_create(&(*discovery)->discoveredServicesMutex, NULL); + celixThreadMutex_create(&(*discovery)->listenerReferencesMutex, NULL); + celixThreadMutex_create(&(*discovery)->discoveredServicesMutex, NULL); if (logHelper_create(context, &(*discovery)->loghelper) == CELIX_SUCCESS) { logHelper_start((*discovery)->loghelper); @@ -102,40 +102,24 @@ celix_status_t discovery_destroy(discovery_pt discovery) { celixThreadMutex_destroy(&discovery->listenerReferencesMutex); + + + free(discovery); return status; } celix_status_t discovery_start(discovery_pt discovery) { - celix_status_t status = CELIX_SUCCESS; - char *port = NULL; - char *path = NULL; - - - bundleContext_getProperty(discovery->context, DISCOVERY_SERVER_PORT, &port); - if (port == NULL) { - port = DEFAULT_SERVER_PORT; - } - - bundleContext_getProperty(discovery->context, DISCOVERY_SERVER_PATH, &path); - if (path == NULL) { - path = DEFAULT_SERVER_PATH; - } + celix_status_t status; status = endpointDiscoveryPoller_create(discovery, discovery->context, &discovery->poller); - if (status != CELIX_SUCCESS) { - return CELIX_BUNDLE_EXCEPTION; - } - - status = shmWatcher_create(discovery->poller, discovery->context, &discovery->watcher); - if (status != CELIX_SUCCESS) { - return CELIX_BUNDLE_EXCEPTION; + if (status == CELIX_SUCCESS) { + status = endpointDiscoveryServer_create(discovery, discovery->context, &discovery->server); } - status = endpointDiscoveryServer_create(discovery, discovery->context, &discovery->server); - if (status != CELIX_SUCCESS) { - return CELIX_BUNDLE_EXCEPTION; + if (status == CELIX_SUCCESS) { + status = discoveryShmWatcher_create(discovery); } return status; @@ -144,38 +128,35 @@ celix_status_t discovery_start(discovery_pt discovery) { celix_status_t discovery_stop(discovery_pt discovery) { celix_status_t status; - status = endpointDiscoveryServer_destroy(discovery->server); - if (status != 
CELIX_SUCCESS) { - return CELIX_BUNDLE_EXCEPTION; - } + status = discoveryShmWatcher_destroy(discovery); - status = shmWatcher_destroy(discovery->watcher); - if (status != CELIX_SUCCESS) { - return CELIX_BUNDLE_EXCEPTION; - } + if (status == CELIX_SUCCESS) { + status = endpointDiscoveryServer_destroy(discovery->server); + } - status = endpointDiscoveryPoller_destroy(discovery->poller); - if (status != CELIX_SUCCESS) { - return CELIX_BUNDLE_EXCEPTION; + if (status == CELIX_SUCCESS) { + status = endpointDiscoveryPoller_destroy(discovery->poller); } - hash_map_iterator_pt iter; + if (status == CELIX_SUCCESS) { + hash_map_iterator_pt iter; - celixThreadMutex_lock(&discovery->discoveredServicesMutex); + celixThreadMutex_lock(&discovery->discoveredServicesMutex); - iter = hashMapIterator_create(discovery->discoveredServices); - while (hashMapIterator_hasNext(iter)) { - hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); - endpoint_description_pt endpoint = hashMapEntry_getValue(entry); + iter = hashMapIterator_create(discovery->discoveredServices); + while (hashMapIterator_hasNext(iter)) { + hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); + endpoint_description_pt endpoint = hashMapEntry_getValue(entry); - discovery_informEndpointListeners(discovery, endpoint, false); - } - hashMapIterator_destroy(iter); + discovery_informEndpointListeners(discovery, endpoint, false); + } + hashMapIterator_destroy(iter); - celixThreadMutex_unlock(&discovery->discoveredServicesMutex); + celixThreadMutex_unlock(&discovery->discoveredServicesMutex); - logHelper_stop(discovery->loghelper); - logHelper_destroy(&discovery->loghelper); + logHelper_stop(discovery->loghelper); + logHelper_destroy(&discovery->loghelper); + } return status; } http://git-wip-us.apache.org/repos/asf/celix/blob/c003b31d/remote_services/discovery_shm/private/src/discovery_shm.c ---------------------------------------------------------------------- diff --git 
a/remote_services/discovery_shm/private/src/discovery_shm.c b/remote_services/discovery_shm/private/src/discovery_shm.c new file mode 100644 index 0000000..9567401 --- /dev/null +++ b/remote_services/discovery_shm/private/src/discovery_shm.c @@ -0,0 +1,279 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +/* + * discovery_shm.c + * + * \date 26 Jul 2014 + * \author Apache Celix Project Team + * \copyright Apache License, Version 2.0 + */ + + + +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include + +#include "discovery_shm.h" + +#define DISCOVERY_SHM_MEMSIZE 262144 +#define DISCOVERY_SHM_FILENAME "/dev/null" +#define DISCOVERY_SHM_FTOK_ID 50 +#define DISCOVERY_SEM_FILENAME "/dev/null" +#define DISCOVERY_SEM_FTOK_ID 54 + +struct shmEntry { + char key[SHM_ENTRY_MAX_KEY_LENGTH]; + char value[SHM_ENTRY_MAX_VALUE_LENGTH]; + + time_t expires; +}; + +typedef struct shmEntry shmEntry; + +struct shmData { + shmEntry entries[SHM_DATA_MAX_ENTRIES]; + int numOfEntries; + int shmId; + + celix_thread_mutex_t globalLock; +}; + +void* shmAdress; + +static celix_status_t discoveryShm_removeWithIndex(shmData_pt data, int index); + +/* returns the ftok key to identify shared memory*/ +static key_t discoveryShm_getKey() { + return ftok(DISCOVERY_SHM_FILENAME, DISCOVERY_SHM_FTOK_ID); +} + +/* creates a new shared memory block */ +celix_status_t discoveryShm_create(shmData_pt* data) { + celix_status_t status; + + shmData_pt shmData = calloc(1, sizeof(*shmData)); + key_t shmKey = discoveryShm_getKey(); + + if (!shmData) { + status = CELIX_ENOMEM; + } else if ((shmData->shmId = shmget(shmKey, DISCOVERY_SHM_MEMSIZE, IPC_CREAT | 0666)) < 0) { + status = CELIX_BUNDLE_EXCEPTION; + } else if ((shmAdress = shmat(shmData->shmId, 0, 0)) == (char*) -1) { + status = CELIX_BUNDLE_EXCEPTION; + } else { + celix_thread_mutexattr_t threadAttr; + + shmData->numOfEntries = 0; + + status = celixThreadMutexAttr_create(&threadAttr); + +#ifdef LINUX + if (status == CELIX_SUCCESS) { + // This is Linux specific + status = pthread_mutexattr_setrobust(&threadAttr, PTHREAD_MUTEX_ROBUST); + } +#endif + + if (status == CELIX_SUCCESS) { + status = celixThreadMutex_create(&shmData->globalLock, &threadAttr); + } + + if (status == CELIX_SUCCESS) { + 
memcpy(shmAdress, shmData, sizeof(struct shmData)); + (*data) = shmAdress; + } + } + + free(shmData); + + return status; +} + +celix_status_t discoveryShm_attach(shmData_pt* data) { + celix_status_t status = CELIX_SUCCESS; + key_t shmKey = ftok(DISCOVERY_SHM_FILENAME, DISCOVERY_SHM_FTOK_ID); + int shmId = -1; + + if ((shmId = shmget(shmKey, DISCOVERY_SHM_MEMSIZE, 0666)) < 0) { + status = CELIX_BUNDLE_EXCEPTION; + } + + if (((*data) = shmat(shmId, 0, 0)) < 0) { + status = CELIX_BUNDLE_EXCEPTION; + } + + return status; +} + +static celix_status_t discoveryShm_getwithIndex(shmData_pt data, char* key, char* value, int* index) { + celix_status_t status = CELIX_BUNDLE_EXCEPTION; + time_t currentTime = time(NULL); + unsigned int i; + + for (i = 0; i < data->numOfEntries && status != CELIX_SUCCESS; i++) { + shmEntry entry = data->entries[i]; + // check if entry is still valid + if (data->entries[i].expires < currentTime) { + discoveryShm_removeWithIndex(data, i); + } else if (strcmp(entry.key, key) == 0) { + if (value) { + strcpy(value, entry.value); + } + if (index) { + (*index) = i; + } + status = CELIX_SUCCESS; + } + } + + return status; +} + +celix_status_t discoveryShm_getKeys(shmData_pt data, char** keys, int* size) { + celix_status_t status; + + status = celixThreadMutex_lock(&data->globalLock); + + if (status == CELIX_SUCCESS) { + unsigned int i = 0; + for (i = 0; i < data->numOfEntries; i++) { + shmEntry entry = data->entries[i]; + + if (entry.key) { + snprintf(keys[i], SHM_ENTRY_MAX_KEY_LENGTH, "%s", entry.key); + } + } + + (*size) = i; + + celixThreadMutex_unlock(&data->globalLock); + } + + return status; +} + +celix_status_t discoveryShm_set(shmData_pt data, char *key, char* value) { + celix_status_t status; + int index = -1; + + if (data->numOfEntries >= SHM_DATA_MAX_ENTRIES) { + status = CELIX_ILLEGAL_STATE; + } else { + status = celixThreadMutex_lock(&data->globalLock); + + if (status == CELIX_SUCCESS) { + // check if key already there + status = 
discoveryShm_getwithIndex(data, key, NULL, &index); + if (status != CELIX_SUCCESS) { + index = data->numOfEntries; + + snprintf(data->entries[index].key, SHM_ENTRY_MAX_KEY_LENGTH, "%s", key); + data->numOfEntries++; + + status = CELIX_SUCCESS; + } + + snprintf(data->entries[index].value, SHM_ENTRY_MAX_VALUE_LENGTH, "%s", value); + data->entries[index].expires = (time(NULL) + SHM_ENTRY_DEFAULT_TTL); + + celixThreadMutex_unlock(&data->globalLock); + } + } + + return status; +} + +celix_status_t discoveryShm_get(shmData_pt data, char* key, char* value) { + celix_status_t status; + + status = celixThreadMutex_lock(&data->globalLock); + + if (status == CELIX_SUCCESS) { + status = discoveryShm_getwithIndex(data, key, value, NULL); + + celixThreadMutex_unlock(&data->globalLock); + } + + return status; +} + +static celix_status_t discoveryShm_removeWithIndex(shmData_pt data, int index) { + celix_status_t status = CELIX_SUCCESS; + + data->numOfEntries--; + if (index < data->numOfEntries) { + memcpy((void*) &data->entries[index], (void*) &data->entries[index + 1], ((data->numOfEntries - index) * sizeof(struct shmEntry))); + } + + return status; +} + +celix_status_t discoveryShm_remove(shmData_pt data, char* key) { + celix_status_t status; + int index = -1; + + status = celixThreadMutex_lock(&data->globalLock); + + if (status == CELIX_SUCCESS) { + status = discoveryShm_getwithIndex(data, key, NULL, &index); + + if (status == CELIX_SUCCESS) { + status = discoveryShm_removeWithIndex(data, index); + } + + celixThreadMutex_unlock(&data->globalLock); + } + + return status; +} + +celix_status_t discoveryShm_detach(shmData_pt data) { + celix_status_t status = CELIX_BUNDLE_EXCEPTION; + + if (data->numOfEntries == 0) { + status = discoveryShm_destroy(data); + } + else if (shmdt(shmAdress) == 0) { + status = CELIX_SUCCESS; + } + + return status; +} + +celix_status_t discoveryShm_destroy(shmData_pt data) { + celix_status_t status = CELIX_BUNDLE_EXCEPTION; + + if (shmctl(data->shmId, 
IPC_RMID, 0) == 0) { + status = CELIX_SUCCESS; + } + + return status; + +} http://git-wip-us.apache.org/repos/asf/celix/blob/c003b31d/remote_services/discovery_shm/private/src/discovery_shmWatcher.c ---------------------------------------------------------------------- diff --git a/remote_services/discovery_shm/private/src/discovery_shmWatcher.c b/remote_services/discovery_shm/private/src/discovery_shmWatcher.c new file mode 100644 index 0000000..ea10bbb --- /dev/null +++ b/remote_services/discovery_shm/private/src/discovery_shmWatcher.c @@ -0,0 +1,249 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +/* + * discovery_shmWatcher.c + * + * \date 16 Sep 2014 + * \author Apache Celix Project Team + * \copyright Apache License, Version 2.0 + */ + +#include +#include +#include +#include +#include + + +#include "celix_log.h" +#include "constants.h" +#include "discovery_impl.h" + +#include "discovery_shm.h" +#include "discovery_shmWatcher.h" + +#include "endpoint_discovery_poller.h" + +struct shm_watcher { + shmData_pt shmData; + celix_thread_t watcherThread; + celix_thread_mutex_t watcherLock; + + volatile bool running; +}; + +// note that the rootNode shouldn't have a leading slash +static celix_status_t discoveryShmWatcher_getRootPath(char* rootNode) { + celix_status_t status = CELIX_SUCCESS; + + strcpy(rootNode, "discovery"); + + return status; +} + +static celix_status_t discoveryShmWatcher_getLocalNodePath(bundle_context_pt context, char* localNodePath) { + celix_status_t status; + char rootPath[MAX_ROOTNODE_LENGTH]; + char* uuid = NULL; + + status = discoveryShmWatcher_getRootPath(&rootPath[0]); + + if (status == CELIX_SUCCESS) { + status = bundleContext_getProperty(context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &uuid); + } + + if (status == CELIX_SUCCESS) { + if (rootPath[strlen(&rootPath[0]) - 1] == '/') { + snprintf(localNodePath, MAX_LOCALNODE_LENGTH, "%s%s", &rootPath[0], uuid); + } else { + snprintf(localNodePath, MAX_LOCALNODE_LENGTH, "%s/%s", &rootPath[0], uuid); + } + } + + return status; +} + +/* retrieves all endpoints from shm and syncs them with the ones already available */ +static celix_status_t discoveryShmWatcher_syncEndpoints(discovery_pt discovery) { + celix_status_t status = CELIX_SUCCESS; + shm_watcher_pt watcher = discovery->watcher; + char** shmKeyArr = calloc(SHM_DATA_MAX_ENTRIES, sizeof(*shmKeyArr)); + array_list_pt registeredKeyArr = NULL; + + int i, j, shmSize; + + for (i = 0; i < SHM_DATA_MAX_ENTRIES; i++) { + shmKeyArr[i] = calloc(SHM_ENTRY_MAX_KEY_LENGTH, sizeof(*shmKeyArr[i])); + } + + arrayList_create(&registeredKeyArr); + + // get 
all urls available in shm + discoveryShm_getKeys(watcher->shmData, shmKeyArr, &shmSize); + + // get all locally registered endpoints + endpointDiscoveryPoller_getDiscoveryEndpoints(discovery->poller, registeredKeyArr); + + // add discovery points which are in shm, but not local yet + for (i = 0; i < shmSize; i++) { + char url[SHM_ENTRY_MAX_VALUE_LENGTH]; + + if (discoveryShm_get(watcher->shmData, shmKeyArr[i], &url[0]) == CELIX_SUCCESS) { + bool elementFound = false; + + for (j = 0; j < arrayList_size(registeredKeyArr) && elementFound == false; j++) { + + if (strcmp(url, (char*) arrayList_get(registeredKeyArr, j)) == 0) { + free(arrayList_remove(registeredKeyArr, j)); + elementFound = true; + } + } + + if (elementFound == false) { + endpointDiscoveryPoller_addDiscoveryEndpoint(discovery->poller, url); + } + } + } + + // remove those which are not in shm + for (i = 0; i < arrayList_size(registeredKeyArr); i++) { + char* regUrl = arrayList_get(registeredKeyArr, i); + + if (regUrl != NULL) { + endpointDiscoveryPoller_removeDiscoveryEndpoint(discovery->poller, regUrl); + } + } + + for (i = 0; i < SHM_DATA_MAX_ENTRIES; i++) { + free(shmKeyArr[i]); + } + + free(shmKeyArr); + + for (j = 0; j < arrayList_size(registeredKeyArr); j++) { + free(arrayList_get(registeredKeyArr, j)); + } + + arrayList_destroy(registeredKeyArr); + + return status; +} + +static void* discoveryShmWatcher_run(void* data) { + discovery_pt discovery = (discovery_pt) data; + shm_watcher_pt watcher = discovery->watcher; + char localNodePath[MAX_LOCALNODE_LENGTH]; + char url[MAX_LOCALNODE_LENGTH]; + + if (discoveryShmWatcher_getLocalNodePath(discovery->context, &localNodePath[0]) != CELIX_SUCCESS) { + logHelper_log(discovery->loghelper, OSGI_LOGSERVICE_WARNING, "Cannot retrieve local discovery path."); + } + + if (endpointDiscoveryServer_getUrl(discovery->server, &url[0]) != CELIX_SUCCESS) { + snprintf(url, MAX_LOCALNODE_LENGTH, "http://%s:%s/%s", DEFAULT_SERVER_IP, DEFAULT_SERVER_PORT, 
DEFAULT_SERVER_PATH); + } + + while (watcher->running) { + // register own framework + if (discoveryShm_set(watcher->shmData, localNodePath, url) != CELIX_SUCCESS) { + logHelper_log(discovery->loghelper, OSGI_LOGSERVICE_WARNING, "Cannot set local discovery registration."); + } + + discoveryShmWatcher_syncEndpoints(discovery); + sleep(5); + } + + return NULL; +} + +celix_status_t discoveryShmWatcher_create(discovery_pt discovery) { + celix_status_t status = CELIX_SUCCESS; + shm_watcher_pt watcher = NULL; + + watcher = calloc(1, sizeof(*watcher)); + + if (!watcher) { + status = CELIX_ENOMEM; + } else { + status = discoveryShm_attach(&(watcher->shmData)); + + if (status != CELIX_SUCCESS) { + logHelper_log(discovery->loghelper, OSGI_LOGSERVICE_DEBUG, "Attaching to Shared Memory Failed. Trying to create."); + + status = discoveryShm_create(&(watcher->shmData)); + + if (status != CELIX_SUCCESS) { + logHelper_log(discovery->loghelper, OSGI_LOGSERVICE_ERROR, "Failed to create Shared Memory Segment."); + } + } + + if (status == CELIX_SUCCESS) { + discovery->watcher = watcher; + } + + } + + if (status == CELIX_SUCCESS) { + status = celixThreadMutex_create(&watcher->watcherLock, NULL); + } + + if (status == CELIX_SUCCESS) { + status = celixThreadMutex_lock(&watcher->watcherLock); + } + + if (status == CELIX_SUCCESS) { + status = celixThread_create(&watcher->watcherThread, NULL, discoveryShmWatcher_run, discovery); + watcher->running = true; + } + + if (status == CELIX_SUCCESS) { + status = celixThreadMutex_unlock(&watcher->watcherLock); + } + + return status; +} + +celix_status_t discoveryShmWatcher_destroy(discovery_pt discovery) { + celix_status_t status; + shm_watcher_pt watcher = discovery->watcher; + char localNodePath[MAX_LOCALNODE_LENGTH]; + + watcher->running = false; + + celixThread_join(watcher->watcherThread, NULL); + + // remove own framework + status = discoveryShmWatcher_getLocalNodePath(discovery->context, &localNodePath[0]); + + if (status == CELIX_SUCCESS) { 
+ status = discoveryShm_remove(watcher->shmData, localNodePath); + } + + if (status == CELIX_SUCCESS) { + discoveryShm_detach(watcher->shmData); + free(watcher); + } + else { + logHelper_log(discovery->loghelper, OSGI_LOGSERVICE_WARNING, "Cannot remove local discovery registration."); + } + + + return status; +} + http://git-wip-us.apache.org/repos/asf/celix/blob/c003b31d/remote_services/discovery_shm/private/src/shm.c ---------------------------------------------------------------------- diff --git a/remote_services/discovery_shm/private/src/shm.c b/remote_services/discovery_shm/private/src/shm.c deleted file mode 100644 index aac0c48..0000000 --- a/remote_services/discovery_shm/private/src/shm.c +++ /dev/null @@ -1,264 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -/* - * shm.c - * - * \date 26 Jul 2014 - * \author Apache Celix Project Team - * \copyright Apache License, Version 2.0 - */ - - -#include -#include - -#include -#include -#include -#include - -#include -#include -#include - -#include "shm.h" - -// TODO: move somewhere else -#define DISCOVERY_SHM_MEMSIZE 262144 -#define DISCOVERY_SHM_FILENAME "/dev/null" -#define DISCOVERY_SHM_FTOK_ID 50 -#define DISCOVERY_SEM_FILENAME "/dev/null" -#define DISCOVERY_SEM_FTOK_ID 54 - -struct shmEntry { - char key[SHM_ENTRY_MAX_KEY_LENGTH]; - char value[SHM_ENTRY_MAX_VALUE_LENGTH]; - - time_t expires; -}; - -typedef struct shmEntry shmEntry; - -struct shmData { - shmEntry entries[SHM_DATA_MAX_ENTRIES]; - int numOfEntries; - int shmId; - - pthread_mutex_t globalLock; -}; - -void* shmAdress; - -static celix_status_t discovery_shmRemoveWithIndex(shmData_pt data, int index); - -/* returns the ftok key to identify shared memory*/ -static key_t getShmKey() { - return ftok(DISCOVERY_SHM_FILENAME, DISCOVERY_SHM_FTOK_ID); -} - -/* creates a new shared memory block */ -celix_status_t discovery_shmCreate(shmData_pt* data) { - celix_status_t status = CELIX_SUCCESS; - - shmData_pt shmData = calloc(1, sizeof(struct shmData)); - - key_t shmKey = getShmKey(); - - if (!shmData) { - status = CELIX_ENOMEM; - } else if ((shmData->shmId = shmget(shmKey, DISCOVERY_SHM_MEMSIZE, IPC_CREAT | 0666)) < 0) { - fw_log(logger, OSGI_FRAMEWORK_LOG_ERROR, "Creation of shared memory segment failed."); - status = CELIX_BUNDLE_EXCEPTION; - } else if ((shmAdress = shmat(shmData->shmId, 0, 0)) == (char*) -1) { - fw_log(logger, OSGI_FRAMEWORK_LOG_ERROR, "Attaching of shared memory segment failed."); - status = CELIX_BUNDLE_EXCEPTION; - } else { - shmData->numOfEntries = 0; - - pthread_mutexattr_t threadAttr; - if (pthread_mutexattr_init(&threadAttr) != 0) - printf("Error while initalizing lock attributes\n"); - else - pthread_mutexattr_setrobust(&threadAttr, PTHREAD_MUTEX_ROBUST); - - if 
(pthread_mutex_init(&shmData->globalLock, &threadAttr) == 0) - printf("Global lock sucessfully initialized\n"); - else { - printf("Global lock init failed\n"); - exit(1); - } - memcpy(shmAdress, shmData, sizeof(struct shmData)); - - (*data) = shmAdress; - } - free(shmData); - - return status; -} - -celix_status_t discovery_shmAttach(shmData_pt* data) { - celix_status_t status = CELIX_SUCCESS; - key_t shmKey = ftok(DISCOVERY_SHM_FILENAME, DISCOVERY_SHM_FTOK_ID); - int shmId = -1; - - if ((shmId = shmget(shmKey, DISCOVERY_SHM_MEMSIZE, 0666)) < 0) { - fw_log(logger, OSGI_FRAMEWORK_LOG_ERROR, "DISCOVERY : Attaching of shared memory segment failed."); - status = CELIX_BUNDLE_EXCEPTION; - } - - if (((*data) = shmat(shmId, 0, 0)) < 0) { - fw_log(logger, OSGI_FRAMEWORK_LOG_ERROR, "DISCOVERY : Attaching of shared memory segment failed."); - status = CELIX_BUNDLE_EXCEPTION; - } - - return status; -} - -static celix_status_t discovery_shmGetwithIndex(shmData_pt data, char* key, char* value, int* index) { - celix_status_t status = CELIX_BUNDLE_EXCEPTION; - time_t currentTime = time(NULL); - unsigned int i; - - for (i = 0; i < data->numOfEntries && status != CELIX_SUCCESS; i++) { - shmEntry entry = data->entries[i]; - // check if entry is still valid - if (data->entries[i].expires < currentTime) { - discovery_shmRemoveWithIndex(data, i); - } else if (strcmp(entry.key, key) == 0) { - if (value) { - strcpy(value, entry.value); - } - if (index) { - (*index) = i; - } - status = CELIX_SUCCESS; - } - } - - return status; -} - -celix_status_t discovery_shmGetKeys(shmData_pt data, char** keys, int* size) { - celix_status_t status = CELIX_SUCCESS; - unsigned int i = 0; - - pthread_mutex_lock(&data->globalLock); - - for (i = 0; i < data->numOfEntries; i++) { - shmEntry entry = data->entries[i]; - - if (entry.key) { - snprintf(keys[i], SHM_ENTRY_MAX_KEY_LENGTH, "%s", entry.key); - } - } - - (*size) = i; - - pthread_mutex_unlock(&data->globalLock); - return status; -} - -celix_status_t 
discovery_shmSet(shmData_pt data, char *key, char* value) { - celix_status_t status = CELIX_SUCCESS; - int index = -1; - - if (data->numOfEntries >= SHM_DATA_MAX_ENTRIES) { - status = CELIX_ILLEGAL_STATE; - } else { - pthread_mutex_lock(&data->globalLock); - - // check if key already there - if (discovery_shmGetwithIndex(data, key, NULL, &index) != CELIX_SUCCESS) { - index = data->numOfEntries; - - snprintf(data->entries[index].key, SHM_ENTRY_MAX_KEY_LENGTH, "%s", key); - data->numOfEntries++; - } - - snprintf(data->entries[index].value, SHM_ENTRY_MAX_VALUE_LENGTH, "%s", value); - data->entries[index].expires = (time(NULL) + SHM_ENTRY_DEFAULT_TTL); - - pthread_mutex_unlock(&data->globalLock); - } - - return status; -} - -celix_status_t discovery_shmGet(shmData_pt data, char* key, char* value) { - celix_status_t status = CELIX_ILLEGAL_ARGUMENT; - - pthread_mutex_lock(&data->globalLock); - - status = discovery_shmGetwithIndex(data, key, value, NULL); - - pthread_mutex_unlock(&data->globalLock); - - return status; -} - -static celix_status_t discovery_shmRemoveWithIndex(shmData_pt data, int index) { - celix_status_t status = CELIX_SUCCESS; - - data->numOfEntries--; - if (index < data->numOfEntries) { - memcpy((void*) &data->entries[index], (void*) &data->entries[index + 1], ((data->numOfEntries - index) * sizeof(struct shmEntry))); - } - - return status; -} - -celix_status_t discovery_shmRemove(shmData_pt data, char* key) { - celix_status_t status = CELIX_SUCCESS; - int index = -1; - - pthread_mutex_lock(&data->globalLock); - - if ((status = discovery_shmGetwithIndex(data, key, NULL, &index)) == CELIX_SUCCESS) { - - discovery_shmRemoveWithIndex(data, index); - } - pthread_mutex_unlock(&data->globalLock); - - return status; - -} - -celix_status_t discovery_shmDetach(shmData_pt data) { - celix_status_t status = CELIX_SUCCESS; - - if (data->numOfEntries == 0) - discovery_shmDestroy(data); - else { - fw_log(logger, OSGI_FRAMEWORK_LOG_DEBUG, "DISCOVERY_SHM: Detaching from 
Shared memory\n"); - shmdt(shmAdress); - } - - return status; -} - -celix_status_t discovery_shmDestroy(shmData_pt data) { - celix_status_t status = CELIX_SUCCESS; - - fw_log(logger, OSGI_FRAMEWORK_LOG_DEBUG, "DISCOVERY_SHM: Destroying Shared memory."); - shmctl(data->shmId, IPC_RMID, 0); - - return status; - -} http://git-wip-us.apache.org/repos/asf/celix/blob/c003b31d/remote_services/discovery_shm/private/src/shm_watcher.c ---------------------------------------------------------------------- diff --git a/remote_services/discovery_shm/private/src/shm_watcher.c b/remote_services/discovery_shm/private/src/shm_watcher.c deleted file mode 100644 index 2152345..0000000 --- a/remote_services/discovery_shm/private/src/shm_watcher.c +++ /dev/null @@ -1,230 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -/* - * shm_watcher.c - * - * \date 16 Sep 2014 - * \author Apache Celix Project Team - * \copyright Apache License, Version 2.0 - */ - -#include -#include -#include -#include - -#include "celix_log.h" -#include "constants.h" -#include "discovery_impl.h" - -#include "shm.h" -#include "shm_watcher.h" - -#include "endpoint_discovery_poller.h" - -struct shm_watcher { - endpoint_discovery_poller_pt poller; - bundle_context_pt context; - - shmData_pt shmData; - celix_thread_t watcherThread; - celix_thread_mutex_t watcherLock; - - volatile bool running; -}; - -// note that the rootNode shouldn't have a leading slash -static celix_status_t shmWatcher_getRootPath(char* rootNode) { - celix_status_t status = CELIX_SUCCESS; - - strcpy(rootNode, "discovery"); - - return status; -} - -static celix_status_t shmWatcher_getLocalNodePath(bundle_context_pt context, char* localNodePath) { - celix_status_t status = CELIX_SUCCESS; - char rootPath[MAX_ROOTNODE_LENGTH]; - char* uuid = NULL; - - if (shmWatcher_getRootPath(&rootPath[0]) != CELIX_SUCCESS) { - status = CELIX_ILLEGAL_STATE; - } else if (((bundleContext_getProperty(context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &uuid)) != CELIX_SUCCESS) || (!uuid)) { - status = CELIX_ILLEGAL_STATE; - } else if (rootPath[strlen(&rootPath[0]) - 1] == '/') { - snprintf(localNodePath, MAX_LOCALNODE_LENGTH, "%s%s", &rootPath[0], uuid); - } else { - snprintf(localNodePath, MAX_LOCALNODE_LENGTH, "%s/%s", &rootPath[0], uuid); - } - - return status; -} - -/* retrieves all endpoints from shm and syncs them with the ones already available */ -static celix_status_t shmWatcher_syncEndpoints(shm_watcher_pt watcher) { - celix_status_t status = CELIX_SUCCESS; - char** shmKeyArr = calloc(SHM_DATA_MAX_ENTRIES, sizeof(*shmKeyArr)); - array_list_pt registeredKeyArr = NULL; //calloc(SHM_DATA_MAX_ENTRIES, sizeof(*registeredKeyArr)); - - int i, j, shmSize; - - for (i = 0; i < SHM_DATA_MAX_ENTRIES; i++) { - shmKeyArr[i] = calloc(SHM_ENTRY_MAX_KEY_LENGTH, 
sizeof(*shmKeyArr[i])); - } - - arrayList_create(&registeredKeyArr); - - // get all urls available in shm - discovery_shmGetKeys(watcher->shmData, shmKeyArr, &shmSize); - - // get all locally registered endpoints - endpointDiscoveryPoller_getDiscoveryEndpoints(watcher->poller, registeredKeyArr); - - // add discovery points which are in shm, but not local yet - for (i = 0; i < shmSize; i++) { - char url[SHM_ENTRY_MAX_VALUE_LENGTH]; - bool elementFound = false; - - if (discovery_shmGet(watcher->shmData, shmKeyArr[i], &url[0]) == CELIX_SUCCESS) { - for (j = 0; j < arrayList_size(registeredKeyArr) && elementFound == false; j++) { - - if (strcmp(url, (char*) arrayList_get(registeredKeyArr, j)) == 0) { - free(arrayList_remove(registeredKeyArr, j)); - elementFound = true; - } - } - - if (elementFound == false) { - endpointDiscoveryPoller_addDiscoveryEndpoint(watcher->poller, url); - } - } - } - - // remove those which are not in shm - for (i = 0; i < arrayList_size(registeredKeyArr); i++) { - char* regUrl = arrayList_get(registeredKeyArr, i); - - if (regUrl != NULL) { - endpointDiscoveryPoller_removeDiscoveryEndpoint(watcher->poller, regUrl); - } - } - - for (i = 0; i < SHM_DATA_MAX_ENTRIES; i++) { - free(shmKeyArr[i]); - } - - free(shmKeyArr); - - for (j = 0; j < arrayList_size(registeredKeyArr); j++) { - free(arrayList_get(registeredKeyArr, j)); - } - - arrayList_destroy(registeredKeyArr); - - return status; -} - -static void* shmWatcher_run(void* data) { - shm_watcher_pt watcher = (shm_watcher_pt) data; - char localNodePath[MAX_LOCALNODE_LENGTH]; - char* endpoints = NULL; - - if (shmWatcher_getLocalNodePath(watcher->context, &localNodePath[0]) != CELIX_SUCCESS) { - fw_log(logger, OSGI_FRAMEWORK_LOG_WARNING, "Cannot register local discovery"); - } - - if ((bundleContext_getProperty(watcher->context, DISCOVERY_POLL_ENDPOINTS, &endpoints) != CELIX_SUCCESS) || !endpoints) { - endpoints = DEFAULT_POLL_ENDPOINTS; - } - - while (watcher->running) { - // register own framework - 
if (discovery_shmSet(watcher->shmData, localNodePath, endpoints) != CELIX_SUCCESS) { - fw_log(logger, OSGI_FRAMEWORK_LOG_WARNING, "Cannot register local discovery"); - } - - shmWatcher_syncEndpoints(watcher); - sleep(5); - } - - return NULL; -} - -celix_status_t shmWatcher_create(endpoint_discovery_poller_pt poller, bundle_context_pt context, shm_watcher_pt *watcher) { - celix_status_t status = CELIX_SUCCESS; - - if (poller == NULL) { - return CELIX_BUNDLE_EXCEPTION; - } - - (*watcher) = calloc(1, sizeof(struct shm_watcher)); - if (!*watcher) { - return CELIX_ENOMEM; - } else { - (*watcher)->poller = poller; - (*watcher)->context = context; - if (discovery_shmAttach(&((*watcher)->shmData)) != CELIX_SUCCESS) - discovery_shmCreate(&((*watcher)->shmData)); - - } - - if ((status = celixThreadMutex_create(&(*watcher)->watcherLock, NULL)) != CELIX_SUCCESS) { - return status; - } - - if ((status = celixThreadMutex_lock(&(*watcher)->watcherLock)) != CELIX_SUCCESS) { - return status; - } - - if ((status = celixThread_create(&(*watcher)->watcherThread, NULL, shmWatcher_run, *watcher)) != CELIX_SUCCESS) { - return status; - } - - (*watcher)->running = true; - - if ((status = celixThreadMutex_unlock(&(*watcher)->watcherLock)) != CELIX_SUCCESS) { - return status; - } - - return status; -} - -celix_status_t shmWatcher_destroy(shm_watcher_pt watcher) { - celix_status_t status = CELIX_SUCCESS; - char localNodePath[MAX_LOCALNODE_LENGTH]; - - watcher->running = false; - - celixThread_join(watcher->watcherThread, NULL); - - // register own framework - if ((status = shmWatcher_getLocalNodePath(watcher->context, &localNodePath[0])) != CELIX_SUCCESS) { - return status; - } - - if (discovery_shmRemove(watcher->shmData, localNodePath) != CELIX_SUCCESS) { - fw_log(logger, OSGI_FRAMEWORK_LOG_WARNING, "Cannot remove local discovery registration."); - } - - discovery_shmDetach(watcher->shmData); - free(watcher); - - return status; -} - 
http://git-wip-us.apache.org/repos/asf/celix/blob/c003b31d/remote_services/remote_service_admin_shm/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/remote_services/remote_service_admin_shm/CMakeLists.txt b/remote_services/remote_service_admin_shm/CMakeLists.txt index a11f893..ce2f13a 100644 --- a/remote_services/remote_service_admin_shm/CMakeLists.txt +++ b/remote_services/remote_service_admin_shm/CMakeLists.txt @@ -47,5 +47,12 @@ if (RSA_REMOTE_SERVICE_ADMIN_SHM) ) target_link_libraries(remote_service_admin_shm celix_framework) + + if (ENABLE_TESTING) + find_package(CppUTest REQUIRED) + include_directories(${CPPUTEST_INCLUDE_DIR}) + add_subdirectory(private/test) + endif() + endif (RSA_REMOTE_SERVICE_ADMIN_SHM) http://git-wip-us.apache.org/repos/asf/celix/blob/c003b31d/remote_services/remote_service_admin_shm/private/src/remote_service_admin_activator.c ---------------------------------------------------------------------- diff --git a/remote_services/remote_service_admin_shm/private/src/remote_service_admin_activator.c b/remote_services/remote_service_admin_shm/private/src/remote_service_admin_activator.c index 9d781e8..dd2fb4d 100644 --- a/remote_services/remote_service_admin_shm/private/src/remote_service_admin_activator.c +++ b/remote_services/remote_service_admin_shm/private/src/remote_service_admin_activator.c @@ -33,76 +33,77 @@ #include "import_registration_impl.h" struct activator { - remote_service_admin_pt admin; - remote_service_admin_service_pt adminService; - service_registration_pt registration; + remote_service_admin_pt admin; + remote_service_admin_service_pt adminService; + service_registration_pt registration; }; celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) { - celix_status_t status = CELIX_SUCCESS; - struct activator *activator; + celix_status_t status = CELIX_SUCCESS; + struct activator *activator; - activator = calloc(1, sizeof(*activator)); - if 
(!activator) { - status = CELIX_ENOMEM; - } else { - activator->admin = NULL; - activator->registration = NULL; + activator = calloc(1, sizeof(*activator)); + if (!activator) { + status = CELIX_ENOMEM; + } else { + activator->admin = NULL; + activator->registration = NULL; - *userData = activator; - } + *userData = activator; + } - return status; + return status; } celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) { - celix_status_t status = CELIX_SUCCESS; - struct activator *activator = userData; - remote_service_admin_service_pt remoteServiceAdmin = NULL; - - status = remoteServiceAdmin_create(context, &activator->admin); - if (status == CELIX_SUCCESS) { - remoteServiceAdmin = calloc(1, sizeof(*remoteServiceAdmin)); - if (!remoteServiceAdmin) { - status = CELIX_ENOMEM; - } else { - remoteServiceAdmin->admin = activator->admin; - remoteServiceAdmin->exportService = remoteServiceAdmin_exportService; - remoteServiceAdmin->getExportedServices = remoteServiceAdmin_getExportedServices; - remoteServiceAdmin->getImportedEndpoints = remoteServiceAdmin_getImportedEndpoints; - remoteServiceAdmin->importService = remoteServiceAdmin_importService; - - remoteServiceAdmin->exportReference_getExportedEndpoint = exportReference_getExportedEndpoint; - remoteServiceAdmin->exportReference_getExportedService = exportReference_getExportedService; - - remoteServiceAdmin->exportRegistration_close = exportRegistration_close; - remoteServiceAdmin->exportRegistration_getException = exportRegistration_getException; - remoteServiceAdmin->exportRegistration_getExportReference = exportRegistration_getExportReference; - - remoteServiceAdmin->importReference_getImportedEndpoint = importReference_getImportedEndpoint; - remoteServiceAdmin->importReference_getImportedService = importReference_getImportedService; - - remoteServiceAdmin->importRegistration_close = remoteServiceAdmin_removeImportedService; - remoteServiceAdmin->importRegistration_getException = 
importRegistration_getException; - remoteServiceAdmin->importRegistration_getImportReference = importRegistration_getImportReference; - - status = bundleContext_registerService(context, OSGI_RSA_REMOTE_SERVICE_ADMIN, remoteServiceAdmin, NULL, &activator->registration); - - activator->adminService = remoteServiceAdmin; - } - } - - return status; + celix_status_t status; + struct activator *activator = userData; + remote_service_admin_service_pt remoteServiceAdmin = NULL; + + status = remoteServiceAdmin_create(context, &activator->admin); + if (status == CELIX_SUCCESS) { + remoteServiceAdmin = calloc(1, sizeof(*remoteServiceAdmin)); + if (!remoteServiceAdmin) { + status = CELIX_ENOMEM; + } else { + remoteServiceAdmin->admin = activator->admin; + remoteServiceAdmin->exportService = remoteServiceAdmin_exportService; + + remoteServiceAdmin->getExportedServices = remoteServiceAdmin_getExportedServices; + remoteServiceAdmin->getImportedEndpoints = remoteServiceAdmin_getImportedEndpoints; + remoteServiceAdmin->importService = remoteServiceAdmin_importService; + + remoteServiceAdmin->exportReference_getExportedEndpoint = exportReference_getExportedEndpoint; + remoteServiceAdmin->exportReference_getExportedService = exportReference_getExportedService; + + remoteServiceAdmin->exportRegistration_close = exportRegistration_close; + remoteServiceAdmin->exportRegistration_getException = exportRegistration_getException; + remoteServiceAdmin->exportRegistration_getExportReference = exportRegistration_getExportReference; + + remoteServiceAdmin->importReference_getImportedEndpoint = importReference_getImportedEndpoint; + remoteServiceAdmin->importReference_getImportedService = importReference_getImportedService; + + remoteServiceAdmin->importRegistration_close = remoteServiceAdmin_removeImportedService; + remoteServiceAdmin->importRegistration_getException = importRegistration_getException; + remoteServiceAdmin->importRegistration_getImportReference = 
importRegistration_getImportReference; + + status = bundleContext_registerService(context, OSGI_RSA_REMOTE_SERVICE_ADMIN, remoteServiceAdmin, NULL, &activator->registration); + activator->adminService = remoteServiceAdmin; + } + } + + return status; } celix_status_t bundleActivator_stop(void * userData, bundle_context_pt context) { celix_status_t status = CELIX_SUCCESS; struct activator *activator = userData; - remoteServiceAdmin_stop(activator->admin); serviceRegistration_unregister(activator->registration); activator->registration = NULL; + remoteServiceAdmin_stop(activator->admin); + remoteServiceAdmin_destroy(&activator->admin); free(activator->adminService); @@ -111,11 +112,12 @@ celix_status_t bundleActivator_stop(void * userData, bundle_context_pt context) } celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt context) { - celix_status_t status = CELIX_SUCCESS; - struct activator *activator = userData; + celix_status_t status = CELIX_SUCCESS; + struct activator *activator = userData; - free(activator); + free(activator); - return status; + return status; } +