celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bpe...@apache.org
Subject svn commit: r1658311 - in /celix/trunk/remote_services/discovery_shm: ./ private/include/ private/src/
Date Mon, 09 Feb 2015 06:42:21 GMT
Author: bpetri
Date: Mon Feb  9 06:42:21 2015
New Revision: 1658311

URL: http://svn.apache.org/r1658311
Log:

CELIX-217, CELIX-119: removed apr, integrated discovery-server


Added:
    celix/trunk/remote_services/discovery_shm/private/include/discovery_impl.h
    celix/trunk/remote_services/discovery_shm/private/include/shm.h
    celix/trunk/remote_services/discovery_shm/private/include/shm_watcher.h
    celix/trunk/remote_services/discovery_shm/private/src/shm.c
    celix/trunk/remote_services/discovery_shm/private/src/shm_watcher.c
Removed:
    celix/trunk/remote_services/discovery_shm/private/src/discovery.c
    celix/trunk/remote_services/discovery_shm/private/src/discovery_activator.c
Modified:
    celix/trunk/remote_services/discovery_shm/CMakeLists.txt

Modified: celix/trunk/remote_services/discovery_shm/CMakeLists.txt
URL: http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery_shm/CMakeLists.txt?rev=1658311&r1=1658310&r2=1658311&view=diff
==============================================================================
--- celix/trunk/remote_services/discovery_shm/CMakeLists.txt (original)
+++ celix/trunk/remote_services/discovery_shm/CMakeLists.txt Mon Feb  9 06:42:21 2015
@@ -15,27 +15,45 @@
 # specific language governing permissions and limitations
 # under the License.
 
-celix_subproject(RSA_BUNDLES_DISCOVERY_SHM "Option to enable building the Discovery (SHM)
bundle" OFF DEPS LAUNCHER topology_manager remote_service_admin)
+celix_subproject(RSA_BUNDLES_DISCOVERY_SHM "Option to enable building the Discovery (SHM)
bundle" ON DEPS LAUNCHER topology_manager remote_service_admin)
 if (RSA_BUNDLES_DISCOVERY_SHM)
+find_package(CURL REQUIRED)
+	find_package(LibXml2 REQUIRED)
+	find_package(Jansson REQUIRED)
+	
+	include_directories("${CURL_INCLUDE_DIR}")
+	include_directories("${JANSSON_INCLUDE_DIR}")
+	include_directories("${LIBXML2_INCLUDE_DIR}")
 	include_directories("${PROJECT_SOURCE_DIR}/utils/public/include")
+	include_directories("${PROJECT_SOURCE_DIR}/remote_services/utils/private/include")
 	include_directories("${PROJECT_SOURCE_DIR}/remote_services/utils/public/include")
+	include_directories("${PROJECT_SOURCE_DIR}/remote_services/discovery/private/include")
 	include_directories("${PROJECT_SOURCE_DIR}/remote_services/discovery_shm/private/include")
 	include_directories("${PROJECT_SOURCE_DIR}/remote_services/endpoint_listener/public/include")
 	include_directories("${PROJECT_SOURCE_DIR}/remote_services/remote_service_admin/public/include")
-	include_directories("${PROJECT_SOURCE_DIR}/remote_services/remote_service_admin_shm/public/include")
+	include_directories("${PROJECT_SOURCE_DIR}/log_service/public/include")
 	include_directories("private/include")
 
 	SET_HEADER(BUNDLE_SYMBOLICNAME "apache_celix_rsa_discovery_shm")
-	SET(BUNDLE_VERSION "0.0.1")
 	SET_HEADERS("Bundle-Name: Apache Celix RSA Discovery SHM")
 
 	bundle(discovery_shm SOURCES 
-		private/src/discovery 
-		private/src/discovery_activator
+		private/src/shm
+		private/src/shm_watcher
+		private/src/discovery_impl
+		${PROJECT_SOURCE_DIR}/remote_services/discovery/private/src/discovery_activator.c
+		${PROJECT_SOURCE_DIR}/remote_services/discovery/private/src/discovery.c
+		${PROJECT_SOURCE_DIR}/remote_services/discovery/private/src/endpoint_descriptor_reader.c
+		${PROJECT_SOURCE_DIR}/remote_services/discovery/private/src/endpoint_descriptor_writer.c
+		${PROJECT_SOURCE_DIR}/remote_services/discovery/private/src/endpoint_discovery_poller.c
+		${PROJECT_SOURCE_DIR}/remote_services/discovery/private/src/endpoint_discovery_server.c
+	    ${PROJECT_SOURCE_DIR}/remote_services/remote_service_admin/private/src/endpoint_description.c
+	    ${PROJECT_SOURCE_DIR}/remote_services/utils/private/src/civetweb.c
+	    ${PROJECT_SOURCE_DIR}/log_service/public/src/log_helper.c
 	)
 
 	install_bundle(discovery_shm)
 		
-	target_link_libraries(discovery_shm celix_framework ${APRUTIL_LIBRARY})
+	target_link_libraries(discovery_shm celix_framework ${CURL_LIBRARIES} ${LIBXML2_LIBRARIES}
${JANSSON_LIBRARIES})
 
 endif (RSA_BUNDLES_DISCOVERY_SHM)

Added: celix/trunk/remote_services/discovery_shm/private/include/discovery_impl.h
URL: http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery_shm/private/include/discovery_impl.h?rev=1658311&view=auto
==============================================================================
--- celix/trunk/remote_services/discovery_shm/private/include/discovery_impl.h (added)
+++ celix/trunk/remote_services/discovery_shm/private/include/discovery_impl.h Mon Feb  9
06:42:21 2015
@@ -0,0 +1,66 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * discovery_impl.h
+ *
+ *  \date       Oct 01, 2014
+ *  \author    	<a href="mailto:celix-dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright	Apache License, Version 2.0
+ */
+
+#ifndef DISCOVERY_IMPL_H_
+#define DISCOVERY_IMPL_H_
+
+#include "bundle_context.h"
+#include "service_reference.h"
+
+#include "endpoint_description.h"
+#include "endpoint_listener.h"
+
+#include "endpoint_discovery_poller.h"
+#include "endpoint_discovery_server.h"
+#include "shm_watcher.h"
+
+
+#define DEFAULT_SERVER_IP   "127.0.0.1" /* not referenced in the sources shown with this header; presumably the discovery server's default address - TODO confirm */
+#define DEFAULT_SERVER_PORT "9999" /* not referenced in the sources shown with this header; presumably the discovery server's default port - TODO confirm */
+#define DEFAULT_SERVER_PATH "/org.apache.celix.discovery.shm" /* not referenced in the sources shown with this header; presumably the discovery server's default URL path - TODO confirm */
+#define DEFAULT_POLL_ENDPOINTS "http://localhost:9999/org.apache.celix.discovery.shm" /* value the shm watcher publishes when the DISCOVERY_POLL_ENDPOINTS property is not set */
+
+#define MAX_ROOTNODE_LENGTH		 64 /* size of the buffer that receives the root node name in shm_watcher.c */
+#define MAX_LOCALNODE_LENGTH	256 /* size of the buffer that receives "<root>/<framework uuid>" in shm_watcher.c */
+
+
+struct discovery { /* per-bundle state of the shared-memory based discovery implementation */
+	bundle_context_pt context; /* bundle context this discovery instance belongs to */
+
+	celix_thread_mutex_t listenerReferencesMutex; /* presumably guards listenerReferences - TODO confirm against discovery.c */
+	celix_thread_mutex_t discoveredServicesMutex; /* presumably guards discoveredServices - TODO confirm against discovery.c */
+
+	hash_map_pt listenerReferences; //key=serviceReference, value=nop
+	hash_map_pt discoveredServices; //key=endpointId (string), value=endpoint_description_pt
+
+	shm_watcher_pt watcher; /* handle of the type declared in shm_watcher.h */
+	endpoint_discovery_poller_pt poller; /* handle of the type declared in endpoint_discovery_poller.h */
+	endpoint_discovery_server_pt server; /* handle of the type declared in endpoint_discovery_server.h */
+
+	log_helper_pt loghelper; /* NOTE(review): log_helper.h is not included by this header directly - verify it arrives via another include */
+};
+
+#endif /* DISCOVERY_IMPL_H_ */

Added: celix/trunk/remote_services/discovery_shm/private/include/shm.h
URL: http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery_shm/private/include/shm.h?rev=1658311&view=auto
==============================================================================
--- celix/trunk/remote_services/discovery_shm/private/include/shm.h (added)
+++ celix/trunk/remote_services/discovery_shm/private/include/shm.h Mon Feb  9 06:42:21 2015
@@ -0,0 +1,28 @@
+
+#ifndef _SHM_H_
+#define _SHM_H_
+
+#include <celix_errno.h>
+
+#define SHM_ENTRY_MAX_KEY_LENGTH	256
+#define SHM_ENTRY_MAX_VALUE_LENGTH	256
+
+// defines the time-to-live in seconds
+#define SHM_ENTRY_DEFAULT_TTL		60
+
+// we currently support 64 separate discovery instances
+#define SHM_DATA_MAX_ENTRIES		64
+
+typedef struct shmData* shmData_pt; /* opaque handle; struct shmData is defined in shm.c */
+
+/* creates a new shared memory block */
+celix_status_t discovery_shmCreate(shmData_pt* data); /* on success *data refers to the attached, initialised segment */
+celix_status_t discovery_shmAttach(shmData_pt* data); /* attaches to a segment that already exists; does not create one */
+celix_status_t discovery_shmSet(shmData_pt data, char *key, char* value); /* stores value under key and renews the entry's expiry time (now + SHM_ENTRY_DEFAULT_TTL) */
+celix_status_t discovery_shmGet(shmData_pt data, char* key, char* value); /* copies the value stored under key into the caller-provided buffer value */
+celix_status_t discovery_shmGetKeys(shmData_pt data, char** keys, int* size); /* copies the stored keys into keys[i] (each written with a limit of SHM_ENTRY_MAX_KEY_LENGTH bytes) and stores their count in *size */
+celix_status_t discovery_shmRemove(shmData_pt data, char* key); /* removes the entry stored under key */
+celix_status_t discovery_shmDetach(shmData_pt data); /* releases this process' use of the segment; destroys the segment when it holds no entries */
+celix_status_t discovery_shmDestroy(shmData_pt data); /* removes the segment via shmctl(IPC_RMID) */
+
+#endif

Added: celix/trunk/remote_services/discovery_shm/private/include/shm_watcher.h
URL: http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery_shm/private/include/shm_watcher.h?rev=1658311&view=auto
==============================================================================
--- celix/trunk/remote_services/discovery_shm/private/include/shm_watcher.h (added)
+++ celix/trunk/remote_services/discovery_shm/private/include/shm_watcher.h Mon Feb  9 06:42:21
2015
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * shm_watcher.h
+ *
+ * \date       30 Sep 2014
+ * \author     <a href="mailto:celix-dev@incubator.apache.org">Apache Celix Project
Team</a>
+ * \copyright  Apache License, Version 2.0
+ */
+
+#ifndef SHM_WATCHER_H_
+#define SHM_WATCHER_H_
+
+#include "celix_errno.h"
+#include "discovery.h"
+#include "endpoint_discovery_poller.h"
+
+typedef struct shm_watcher *shm_watcher_pt; /* opaque handle; struct shm_watcher is defined in shm_watcher.c */
+
+celix_status_t shmWatcher_create(endpoint_discovery_poller_pt poller, bundle_context_pt context,
shm_watcher_pt *watcher); /* allocates a watcher, connects it to the discovery shared memory and starts its background thread */
+celix_status_t shmWatcher_destroy(shm_watcher_pt watcher); /* stops and joins the thread, removes the local entry from shared memory and frees the watcher */
+
+
+#endif /* SHM_WATCHER_H_ */

Added: celix/trunk/remote_services/discovery_shm/private/src/shm.c
URL: http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery_shm/private/src/shm.c?rev=1658311&view=auto
==============================================================================
--- celix/trunk/remote_services/discovery_shm/private/src/shm.c (added)
+++ celix/trunk/remote_services/discovery_shm/private/src/shm.c Mon Feb  9 06:42:21 2015
@@ -0,0 +1,236 @@
+#include <celix_errno.h>
+#include <celix_log.h>
+
+#include <pthread.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+
+#include <sys/types.h>
+#include <sys/sem.h>
+#include <sys/shm.h>
+
+#include "shm.h"
+
+// TODO: move somewhere else
+#define DISCOVERY_SHM_MEMSIZE 262144 /* size in bytes requested from shmget() for the segment (256 KiB) */
+#define DISCOVERY_SHM_FILENAME "/dev/null" /* path handed to ftok() when deriving the segment key */
+#define DISCOVERY_SHM_FTOK_ID 50 /* project id handed to ftok() when deriving the segment key */
+#define DISCOVERY_SEM_FILENAME "/dev/null" /* not referenced in this file; presumably intended for a semaphore key - TODO confirm or remove */
+#define DISCOVERY_SEM_FTOK_ID 54 /* not referenced in this file; presumably intended for a semaphore key - TODO confirm or remove */
+
+struct shmEntry { /* one key/value record of the table kept in shared memory */
+    char key[SHM_ENTRY_MAX_KEY_LENGTH]; /* NUL-terminated key, written with snprintf() by discovery_shmSet */
+    char value[SHM_ENTRY_MAX_VALUE_LENGTH]; /* NUL-terminated value, written with snprintf() by discovery_shmSet */
+
+    time_t expires; /* absolute time (time(NULL) + SHM_ENTRY_DEFAULT_TTL at the last set); lookups purge the entry once this lies in the past */
+};
+
+typedef struct shmEntry shmEntry;
+
+struct shmData { /* layout of the data kept at the start of the shared memory segment */
+    shmEntry entries[SHM_DATA_MAX_ENTRIES]; /* entry table; slots [0, numOfEntries) are in use */
+    int numOfEntries; /* number of used slots in entries */
+    int shmId; /* segment id obtained from shmget() in discovery_shmCreate; read by discovery_shmDestroy */
+
+    pthread_mutex_t globalLock; /* taken by the public get/getKeys/set/remove functions around table access */
+};
+
+void* shmAdress; /* address at which shmat() attached the segment; a file-level global, not part of the shared data. NOTE(review): has external linkage - consider static if no other file uses it */
+
+static celix_status_t discovery_shmRemoveWithIndex(shmData_pt data, int index); /* forward declaration: discovery_shmGetwithIndex calls it before its definition */
+
+/*
+ * Derives the System V IPC key that identifies the discovery shared memory
+ * segment from DISCOVERY_SHM_FILENAME and DISCOVERY_SHM_FTOK_ID.
+ * The result of ftok() is passed on unchanged, including its error value.
+ */
+static key_t getShmKey() {
+    key_t segmentKey = ftok(DISCOVERY_SHM_FILENAME, DISCOVERY_SHM_FTOK_ID);
+
+    return segmentKey;
+}
+
+/**
+ * Creates the discovery shared memory segment (or opens it when it already
+ * exists, since IPC_CREAT is used without IPC_EXCL), attaches it to this
+ * process and initialises its content.
+ *
+ * The table is zeroed and the lock is initialised directly inside the
+ * segment. The lock is created process-shared and robust, because it is used
+ * by every process that attaches to the segment.
+ *
+ * @param data  out: on success, handle to the attached segment; left
+ *              untouched on failure.
+ * @return CELIX_SUCCESS on success, CELIX_ILLEGAL_ARGUMENT when data is NULL,
+ *         CELIX_BUNDLE_EXCEPTION when the segment cannot be created, attached
+ *         or initialised.
+ */
+celix_status_t discovery_shmCreate(shmData_pt* data) {
+    celix_status_t status = CELIX_SUCCESS;
+    key_t shmKey = getShmKey();
+    int shmId = -1;
+    void* addr = NULL;
+
+    if (data == NULL) {
+        return CELIX_ILLEGAL_ARGUMENT;
+    }
+
+    if ((shmId = shmget(shmKey, DISCOVERY_SHM_MEMSIZE, IPC_CREAT | 0666)) < 0) {
+        fw_log(logger, OSGI_FRAMEWORK_LOG_ERROR, "Creation of shared memory segment failed.");
+        status = CELIX_BUNDLE_EXCEPTION;
+    } else if ((addr = shmat(shmId, 0, 0)) == (void*) -1) {
+        fw_log(logger, OSGI_FRAMEWORK_LOG_ERROR, "Attaching of shared memory segment failed.");
+        status = CELIX_BUNDLE_EXCEPTION;
+    } else {
+        shmData_pt shmData = addr;
+        pthread_mutexattr_t lockAttr;
+
+        if (pthread_mutexattr_init(&lockAttr) != 0) {
+            fw_log(logger, OSGI_FRAMEWORK_LOG_ERROR, "Error while initializing lock attributes");
+            status = CELIX_BUNDLE_EXCEPTION;
+        } else {
+            memset(shmData, 0, sizeof(*shmData));
+            shmData->shmId = shmId;
+            shmData->numOfEntries = 0;
+
+            // the mutex lives in shared memory and is locked by several processes
+            if (pthread_mutexattr_setpshared(&lockAttr, PTHREAD_PROCESS_SHARED) != 0
+                    || pthread_mutexattr_setrobust(&lockAttr, PTHREAD_MUTEX_ROBUST) != 0
+                    || pthread_mutex_init(&shmData->globalLock, &lockAttr) != 0) {
+                fw_log(logger, OSGI_FRAMEWORK_LOG_ERROR, "Global lock init failed");
+                status = CELIX_BUNDLE_EXCEPTION;
+            }
+
+            pthread_mutexattr_destroy(&lockAttr);
+        }
+
+        if (status == CELIX_SUCCESS) {
+            shmAdress = addr;
+            (*data) = shmData;
+        } else {
+            // do not keep a half-initialised segment mapped
+            shmdt(addr);
+        }
+    }
+
+    return status;
+}
+
+/**
+ * Attaches to the discovery shared memory segment if it already exists.
+ * The segment is neither created nor initialised here.
+ *
+ * @param data  out: on success, handle to the attached segment; left
+ *              untouched on failure.
+ * @return CELIX_SUCCESS on success, CELIX_ILLEGAL_ARGUMENT when data is NULL,
+ *         CELIX_BUNDLE_EXCEPTION when the segment does not exist or cannot be
+ *         attached.
+ */
+celix_status_t discovery_shmAttach(shmData_pt* data) {
+    celix_status_t status = CELIX_SUCCESS;
+    key_t shmKey = ftok(DISCOVERY_SHM_FILENAME, DISCOVERY_SHM_FTOK_ID);
+    int shmId = -1;
+    void* addr = NULL;
+
+    if (data == NULL) {
+        return CELIX_ILLEGAL_ARGUMENT;
+    }
+
+    if ((shmId = shmget(shmKey, DISCOVERY_SHM_MEMSIZE, 0666)) < 0) {
+        fw_log(logger, OSGI_FRAMEWORK_LOG_ERROR, "DISCOVERY : Attaching of shared memory segment failed.");
+        status = CELIX_BUNDLE_EXCEPTION;
+    } else if ((addr = shmat(shmId, 0, 0)) == (void*) -1) {
+        // shmat() reports failure as (void*) -1, never as a "negative" pointer
+        fw_log(logger, OSGI_FRAMEWORK_LOG_ERROR, "DISCOVERY : Attaching of shared memory segment failed.");
+        status = CELIX_BUNDLE_EXCEPTION;
+    } else {
+        // remember the mapping so that it can be detached again later
+        shmAdress = addr;
+        (*data) = addr;
+    }
+
+    return status;
+}
+
+/**
+ * Looks up key in the table, purging every expired entry that is met on the
+ * way. The callers in this file invoke it with data->globalLock held.
+ *
+ * @param data   attached shared memory table.
+ * @param key    NUL-terminated key to search for.
+ * @param value  optional out: receives the stored value; when given it must
+ *               provide room for SHM_ENTRY_MAX_VALUE_LENGTH bytes.
+ * @param index  optional out: receives the slot of the matching entry.
+ * @return CELIX_SUCCESS when the key was found, CELIX_BUNDLE_EXCEPTION
+ *         otherwise.
+ */
+static celix_status_t discovery_shmGetwithIndex(shmData_pt data, char* key, char* value, int* index) {
+    celix_status_t status = CELIX_BUNDLE_EXCEPTION;
+    time_t currentTime = time(NULL);
+    int i = 0;
+
+    while (i < data->numOfEntries && status != CELIX_SUCCESS) {
+        shmEntry* entry = &data->entries[i];
+
+        if (entry->expires < currentTime) {
+            // Removal shifts the following entries down by one, so slot i
+            // holds a new entry afterwards and must be examined again.
+            if (discovery_shmRemoveWithIndex(data, i) != CELIX_SUCCESS) {
+                break;
+            }
+        } else {
+            if (strcmp(entry->key, key) == 0) {
+                if (value) {
+                    snprintf(value, SHM_ENTRY_MAX_VALUE_LENGTH, "%s", entry->value);
+                }
+                if (index) {
+                    (*index) = i;
+                }
+                status = CELIX_SUCCESS;
+            }
+            i++;
+        }
+    }
+
+    return status;
+}
+
+/**
+ * Copies the keys of all entries currently stored in the table.
+ *
+ * @param data  attached shared memory table.
+ * @param keys  array of caller-allocated buffers; it must provide
+ *              SHM_DATA_MAX_ENTRIES buffers of SHM_ENTRY_MAX_KEY_LENGTH
+ *              bytes each.
+ * @param size  out: number of keys written to keys.
+ * @return always CELIX_SUCCESS.
+ */
+celix_status_t discovery_shmGetKeys(shmData_pt data, char** keys, int* size) {
+    celix_status_t status = CELIX_SUCCESS;
+    int count = 0;
+    int i = 0;
+
+    pthread_mutex_lock(&data->globalLock);
+
+    // never write more keys than the caller's array is documented to hold
+    count = data->numOfEntries;
+    if (count > SHM_DATA_MAX_ENTRIES) {
+        count = SHM_DATA_MAX_ENTRIES;
+    }
+
+    for (i = 0; i < count; i++) {
+        snprintf(keys[i], SHM_ENTRY_MAX_KEY_LENGTH, "%s", data->entries[i].key);
+    }
+
+    (*size) = i;
+
+    pthread_mutex_unlock(&data->globalLock);
+    return status;
+}
+
+/**
+ * Stores value under key and renews the entry's expiry time to
+ * now + SHM_ENTRY_DEFAULT_TTL. An existing entry is overwritten, otherwise a
+ * new entry is appended.
+ *
+ * The capacity check is done under the lock and only when a new entry is
+ * needed, so an existing key can still be refreshed while the table is full.
+ *
+ * @param data   attached shared memory table.
+ * @param key    NUL-terminated key; truncated to SHM_ENTRY_MAX_KEY_LENGTH - 1
+ *               characters.
+ * @param value  NUL-terminated value; truncated to
+ *               SHM_ENTRY_MAX_VALUE_LENGTH - 1 characters.
+ * @return CELIX_SUCCESS on success, CELIX_ILLEGAL_STATE when a new entry is
+ *         needed but the table already holds SHM_DATA_MAX_ENTRIES entries.
+ */
+celix_status_t discovery_shmSet(shmData_pt data, char *key, char* value) {
+    celix_status_t status = CELIX_SUCCESS;
+    int index = -1;
+
+    pthread_mutex_lock(&data->globalLock);
+
+    // check if key already there (this also purges expired entries)
+    if (discovery_shmGetwithIndex(data, key, NULL, &index) != CELIX_SUCCESS) {
+        if (data->numOfEntries >= SHM_DATA_MAX_ENTRIES) {
+            status = CELIX_ILLEGAL_STATE;
+        } else {
+            index = data->numOfEntries;
+
+            snprintf(data->entries[index].key, SHM_ENTRY_MAX_KEY_LENGTH, "%s", key);
+            data->numOfEntries++;
+        }
+    }
+
+    if (status == CELIX_SUCCESS) {
+        snprintf(data->entries[index].value, SHM_ENTRY_MAX_VALUE_LENGTH, "%s", value);
+        data->entries[index].expires = (time(NULL) + SHM_ENTRY_DEFAULT_TTL);
+    }
+
+    pthread_mutex_unlock(&data->globalLock);
+
+    return status;
+}
+
+/**
+ * Thread-safe lookup: takes the global lock, delegates to
+ * discovery_shmGetwithIndex and hands its status back to the caller.
+ *
+ * @param data   attached shared memory table.
+ * @param key    key to look up.
+ * @param value  out: buffer that receives the stored value.
+ * @return the status reported by discovery_shmGetwithIndex.
+ */
+celix_status_t discovery_shmGet(shmData_pt data, char* key, char* value) {
+    celix_status_t lookupStatus;
+
+    pthread_mutex_lock(&data->globalLock);
+    lookupStatus = discovery_shmGetwithIndex(data, key, value, NULL);
+    pthread_mutex_unlock(&data->globalLock);
+
+    return lookupStatus;
+}
+
+/**
+ * Removes the entry in slot index by moving all following entries one slot
+ * towards the front. The callers in this file invoke it with
+ * data->globalLock held.
+ *
+ * @param data   attached shared memory table.
+ * @param index  slot to remove; must lie in [0, numOfEntries).
+ * @return CELIX_SUCCESS on success, CELIX_ILLEGAL_ARGUMENT when index is out
+ *         of range (the table is left unchanged in that case).
+ */
+static celix_status_t discovery_shmRemoveWithIndex(shmData_pt data, int index) {
+    celix_status_t status = CELIX_SUCCESS;
+
+    if (index < 0 || index >= data->numOfEntries) {
+        status = CELIX_ILLEGAL_ARGUMENT;
+    } else {
+        data->numOfEntries--;
+        if (index < data->numOfEntries) {
+            // source and destination overlap, which memcpy does not permit
+            memmove(&data->entries[index], &data->entries[index + 1], ((data->numOfEntries - index) * sizeof(struct shmEntry)));
+        }
+    }
+
+    return status;
+}
+
+/**
+ * Removes the entry stored under key, if there is one. Lookup and removal
+ * happen while the global lock is held.
+ *
+ * @param data  attached shared memory table.
+ * @param key   key of the entry to remove.
+ * @return the status of the lookup: CELIX_SUCCESS when the key was found
+ *         (and its entry removed), the lookup's error status otherwise.
+ */
+celix_status_t discovery_shmRemove(shmData_pt data, char* key) {
+    celix_status_t lookupStatus;
+    int position = -1;
+
+    pthread_mutex_lock(&data->globalLock);
+
+    lookupStatus = discovery_shmGetwithIndex(data, key, NULL, &position);
+    if (lookupStatus == CELIX_SUCCESS) {
+        discovery_shmRemoveWithIndex(data, position);
+    }
+
+    pthread_mutex_unlock(&data->globalLock);
+
+    return lookupStatus;
+}
+
+/**
+ * Detaches this process from the shared memory segment. When the table holds
+ * no entries any more, the segment is additionally marked for removal first.
+ *
+ * The removal is requested before detaching because discovery_shmDestroy
+ * reads the segment id from the mapped data.
+ * NOTE(review): this relies on the segment staying mapped after IPC_RMID
+ * until it is detached, as documented for Linux - confirm for other targets.
+ *
+ * @param data  handle obtained from discovery_shmCreate or
+ *              discovery_shmAttach; invalid after this call.
+ * @return CELIX_SUCCESS on success, CELIX_BUNDLE_EXCEPTION when removing or
+ *         detaching the segment failed.
+ */
+celix_status_t discovery_shmDetach(shmData_pt data) {
+    celix_status_t status = CELIX_SUCCESS;
+
+    if (data->numOfEntries == 0) {
+        status = discovery_shmDestroy(data);
+    }
+
+    fw_log(logger, OSGI_FRAMEWORK_LOG_DEBUG, "DISCOVERY_SHM: Detaching from Shared memory\n");
+    // detach the mapping that belongs to this handle, whichever call produced it
+    if (shmdt(data) != 0) {
+        status = CELIX_BUNDLE_EXCEPTION;
+    }
+
+    return status;
+}
+
+/**
+ * Requests removal of the shared memory segment whose id is stored in data.
+ * The mapping of the calling process is not detached here.
+ *
+ * @param data  attached shared memory table.
+ * @return CELIX_SUCCESS on success, CELIX_BUNDLE_EXCEPTION when
+ *         shmctl(IPC_RMID) fails.
+ */
+celix_status_t discovery_shmDestroy(shmData_pt data) {
+    celix_status_t status = CELIX_SUCCESS;
+
+    fw_log(logger, OSGI_FRAMEWORK_LOG_DEBUG, "DISCOVERY_SHM: Destroying Shared memory.");
+    if (shmctl(data->shmId, IPC_RMID, 0) != 0) {
+        fw_log(logger, OSGI_FRAMEWORK_LOG_ERROR, "DISCOVERY_SHM: Destroying Shared memory failed.");
+        status = CELIX_BUNDLE_EXCEPTION;
+    }
+
+    return status;
+}

Added: celix/trunk/remote_services/discovery_shm/private/src/shm_watcher.c
URL: http://svn.apache.org/viewvc/celix/trunk/remote_services/discovery_shm/private/src/shm_watcher.c?rev=1658311&view=auto
==============================================================================
--- celix/trunk/remote_services/discovery_shm/private/src/shm_watcher.c (added)
+++ celix/trunk/remote_services/discovery_shm/private/src/shm_watcher.c Mon Feb  9 06:42:21
2015
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * shm_watcher.c
+ *
+ * \date       16 Sep 2014
+ * \author     <a href="mailto:celix-dev@incubator.apache.org">Apache Celix Project
Team</a>
+ * \copyright  Apache License, Version 2.0
+ */
+
+#include <stdbool.h>
+#include <stdlib.h>
+#include <unistd.h>
+
+#include "celix_log.h"
+#include "constants.h"
+#include "discovery_impl.h"
+
+#include "shm.h"
+#include "shm_watcher.h"
+
+#include "endpoint_discovery_poller.h"
+
+struct shm_watcher { /* state of one shared-memory watcher instance */
+    endpoint_discovery_poller_pt poller; /* poller whose discovery endpoints are kept in sync with shared memory */
+    bundle_context_pt context; /* used to read the framework UUID and the DISCOVERY_POLL_ENDPOINTS property */
+
+    shmData_pt shmData; /* handle to the attached discovery shared memory */
+    celix_thread_t watcherThread; /* thread running shmWatcher_run */
+    celix_thread_mutex_t watcherLock; /* created in shmWatcher_create; the thread function shown in this file does not lock it */
+
+    volatile bool running; /* loop condition of shmWatcher_run; cleared by shmWatcher_destroy */
+};
+
+/*
+ * Writes the name of the root node into rootNode and reports success.
+ * Note that the root node shouldn't have a leading slash.
+ * The buffer must be large enough for "discovery" plus the terminating NUL.
+ */
+static celix_status_t shmWatcher_getRootPath(char* rootNode) {
+    strcpy(rootNode, "discovery");
+
+    return CELIX_SUCCESS;
+}
+
+/*
+ * Builds the key under which this framework registers itself:
+ * "<root node>/<framework uuid>". A separating slash is only inserted when
+ * the root node does not already end in one.
+ *
+ * localNodePath must provide room for MAX_LOCALNODE_LENGTH bytes. It is left
+ * untouched when CELIX_ILLEGAL_STATE is returned, which happens when the
+ * root node or the framework UUID cannot be determined.
+ */
+static celix_status_t shmWatcher_getLocalNodePath(bundle_context_pt context, char* localNodePath) {
+    char rootPath[MAX_ROOTNODE_LENGTH];
+    char* uuid = NULL;
+
+    if (shmWatcher_getRootPath(rootPath) != CELIX_SUCCESS) {
+        return CELIX_ILLEGAL_STATE;
+    }
+
+    if (bundleContext_getProperty(context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &uuid) != CELIX_SUCCESS || uuid == NULL) {
+        return CELIX_ILLEGAL_STATE;
+    }
+
+    if (rootPath[strlen(rootPath) - 1] != '/') {
+        snprintf(localNodePath, MAX_LOCALNODE_LENGTH, "%s/%s", rootPath, uuid);
+    } else {
+        snprintf(localNodePath, MAX_LOCALNODE_LENGTH, "%s%s", rootPath, uuid);
+    }
+
+    return CELIX_SUCCESS;
+}
+
+/*
+ * Retrieves all endpoints from shm and syncs them with the ones already
+ * available in the poller:
+ *  - URLs found in shared memory but unknown to the poller are added,
+ *  - URLs known to the poller but no longer in shared memory are removed.
+ *
+ * Returns CELIX_SUCCESS on success, CELIX_ENOMEM when a working buffer cannot
+ * be allocated, or the error status of the failing helper. All working
+ * buffers are released on every path.
+ */
+static celix_status_t shmWatcher_syncEndpoints(shm_watcher_pt watcher) {
+    celix_status_t status = CELIX_SUCCESS;
+    char** shmKeyArr = calloc(SHM_DATA_MAX_ENTRIES, sizeof(*shmKeyArr));
+    array_list_pt registeredKeyArr = NULL;
+
+    int i, j;
+    int shmSize = 0;
+
+    if (shmKeyArr == NULL) {
+        return CELIX_ENOMEM;
+    }
+
+    for (i = 0; i < SHM_DATA_MAX_ENTRIES && status == CELIX_SUCCESS; i++) {
+        shmKeyArr[i] = calloc(SHM_ENTRY_MAX_KEY_LENGTH, sizeof(*shmKeyArr[i]));
+        if (shmKeyArr[i] == NULL) {
+            status = CELIX_ENOMEM;
+        }
+    }
+
+    if (status == CELIX_SUCCESS) {
+        status = arrayList_create(&registeredKeyArr);
+    }
+
+    // get all urls available in shm
+    if (status == CELIX_SUCCESS) {
+        status = discovery_shmGetKeys(watcher->shmData, shmKeyArr, &shmSize);
+    }
+
+    // get all locally registered endpoints
+    if (status == CELIX_SUCCESS) {
+        status = endpointDiscoveryPoller_getDiscoveryEndpoints(watcher->poller, registeredKeyArr);
+    }
+
+    if (status == CELIX_SUCCESS) {
+        // add discovery points which are in shm, but not local yet
+        for (i = 0; i < shmSize; i++) {
+            char url[SHM_ENTRY_MAX_VALUE_LENGTH];
+            bool elementFound = false;
+
+            if (discovery_shmGet(watcher->shmData, shmKeyArr[i], &url[0]) == CELIX_SUCCESS) {
+                for (j = 0; j < arrayList_size(registeredKeyArr) && elementFound == false; j++) {
+
+                    if (strcmp(url, (char*) arrayList_get(registeredKeyArr, j)) == 0) {
+                        // still present in shm: drop it from the removal candidates
+                        free(arrayList_remove(registeredKeyArr, j));
+                        elementFound = true;
+                    }
+                }
+
+                if (elementFound == false) {
+                    // NOTE(review): ownership of the duplicated url is assumed to pass to the poller - confirm
+                    endpointDiscoveryPoller_addDiscoveryEndpoint(watcher->poller, strdup(url));
+                }
+            }
+        }
+
+        // remove those which are not in shm
+        for (i = 0; i < arrayList_size(registeredKeyArr); i++) {
+            char* regUrl = arrayList_get(registeredKeyArr, i);
+
+            if (regUrl != NULL) {
+                endpointDiscoveryPoller_removeDiscoveryEndpoint(watcher->poller, regUrl);
+            }
+        }
+    }
+
+    // slots that were never allocated are NULL (calloc), which free() accepts
+    for (i = 0; i < SHM_DATA_MAX_ENTRIES; i++) {
+        free(shmKeyArr[i]);
+    }
+
+    free(shmKeyArr);
+
+    if (registeredKeyArr != NULL) {
+        for (j = 0; j < arrayList_size(registeredKeyArr); j++) {
+            free(arrayList_get(registeredKeyArr, j));
+        }
+
+        arrayList_destroy(registeredKeyArr);
+    }
+
+    return status;
+}
+
+/*
+ * Thread function of the watcher. While watcher->running is set it
+ *  1. (re-)registers this framework's poll endpoints in shared memory, which
+ *     also renews the expiry time of the entry, and
+ *  2. syncs the poller with the endpoints found in shared memory,
+ * and then sleeps for five seconds.
+ *
+ * When the local node path cannot be determined, the registration step is
+ * skipped (the path buffer would be uninitialised), but syncing still runs.
+ *
+ * data is the shm_watcher_pt passed to celixThread_create; always returns NULL.
+ */
+static void* shmWatcher_run(void* data) {
+    shm_watcher_pt watcher = (shm_watcher_pt) data;
+    char localNodePath[MAX_LOCALNODE_LENGTH];
+    char* endpoints = NULL;
+    bool hasLocalNodePath = true;
+
+    if (shmWatcher_getLocalNodePath(watcher->context, &localNodePath[0]) != CELIX_SUCCESS) {
+        fw_log(logger, OSGI_FRAMEWORK_LOG_WARNING, "Cannot register local discovery");
+        hasLocalNodePath = false;
+    }
+
+    if ((bundleContext_getProperty(watcher->context, DISCOVERY_POLL_ENDPOINTS, &endpoints) != CELIX_SUCCESS) || !endpoints) {
+        endpoints = DEFAULT_POLL_ENDPOINTS;
+    }
+
+    while (watcher->running) {
+        // register own framework
+        if (hasLocalNodePath && discovery_shmSet(watcher->shmData, localNodePath, endpoints) != CELIX_SUCCESS) {
+            fw_log(logger, OSGI_FRAMEWORK_LOG_WARNING, "Cannot register local discovery");
+        }
+
+        shmWatcher_syncEndpoints(watcher);
+        sleep(5);
+    }
+
+    return NULL;
+}
+
+/**
+ * Creates a watcher: attaches to the discovery shared memory (creating it
+ * when it does not exist yet) and starts the background thread that keeps
+ * the poller in sync with it.
+ *
+ * The running flag is set before the thread is started, because the thread
+ * function uses it as its loop condition and would otherwise be able to exit
+ * immediately.
+ *
+ * @param poller   poller to keep in sync; must not be NULL.
+ * @param context  bundle context used by the watcher thread.
+ * @param watcher  out: the new watcher on success, NULL on failure.
+ * @return CELIX_SUCCESS on success, CELIX_BUNDLE_EXCEPTION when poller or
+ *         watcher is NULL, CELIX_ENOMEM when allocation fails, otherwise the
+ *         status of the failing shared memory, mutex or thread call.
+ */
+celix_status_t shmWatcher_create(endpoint_discovery_poller_pt poller, bundle_context_pt context, shm_watcher_pt *watcher) {
+    celix_status_t status = CELIX_SUCCESS;
+    shm_watcher_pt created = NULL;
+
+    if (poller == NULL || watcher == NULL) {
+        return CELIX_BUNDLE_EXCEPTION;
+    }
+
+    (*watcher) = NULL;
+
+    created = calloc(1, sizeof(*created));
+    if (created == NULL) {
+        return CELIX_ENOMEM;
+    }
+
+    created->poller = poller;
+    created->context = context;
+
+    // use the existing segment when there is one, otherwise create it
+    if (discovery_shmAttach(&(created->shmData)) != CELIX_SUCCESS) {
+        status = discovery_shmCreate(&(created->shmData));
+    }
+
+    if (status != CELIX_SUCCESS) {
+        free(created);
+        return status;
+    }
+
+    if ((status = celixThreadMutex_create(&created->watcherLock, NULL)) != CELIX_SUCCESS) {
+        discovery_shmDetach(created->shmData);
+        free(created);
+        return status;
+    }
+
+    created->running = true;
+
+    if ((status = celixThread_create(&created->watcherThread, NULL, shmWatcher_run, created)) != CELIX_SUCCESS) {
+        created->running = false;
+        celixThreadMutex_destroy(&created->watcherLock);
+        discovery_shmDetach(created->shmData);
+        free(created);
+        return status;
+    }
+
+    (*watcher) = created;
+
+    return status;
+}
+
+/**
+ * Stops the watcher thread, removes this framework's entry from shared
+ * memory, detaches from the segment and frees the watcher.
+ *
+ * Cleanup always runs to completion: a failure to determine or remove the
+ * local entry is logged, but does not prevent detaching and freeing.
+ * Note that joining can take up to the sleep interval of the watcher thread.
+ *
+ * @param watcher  watcher created by shmWatcher_create; invalid afterwards.
+ * @return CELIX_SUCCESS, or the status of shmWatcher_getLocalNodePath when
+ *         the local node path could not be determined.
+ */
+celix_status_t shmWatcher_destroy(shm_watcher_pt watcher) {
+    celix_status_t status = CELIX_SUCCESS;
+    char localNodePath[MAX_LOCALNODE_LENGTH];
+
+    watcher->running = false;
+
+    celixThread_join(watcher->watcherThread, NULL);
+
+    // unregister own framework
+    status = shmWatcher_getLocalNodePath(watcher->context, &localNodePath[0]);
+    if (status != CELIX_SUCCESS) {
+        fw_log(logger, OSGI_FRAMEWORK_LOG_WARNING, "Cannot remove local discovery registration.");
+    } else if (discovery_shmRemove(watcher->shmData, localNodePath) != CELIX_SUCCESS) {
+        fw_log(logger, OSGI_FRAMEWORK_LOG_WARNING, "Cannot remove local discovery registration.");
+    }
+
+    discovery_shmDetach(watcher->shmData);
+    celixThreadMutex_destroy(&watcher->watcherLock);
+    free(watcher);
+
+    return status;
+}
+



Mime
View raw message