Return-Path: X-Original-To: apmail-incubator-celix-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-celix-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 2C141110DD for ; Mon, 9 Jun 2014 09:15:42 +0000 (UTC) Received: (qmail 67549 invoked by uid 500); 9 Jun 2014 09:15:41 -0000 Delivered-To: apmail-incubator-celix-commits-archive@incubator.apache.org Received: (qmail 67508 invoked by uid 500); 9 Jun 2014 09:15:41 -0000 Mailing-List: contact celix-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: celix-dev@incubator.apache.org Delivered-To: mailing list celix-commits@incubator.apache.org Received: (qmail 67479 invoked by uid 99); 9 Jun 2014 09:15:41 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 09 Jun 2014 09:15:41 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 09 Jun 2014 09:15:38 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id C60602388831; Mon, 9 Jun 2014 09:15:12 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1601328 - in /incubator/celix/trunk/remote_services/discovery_shm/private: include/discovery.h src/discovery.c Date: Mon, 09 Jun 2014 09:15:12 -0000 To: celix-commits@incubator.apache.org From: bpetri@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20140609091512.C60602388831@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: bpetri Date: Mon Jun 9 09:15:11 2014 New Revision: 1601328 URL: http://svn.apache.org/r1601328 Log: CELIX-120: changed structure of shared data. The data itself includes now also the framework uuid, services are saved per framework. Modified: incubator/celix/trunk/remote_services/discovery_shm/private/include/discovery.h incubator/celix/trunk/remote_services/discovery_shm/private/src/discovery.c Modified: incubator/celix/trunk/remote_services/discovery_shm/private/include/discovery.h URL: http://svn.apache.org/viewvc/incubator/celix/trunk/remote_services/discovery_shm/private/include/discovery.h?rev=1601328&r1=1601327&r2=1601328&view=diff ============================================================================== --- incubator/celix/trunk/remote_services/discovery_shm/private/include/discovery.h (original) +++ incubator/celix/trunk/remote_services/discovery_shm/private/include/discovery.h Mon Jun 9 09:15:11 2014 @@ -33,6 +33,12 @@ #define DISCOVERY_SEM_FILENAME "/dev/null" #define DISCOVERY_SEM_FTOK_ID 52 +#define DISCOVERY_SHM_FW_SERVICES "fw.services" +#define DISCOVERY_SHM_SRVC_PROPERTIES "srvc.props" + + +#include +#include #include "endpoint_listener.h" @@ -43,6 +49,24 @@ struct ipc_shmData char data[DISCOVERY_SHM_MEMSIZE - (2* (sizeof(int) + sizeof(key_t)))]; }; +struct discovery { + bundle_context_pt context; + apr_pool_t *pool; + + hash_map_pt listenerReferences; + + bool running; + + int shmId; + void *shmBaseAdress; + apr_thread_t *shmPollThread; + + hash_map_pt shmServices; + + array_list_pt registered; +}; + + typedef struct discovery *discovery_pt; typedef struct ipc_shmData *ipc_shmData_pt; Modified: incubator/celix/trunk/remote_services/discovery_shm/private/src/discovery.c URL: http://svn.apache.org/viewvc/incubator/celix/trunk/remote_services/discovery_shm/private/src/discovery.c?rev=1601328&r1=1601327&r2=1601328&view=diff ============================================================================== --- incubator/celix/trunk/remote_services/discovery_shm/private/src/discovery.c (original) +++ incubator/celix/trunk/remote_services/discovery_shm/private/src/discovery.c Mon Jun 9 09:15:11 2014 @@ -1,5 +1,4 @@ - /** *Licensed to the Apache Software Foundation (ASF) under one *or more contributor license agreements. See the NOTICE file @@ -37,6 +36,7 @@ #include #include "bundle_context.h" +#include "constants.h" #include "array_list.h" #include "utils.h" #include "celix_errno.h" @@ -47,24 +47,6 @@ #include "discovery.h" -struct discovery { - bundle_context_pt context; - apr_pool_t *pool; - - hash_map_pt listenerReferences; - - bool running; - - int shmId; - void *shmBaseAdress; - apr_thread_t *shmPollThread; - - hash_map_pt shmServices; - - array_list_pt handled; - array_list_pt registered; -}; - celix_status_t discovery_informListener(discovery_pt discovery, endpoint_listener_pt listener, endpoint_description_pt endpoint); celix_status_t discovery_informListenerOfRemoval(discovery_pt discovery, endpoint_listener_pt listener, endpoint_description_pt endpoint); @@ -74,13 +56,14 @@ celix_status_t discovery_removeService(d static void *APR_THREAD_FUNC discovery_pollSHMServices(apr_thread_t *thd, void *data); celix_status_t discovery_lock(int semId, int semNr); celix_status_t discovery_unlock(int semId, int semNr); -celix_status_t discovery_wait(int semId, int semNr); +celix_status_t discovery_broadcast(int semId, int semNr); +celix_status_t discovery_stillAlive(char* pid, bool* stillAlive); celix_status_t discovery_updateLocalSHMServices(discovery_pt discovery); -celix_status_t discovery_updateSHMServices(discovery_pt discovery, char *serviceName, char *nsEncAttributes); +celix_status_t discovery_updateSHMServices(discovery_pt discovery, endpoint_description_pt endpoint, bool addService); -celix_status_t discovery_registerSHMService(discovery_pt discovery, char *url, char *attributes); -celix_status_t discovery_deregisterSHMService(discovery_pt discovery, char *serviceName); +celix_status_t discovery_registerSHMService(discovery_pt discovery, endpoint_description_pt endpoint); +celix_status_t discovery_deregisterSHMService(discovery_pt discovery, endpoint_description_pt endpoint); celix_status_t discovery_createOrAttachShm(discovery_pt discovery); celix_status_t discovery_stopOrDetachShm(discovery_pt discovery); @@ -99,8 +82,6 @@ celix_status_t discovery_create(apr_pool (*discovery)->shmServices = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); (*discovery)->running = true; - (*discovery)->handled = NULL; - arrayList_create(&(*discovery)->handled); (*discovery)->registered = NULL; arrayList_create(&(*discovery)->registered); @@ -109,7 +90,7 @@ celix_status_t discovery_create(apr_pool if ((status = discovery_createOrAttachShm(*discovery)) != CELIX_SUCCESS) { - printf("DISCOVERY: Shared Memory initialization failed.\n"); + printf("DISCOVERY: Shared Memory initialization failed."); } else { @@ -120,9 +101,18 @@ celix_status_t discovery_create(apr_pool return status; } -celix_status_t discovery_stop(discovery_pt discovery) { +celix_status_t discovery_destroy(discovery_pt discovery) { celix_status_t status = CELIX_SUCCESS; + arrayList_destroy(discovery->registered); + hashMap_destroy(discovery->shmServices, false, false); + hashMap_destroy(discovery->listenerReferences, false, false); + + return status; +} + +celix_status_t discovery_stop(discovery_pt discovery) { + celix_status_t status = CELIX_SUCCESS; apr_status_t tstat; discovery->running = false; @@ -130,14 +120,12 @@ celix_status_t discovery_stop(discovery_ if (shmData != NULL) { - discovery_unlock(shmData->semId, 1); - discovery_wait(shmData->semId, 2); - discovery_lock(shmData->semId, 1); + discovery_broadcast(shmData->semId, 1); apr_status_t stat = apr_thread_join(&tstat, discovery->shmPollThread); if (stat != APR_SUCCESS && tstat != APR_SUCCESS) - { + { printf("DISCOVERY: An error occured while stopping the SHM polling thread.\n"); status = CELIX_BUNDLE_EXCEPTION; } @@ -146,16 +134,14 @@ celix_status_t discovery_stop(discovery_ printf("DISCOVERY: SHM polling thread sucessfully stopped.\n"); int i; for (i = 0; i < arrayList_size(discovery->registered); i++) { - char *serviceName = arrayList_get(discovery->registered, i); - printf("DISCOVERY: deregistering service %s.\n", serviceName); - status = discovery_deregisterSHMService(discovery, serviceName); - } - //discovery_lock(shmData->semId, 1); + endpoint_description_pt endpoint = (endpoint_description_pt) arrayList_get(discovery->registered, i); + printf("DISCOVERY: deregistering service %s.\n", endpoint->service); + status = discovery_deregisterSHMService(discovery, endpoint); + } // detach from shm status = discovery_stopOrDetachShm(discovery); - } } @@ -172,9 +158,10 @@ celix_status_t discovery_removeService(d hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); service_reference_pt reference = hashMapEntry_getKey(entry); endpoint_listener_pt listener = NULL; - bundleContext_getService(discovery->context, reference, (void**)&listener); + bundleContext_getService(discovery->context, reference, (void**) &listener); discovery_informListenerOfRemoval(discovery, listener, endpoint); } + hashMapIterator_destroy(iter); return status; } @@ -187,7 +174,6 @@ celix_status_t discovery_addService(disc printf("DISCOVERY: Add service (%s)\n", endpoint->service); - while (hashMapIterator_hasNext(iter)) { hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); service_reference_pt reference = hashMapEntry_getKey(entry); @@ -203,16 +189,15 @@ celix_status_t discovery_addService(disc filter_match(filter, endpoint->properties, &matchResult); if (matchResult) { printf("DISCOVERY: Add service (%s)\n", endpoint->service); - bundleContext_getService(discovery->context, reference, (void**)&listener); + bundleContext_getService(discovery->context, reference, (void**) &listener); discovery_informListener(discovery, listener, endpoint); } } + hashMapIterator_destroy(iter); return status; } - - celix_status_t discovery_informListener(discovery_pt discovery, endpoint_listener_pt listener, endpoint_description_pt endpoint) { celix_status_t status = CELIX_SUCCESS; @@ -226,16 +211,15 @@ celix_status_t discovery_informListenerO return status; } - celix_status_t discovery_lock(int semId, int semNr) { celix_status_t status = CELIX_SUCCESS; int semOpStatus = 0; struct sembuf semOperation; - semOperation.sem_num=semNr; - semOperation.sem_op=-1; - semOperation.sem_flg=0; + semOperation.sem_num = semNr; + semOperation.sem_op = -1; + semOperation.sem_flg = 0; do { @@ -245,21 +229,20 @@ celix_status_t discovery_lock(int semId, { status = CELIX_BUNDLE_EXCEPTION; } - } while(semOpStatus == -1 && errno == EINTR); + } while (semOpStatus == -1 && errno == EINTR); return status; } - celix_status_t discovery_unlock(int semId, int semNr) { celix_status_t status = CELIX_SUCCESS; int semOpStatus = 0; struct sembuf semOperation; - semOperation.sem_num=semNr; - semOperation.sem_op=1; - semOperation.sem_flg=0; + semOperation.sem_num = semNr; + semOperation.sem_op = 1; + semOperation.sem_flg = 0; do { @@ -269,77 +252,336 @@ celix_status_t discovery_unlock(int semI { status = CELIX_BUNDLE_EXCEPTION; } - } while(semOpStatus == -1 && errno == EINTR); + } while (semOpStatus == -1 && errno == EINTR); + + return status; +} + +celix_status_t discovery_broadcast(int semId, int semNr) +{ + celix_status_t status = CELIX_SUCCESS; + int semOpStatus = 0; + struct sembuf semOperation; + + semOperation.sem_num = semNr; + semOperation.sem_op = semctl(semId, semNr, GETNCNT, 0) + 1; /* + 1 cause we also want to include out own process */ + semOperation.sem_flg = 0; + + do + { + status = CELIX_SUCCESS; + if ((semOpStatus = semop(semId, &semOperation, 1)) != 0) + { + status = CELIX_BUNDLE_EXCEPTION; + } + } while (semOpStatus == -1 && errno == EINTR); return status; } +celix_status_t discovery_decShmMapService(discovery_pt discovery, char* encServiceMap, hash_map_pt outShmMap) +{ + celix_status_t status = CELIX_SUCCESS; + + if ((status = netstring_decodeToHashMap(discovery->pool, encServiceMap, outShmMap)) != CELIX_SUCCESS) + { + printf("DISCOVERY: discovery_decShmMapService : decoding service data to hashmap failed\n"); + } + else + { + // decode service properties as well + char* encServiceProps = hashMap_get(outShmMap, DISCOVERY_SHM_SRVC_PROPERTIES); + hash_map_pt props = hashMap_create(utils_stringHash, utils_stringHash, utils_stringEquals, utils_stringEquals); + + if ((status = netstring_decodeToHashMap(discovery->pool, encServiceProps, props)) != CELIX_SUCCESS) + { + printf("DISCOVERY: discovery_decShmMapService : Decoding of endpointProperties failed\n"); + } + + hashMap_put(outShmMap, DISCOVERY_SHM_SRVC_PROPERTIES, props); + } + + return status; +} -celix_status_t discovery_wait(int semId, int semNr) +celix_status_t discovery_encShmMapService(discovery_pt discovery, hash_map_pt inShmMap, char** outEncServiceMap) { - celix_status_t status = CELIX_SUCCESS; - int semOpStatus = 0; - struct sembuf semOperation; + celix_status_t status = CELIX_SUCCESS; - semOperation.sem_num = semNr; - semOperation.sem_op = 0; - semOperation.sem_flg = 0; + // encode service properties as well + char* encServiceProps = NULL; + hash_map_pt props = hashMap_get(inShmMap, DISCOVERY_SHM_SRVC_PROPERTIES); - do - { - status = CELIX_SUCCESS; + if ((status = netstring_encodeFromHashMap(discovery->pool, props, &encServiceProps)) != CELIX_SUCCESS) + { + printf("DISCOVERY: discovery_encShmMapService : encoding of endpointProperties failed\n"); + } + else + { + hashMap_put(inShmMap, DISCOVERY_SHM_SRVC_PROPERTIES, encServiceProps); + + if ((status = netstring_encodeFromHashMap(discovery->pool, inShmMap, outEncServiceMap)) != CELIX_SUCCESS) + { + printf("DISCOVERY: discovery_encShmMapService : encoding service data to hashmap failed\n"); + } + } - if ((semOpStatus = semop(semId, &semOperation, 1)) != 0) - { - status = CELIX_BUNDLE_EXCEPTION; - } - } while (semOpStatus == -1 && errno == EINTR); + // we can only free that if not allocated via apr + hashMap_destroy(props, false, false); - return status; + return status; } +celix_status_t discovery_decShmMapDiscoveryInstance(discovery_pt discovery, char* encDiscInstance, hash_map_pt outRegServices) +{ + celix_status_t status = CELIX_BUNDLE_EXCEPTION; + + if ((status = netstring_decodeToHashMap(discovery->pool, encDiscInstance, outRegServices)) != CELIX_SUCCESS) + { + printf("DISCOVERY: discovery_decShmMapDiscoveryInstance : decoding data to properties failed\n"); + } + else { + + char* encServices = hashMap_get(outRegServices, DISCOVERY_SHM_FW_SERVICES); + hash_map_pt fwServices = hashMap_create(utils_stringHash, utils_stringHash, utils_stringEquals, utils_stringEquals); + + if ((status = netstring_decodeToHashMap(discovery->pool, encServices, fwServices)) != CELIX_SUCCESS) + { + printf("DISCOVERY: discovery_decShmMapDiscoveryInstance : decoding services failed\n"); + } + else + { + hash_map_iterator_pt shmItr = hashMapIterator_create(fwServices); + + while ((status == CELIX_SUCCESS) && (hashMapIterator_hasNext(shmItr) == true)) + { + hash_map_pt regShmService = hashMap_create(utils_stringHash, utils_stringHash, utils_stringEquals, utils_stringEquals); + hash_map_entry_pt shmSrvc = hashMapIterator_nextEntry(shmItr); + + char *serviceName = hashMapEntry_getKey(shmSrvc); + char *encServiceMap = hashMapEntry_getValue(shmSrvc); + + if ((status = discovery_decShmMapService(discovery, encServiceMap, regShmService)) == CELIX_SUCCESS) + { + hashMap_put(fwServices, serviceName, regShmService); + } + } + + hashMapIterator_destroy(shmItr); + + hashMap_put(outRegServices, DISCOVERY_SHM_FW_SERVICES, fwServices); + } + } + return status; +} -celix_status_t discovery_updateSHMServices(discovery_pt discovery, char *serviceName, char *nsEncAttributes) +// fwId -> hm_services +celix_status_t discovery_encShmMapDiscoveryInstance(discovery_pt discovery, hash_map_pt inFwAttr, char** outEncDiscoveryInstance) { celix_status_t status = CELIX_SUCCESS; - if ((discovery->shmId < 0) || (discovery->shmBaseAdress == NULL)) + hash_map_pt inRegServices = hashMap_get(inFwAttr, DISCOVERY_SHM_FW_SERVICES); + hash_map_iterator_pt shmItr = hashMapIterator_create(inRegServices); + + while ((status == CELIX_SUCCESS) && (hashMapIterator_hasNext(shmItr) == true)) + { + hash_map_entry_pt shmSrvc = hashMapIterator_nextEntry(shmItr); + + char *encServiceMap = NULL; + char *serviceName = hashMapEntry_getKey(shmSrvc); + + hash_map_pt regShmService = hashMapEntry_getValue(shmSrvc); + + if ((status = discovery_encShmMapService(discovery, regShmService, &encServiceMap)) == CELIX_SUCCESS) + { + hashMap_put(inRegServices, serviceName, encServiceMap); + } + + hashMap_destroy(regShmService, false, false); + } + + hashMapIterator_destroy(shmItr); + + if (status == CELIX_SUCCESS) + { + char* outEncServices = NULL; + + if ((status = netstring_encodeFromHashMap(discovery->pool, inRegServices, &outEncServices)) != CELIX_SUCCESS) + { + printf("DISCOVERY: discovery_encShmMapDiscoveryInstance : encode services failed\n"); + } + else + { + hashMap_put(inFwAttr, DISCOVERY_SHM_FW_SERVICES, outEncServices); + + if ((status = netstring_encodeFromHashMap(discovery->pool, inFwAttr, outEncDiscoveryInstance)) != CELIX_SUCCESS) + { + printf("DISCOVERY: discovery_encShmMapDiscoveryInstance : encode discovery instances failed\n"); + } + + } + } + + hashMap_destroy(inRegServices, false, false); + + return status; +} + +celix_status_t discovery_decShmMap(discovery_pt discovery, char* encMap, hash_map_pt outRegDiscInstances) +{ + celix_status_t status = CELIX_BUNDLE_EXCEPTION; + + if ((status = netstring_decodeToHashMap(discovery->pool, encMap, outRegDiscInstances)) != CELIX_SUCCESS) { + printf("DISCOVERY: discovery_updateLocalSHMServices : decoding data to properties failed\n"); + } + else + { + hash_map_iterator_pt regDiscoveryInstancesItr = hashMapIterator_create(outRegDiscInstances); + + while ((status == CELIX_SUCCESS) && (hashMapIterator_hasNext(regDiscoveryInstancesItr) == true)) + { + hash_map_pt regDiscoveryInstance = hashMap_create(utils_stringHash, utils_stringHash, utils_stringEquals, utils_stringEquals); + hash_map_entry_pt regDiscoveryEntry = hashMapIterator_nextEntry(regDiscoveryInstancesItr); + + char* fwPid = hashMapEntry_getKey(regDiscoveryEntry); + char* encDiscoveryInstance = hashMapEntry_getValue(regDiscoveryEntry); + + if ((status = discovery_decShmMapDiscoveryInstance(discovery, encDiscoveryInstance, regDiscoveryInstance)) == CELIX_SUCCESS) + { + hashMap_put(outRegDiscInstances, fwPid, regDiscoveryInstance); + } + } + + hashMapIterator_destroy(regDiscoveryInstancesItr); + + } + + return status; +} + +celix_status_t discovery_encShmMap(discovery_pt discovery, hash_map_pt inRegDiscInstances, char** outEncMap) +{ + celix_status_t status = CELIX_SUCCESS; + + hash_map_iterator_pt regDiscoveryInstancesItr = hashMapIterator_create(inRegDiscInstances); + + while ((status == CELIX_SUCCESS) && (hashMapIterator_hasNext(regDiscoveryInstancesItr) == true)) + { + hash_map_entry_pt regDiscoveryEntry = hashMapIterator_nextEntry(regDiscoveryInstancesItr); + + char* encDiscoveryInstance = NULL; + char* fwPid = hashMapEntry_getKey(regDiscoveryEntry); + hash_map_pt regDiscoveryInstance = hashMapEntry_getValue(regDiscoveryEntry); + + if ((status = discovery_encShmMapDiscoveryInstance(discovery, regDiscoveryInstance, &encDiscoveryInstance)) == CELIX_SUCCESS) + { + hashMap_put(inRegDiscInstances, fwPid, encDiscoveryInstance); + } + + hashMap_destroy(regDiscoveryInstance, false, false); + } + + hashMapIterator_destroy(regDiscoveryInstancesItr); + + if ((status == CELIX_SUCCESS) && ((status = netstring_encodeFromHashMap(discovery->pool, inRegDiscInstances, outEncMap)) != CELIX_SUCCESS)) + { + printf("DISCOVERY: discovery_encShmMapDiscoveryInstance : encode shm map failed\n"); + } + + return status; +} + +celix_status_t discovery_updateSHMServices(discovery_pt discovery, endpoint_description_pt endpoint, bool addService) +{ + celix_status_t status = CELIX_SUCCESS; + + if ((discovery->shmId < 0) || (discovery->shmBaseAdress == NULL)) + { printf("DISCOVERY : shared memory not initialized.\n"); status = CELIX_BUNDLE_EXCEPTION; } else { - ipc_shmData_pt shmData = (ipc_shmData_pt) discovery->shmBaseAdress; + ipc_shmData_pt shmData = (ipc_shmData_pt) discovery->shmBaseAdress; - if((status = discovery_lock(shmData->semId, 0)) == CELIX_SUCCESS) + if ((status = discovery_lock(shmData->semId, 0)) == CELIX_SUCCESS) { - hash_map_pt registeredShmServices = hashMap_create(utils_stringHash, utils_stringHash, utils_stringEquals, utils_stringEquals); + hash_map_pt regDiscoveryInstances = hashMap_create(utils_stringHash, utils_stringHash, utils_stringEquals, utils_stringEquals); - /* get already saved properties */ - if ((status = netstring_decodeToHashMap(discovery->pool, shmData->data, registeredShmServices)) != CELIX_SUCCESS) + if ((status = discovery_decShmMap(discovery, &(shmData->data[0]), regDiscoveryInstances)) != CELIX_SUCCESS) { printf("DISCOVERY : discovery_registerSHMService : decoding data to Properties failed\n"); } else { - char *encShmServices = NULL; + char *uuid = NULL; + bundleContext_getProperty(discovery->context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &uuid); + + hash_map_pt ownFramework = hashMap_get(regDiscoveryInstances, uuid); - if (nsEncAttributes != NULL) + if (ownFramework == NULL) { - hashMap_put(registeredShmServices, serviceName, nsEncAttributes); + ownFramework = hashMap_create(utils_stringHash, utils_stringHash, utils_stringEquals, utils_stringEquals); + hashMap_put(regDiscoveryInstances, uuid, ownFramework); + } + + hash_map_pt ownServices = hashMap_get(ownFramework, DISCOVERY_SHM_FW_SERVICES); + + if (ownServices == NULL) + { + ownServices = hashMap_create(utils_stringHash, utils_stringHash, utils_stringEquals, utils_stringEquals); + hashMap_put(ownFramework, DISCOVERY_SHM_FW_SERVICES, ownServices); + } + + // check whether we want to add or remove a service + if (addService == true) + { + // add service + hash_map_pt newService = hashMap_create(utils_stringHash, utils_stringHash, utils_stringEquals, utils_stringEquals); + + // we need to make a copy of the properties + properties_pt endpointProperties = properties_create(); + hash_map_iterator_pt epItr = hashMapIterator_create(endpoint->properties); + + while (hashMapIterator_hasNext(epItr) == true) + { + hash_map_entry_pt epEntry = hashMapIterator_nextEntry(epItr); + properties_set(endpointProperties, (char*) hashMapEntry_getKey(epEntry), (char*) hashMapEntry_getValue(epEntry)); + } + + hashMapIterator_destroy(epItr); + + hashMap_put(newService, DISCOVERY_SHM_SRVC_PROPERTIES, endpointProperties); + hashMap_put(ownServices, endpoint->service, newService); } else { - hashMap_remove(registeredShmServices, serviceName); + printf("remove Services %s\n", endpoint->service); + + hashMap_remove(ownServices, endpoint->service); + + // check if other services are exported, otherwise remove framework/pid as well + // this is also important to ensure a correct reference-counting (we assume that a discovery bundle crashed if we can find + // the structure but the process with the pid does not live anymore) + if (hashMap_size(ownServices) == 0) + { + printf("removing framework w/ uuid %s\n", uuid); + + hashMap_remove(ownFramework, DISCOVERY_SHM_FW_SERVICES); + hashMap_remove(regDiscoveryInstances, uuid); + } + } - // write back - if ((status = netstring_encodeFromHashMap(discovery->pool, registeredShmServices, &encShmServices)) == CELIX_SUCCESS) + // write back to shm + char* encShmMemStr = NULL; + + if ((status = discovery_encShmMap(discovery, regDiscoveryInstances, &encShmMemStr)) == CELIX_SUCCESS) { - strcpy(shmData->data, encShmServices); + strcpy(&(shmData->data[0]), encShmMemStr); } else { @@ -347,35 +589,25 @@ celix_status_t discovery_updateSHMServic } } - hashMap_destroy(registeredShmServices, false, false); - discovery_unlock(shmData->semId, 0); + hashMap_destroy(regDiscoveryInstances, false, false); - /* unlock and afterwards lock to inform all listener */ - discovery_unlock(shmData->semId, 1); - // wait till notify semaphore is 0 to ensure all threads have performed update routine - discovery_wait(shmData->semId, 2); - discovery_lock(shmData->semId, 1); + discovery_unlock(shmData->semId, 0); + discovery_broadcast(shmData->semId, 1); } } return status; } - -celix_status_t discovery_registerSHMService(discovery_pt discovery, char *serviceName, char *nsEncAttributes) +celix_status_t discovery_registerSHMService(discovery_pt discovery, endpoint_description_pt endpoint) { - return discovery_updateSHMServices(discovery, serviceName, nsEncAttributes); + return discovery_updateSHMServices(discovery, endpoint, true); } - - -celix_status_t discovery_deregisterSHMService(discovery_pt discovery, char *serviceName) +celix_status_t discovery_deregisterSHMService(discovery_pt discovery, endpoint_description_pt endpoint) { - return discovery_updateSHMServices(discovery, serviceName, NULL); + return discovery_updateSHMServices(discovery, endpoint, false); } - - - celix_status_t discovery_endpointAdded(void *handle, endpoint_description_pt endpoint, char *machtedFilter) { celix_status_t status = CELIX_SUCCESS; printf("DISCOVERY: Endpoint for %s, with filter \"%s\" added\n", endpoint->service, machtedFilter); @@ -383,31 +615,25 @@ celix_status_t discovery_endpointAdded(v if (status == CELIX_SUCCESS) { - char *nsEncAttribute; - - netstring_encodeFromHashMap(discovery->pool, (hash_map_pt) endpoint->properties, &nsEncAttribute); - - if ((status = discovery_registerSHMService(discovery, endpoint->service, nsEncAttribute)) != CELIX_SUCCESS) + if ((status = discovery_registerSHMService(discovery, endpoint)) != CELIX_SUCCESS) { printf("DISCOVERY: Error registering service (%s) within shared memory \n", endpoint->service); } - arrayList_add(discovery->registered, strdup(endpoint->service)); + arrayList_add(discovery->registered, endpoint); } return status; } - celix_status_t discovery_endpointRemoved(void *handle, endpoint_description_pt endpoint, char *machtedFilter) { celix_status_t status = CELIX_SUCCESS; printf("DISCOVERY: Endpoint for %s, with filter \"%s\" removed\n", endpoint->service, machtedFilter); discovery_pt discovery = handle; - char *serviceUrl = NULL; if (status == CELIX_SUCCESS) { - status = discovery_deregisterSHMService(discovery, endpoint->service); + status = discovery_deregisterSHMService(discovery, endpoint); int i; for (i = 0; i < arrayList_size(discovery->registered); i++) { char *url = arrayList_get(discovery->registered, i); @@ -472,14 +698,22 @@ celix_status_t discovery_updateEndpointL arrayList_add(scopes, scope); } - hash_map_iterator_pt iter = hashMapIterator_create(discovery->shmServices); + hash_map_iterator_pt fwIter = hashMapIterator_create(discovery->shmServices); - while (hashMapIterator_hasNext(iter)) { - hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); - char *key = hashMapEntry_getKey(entry); - endpoint_description_pt value = hashMapEntry_getValue(entry); - discovery_informListener(discovery, service, value); + while (hashMapIterator_hasNext(fwIter)) { + hash_map_entry_pt entry = hashMapIterator_nextEntry(fwIter); + hash_map_pt fwServices = hashMapEntry_getValue(entry); + + hash_map_iterator_pt fwServicesIter = hashMapIterator_create(fwServices); + + while (hashMapIterator_hasNext(fwServicesIter)) + { + endpoint_description_pt value = (endpoint_description_pt) hashMapIterator_nextValue(fwServicesIter); + discovery_informListener(discovery, service, value); + } + hashMapIterator_destroy(fwServicesIter); } + hashMapIterator_destroy(fwIter); return status; } @@ -494,48 +728,47 @@ celix_status_t discovery_endpointListene return status; } - celix_status_t discovery_createOrAttachShm(discovery_pt discovery) { celix_status_t status = CELIX_SUCCESS; - key_t shmKey = ftok(DISCOVERY_SHM_FILENAME, DISCOVERY_SHM_FTOK_ID); + key_t shmKey = ftok(DISCOVERY_SHM_FILENAME, DISCOVERY_SHM_FTOK_ID); if ((discovery->shmId = shmget(shmKey, DISCOVERY_SHM_MEMSIZE, 0666)) < 0) - { - printf("DISCOVERY : Could not attach to shared memory. Trying to create shared memory segment. \n"); + { + printf("DISCOVERY : Could not attach to shared memory. Trying to create shared memory segment. \n"); - // trying to create shared memory - if ((discovery->shmId = shmget(shmKey, DISCOVERY_SHM_MEMSIZE, IPC_CREAT | 0666)) < 0) - { - printf("DISCOVERY : Creation of shared memory segment failed\n"); - status = CELIX_BUNDLE_EXCEPTION; - } - else if ((discovery->shmBaseAdress = shmat(discovery->shmId, 0, 0)) == (char*) -1 ) - { - printf("DISCOVERY : Attaching of shared memory segment failed\n"); - status = CELIX_BUNDLE_EXCEPTION; - } - else - { - int semId = -1; - key_t semKey = -1; - ipc_shmData_pt shmData = NULL; - printf("DISCOVERY : Shared memory segment successfully created at %p\n", discovery->shmBaseAdress); - - // create structure - shmData = apr_palloc(discovery->pool, sizeof(*shmData)); - semKey = ftok(DISCOVERY_SEM_FILENAME, DISCOVERY_SEM_FTOK_ID); + // trying to create shared memory + if ((discovery->shmId = shmget(shmKey, DISCOVERY_SHM_MEMSIZE, IPC_CREAT | 0666)) < 0) + { + printf("DISCOVERY : Creation of shared memory segment failed\n"); + status = CELIX_BUNDLE_EXCEPTION; + } + else if ((discovery->shmBaseAdress = shmat(discovery->shmId, 0, 0)) == (char*) -1) + { + printf("DISCOVERY : Attaching of shared memory segment failed\n"); + status = CELIX_BUNDLE_EXCEPTION; + } + else + { + int semId = -1; + key_t semKey = -1; + ipc_shmData_pt shmData = NULL; + printf("DISCOVERY : Shared memory segment successfully created at %p\n", discovery->shmBaseAdress); + + // create structure + shmData = calloc(1, sizeof(*shmData)); + semKey = ftok(DISCOVERY_SEM_FILENAME, DISCOVERY_SEM_FTOK_ID); - if ((semId = semget(semKey, 3, 0666 | IPC_CREAT)) == -1) - { + if ((semId = semget(semKey, 2, 0666 | IPC_CREAT)) == -1) + { printf("DISCOVERY : Creation of semaphores failed %i\n", semId); } else { // set - if ( semctl (semId, 0, SETVAL, (int) 1) < 0) - { + if (semctl(semId, 0, SETVAL, (int) 1) < 0) + { printf(" DISCOVERY : error while initializing semaphore 0 \n"); } else @@ -543,8 +776,8 @@ celix_status_t discovery_createOrAttachS printf(" DISCOVERY : semaphore 0 initialized w/ %d\n", semctl(semId, 0, GETVAL, 0)); } - if ( semctl (semId, 1, SETVAL, (int) 0) < 0) - { + if (semctl(semId, 1, SETVAL, (int) 0) < 0) + { printf(" DISCOVERY : error while initializing semaphore 1\n"); } else @@ -552,31 +785,27 @@ celix_status_t discovery_createOrAttachS printf(" DISCOVERY : semaphore 1 initialized w/ %d\n", semctl(semId, 1, GETVAL, 0)); } - if ( semctl (semId, 2, SETVAL, (int) 0) < 0) - { - printf(" DISCOVERY : error while initializing semaphore 2\n"); - } - else - { - printf(" DISCOVERY : semaphore 2 initialized w/ %d\n", semctl(semId, 2, GETVAL, 0)); - } - shmData->semId = semId; + shmData->numListeners = 1; + printf(" numListeners is initalized: %d \n", shmData->numListeners); + memcpy(discovery->shmBaseAdress, shmData, sizeof(*shmData)); } - } + + free(shmData); + } + } + else if ((discovery->shmBaseAdress = shmat(discovery->shmId, 0, 0)) < 0) + { + printf("DISCOVERY : Attaching of shared memory segment failed\n"); + status = CELIX_BUNDLE_EXCEPTION; } - else if ((discovery->shmBaseAdress = shmat(discovery->shmId, 0, 0)) < 0) - { - printf("DISCOVERY : Attaching of shared memory segment failed\n"); - status = CELIX_BUNDLE_EXCEPTION; - } - else + else { - ipc_shmData_pt shmData = (ipc_shmData_pt) discovery->shmBaseAdress; + ipc_shmData_pt shmData = (ipc_shmData_pt) discovery->shmBaseAdress; discovery_lock(shmData->semId, 0); - shmData->numListeners++; + shmData->numListeners++; discovery_unlock(shmData->semId, 0); discovery_updateLocalSHMServices(discovery); } @@ -584,28 +813,27 @@ celix_status_t discovery_createOrAttachS return status; } - - celix_status_t discovery_stopOrDetachShm(discovery_pt discovery) { celix_status_t status = CELIX_SUCCESS; - if ((discovery->shmId < 0) || (discovery->shmBaseAdress == NULL)) - { + if ((discovery->shmId < 0) || (discovery->shmBaseAdress == NULL)) + { printf("DISCOVERY : discovery_addNewEntry : shared memory not initialized.\n"); status = CELIX_BUNDLE_EXCEPTION; } else { int listener = 0; - ipc_shmData_pt shmData = (ipc_shmData_pt) discovery->shmBaseAdress; + ipc_shmData_pt shmData = (ipc_shmData_pt) discovery->shmBaseAdress; discovery_lock(shmData->semId, 0); - listener = shmData->numListeners--; + shmData->numListeners--; + printf(" numListeners decreased: %d \n", shmData->numListeners); discovery_unlock(shmData->semId, 0); - if (listener > 0) - { + if (shmData->numListeners > 0) + { printf("DISCOVERY: Detaching from Shared memory\n"); shmdt(discovery->shmBaseAdress); } @@ -624,124 +852,162 @@ celix_status_t discovery_stopOrDetachShm return status; } - celix_status_t discovery_updateLocalSHMServices(discovery_pt discovery) { celix_status_t status = CELIX_SUCCESS; ipc_shmData_pt shmData = (ipc_shmData_pt) discovery->shmBaseAdress; - if((status = discovery_lock(shmData->semId, 0)) != CELIX_SUCCESS) + if ((status = discovery_lock(shmData->semId, 0)) != CELIX_SUCCESS) { - printf("DISCOVERY : discovery_updateLocalSHMServices : cannot acquire semaphore\n"); + printf("cannot acquire semaphore"); } else { - hash_map_pt registeredShmServices = hashMap_create(utils_stringHash, utils_stringHash, utils_stringEquals, utils_stringEquals); + hash_map_pt regDiscoveryInstances = hashMap_create(utils_stringHash, utils_stringHash, utils_stringEquals, utils_stringEquals); - if ((status = netstring_decodeToHashMap(discovery->pool, &(shmData->data[0]), registeredShmServices)) != CELIX_SUCCESS) - { - printf("DISCOVERY : discovery_updateLocalSHMServices : decoding data to properties failed\n"); - } - else + if ((status = discovery_decShmMap(discovery, &(shmData->data[0]), regDiscoveryInstances)) == CELIX_SUCCESS) { - /* check for new services */ - hash_map_iterator_pt shmPrpItr = hashMapIterator_create(registeredShmServices); + hash_map_iterator_pt regDiscoveryInstancesItr = hashMapIterator_create(regDiscoveryInstances); - while(hashMapIterator_hasNext(shmPrpItr) == true) + while (hashMapIterator_hasNext(regDiscoveryInstancesItr) == true) { - hash_map_entry_pt shmPrpEntry = hashMapIterator_nextEntry(shmPrpItr); - char *serviceName = hashMapEntry_getKey(shmPrpEntry); + hash_map_entry_pt regDiscoveryEntry = hashMapIterator_nextEntry(regDiscoveryInstancesItr); - if(hashMap_get(discovery->shmServices, serviceName) != NULL) - { - printf("DISCOVERY : discovery_updateLocalSHMServices : service with url %s already registered\n", serviceName ); - } - else + char* uuid = hashMapEntry_getKey(regDiscoveryEntry); + hash_map_pt fwAttr = hashMapEntry_getValue(regDiscoveryEntry); + hash_map_pt services = hashMap_get(fwAttr, DISCOVERY_SHM_FW_SERVICES); + + /* check for new services */ + hash_map_iterator_pt srvcItr = hashMapIterator_create(services); + + while (hashMapIterator_hasNext(srvcItr) == true) { - hash_map_pt props = hashMap_create(utils_stringHash, utils_stringHash, utils_stringEquals, utils_stringEquals); - char *nsEncEndpointProp = hashMapEntry_getValue(shmPrpEntry); + hash_map_entry_pt srvc = hashMapIterator_nextEntry(srvcItr); + + char *srvcName = hashMapEntry_getKey(srvc); + hash_map_pt srvcAttr = hashMapEntry_getValue(srvc); + hash_map_pt fwServices = NULL; + + // check whether we have a service from that fw at all + if ((fwServices = hashMap_get(discovery->shmServices, uuid)) == NULL) + { + fwServices = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); + hashMap_put(discovery->shmServices, uuid, fwServices); + } - if ( (status = netstring_decodeToHashMap(discovery->pool, nsEncEndpointProp, props)) != CELIX_SUCCESS) + if (hashMap_get(fwServices, srvcName) != NULL) { - printf("DISCOVERY : discovery_updateLocalSHMServices : Decoding of endpointProperties failed\n"); + printf("DISCOVERY : discovery_updateLocalSHMServices : service with url %s from %s already registered", srvcName, uuid); } else { endpoint_description_pt endpoint = apr_palloc(discovery->pool, sizeof(*endpoint)); - endpoint->id = apr_pstrdup(discovery->pool, serviceName); + endpoint->id = apr_pstrdup(discovery->pool, srvcName); endpoint->serviceId = 42; - endpoint->service = apr_pstrdup(discovery->pool, serviceName); - endpoint->properties = (properties_pt) props; + endpoint->service = apr_pstrdup(discovery->pool, srvcName); + endpoint->properties = (properties_pt) hashMap_get(srvcAttr, DISCOVERY_SHM_SRVC_PROPERTIES); + endpoint->frameworkUUID = uuid; discovery_addService(discovery, endpoint); - hashMap_put(discovery->shmServices, apr_pstrdup(discovery->pool, serviceName), endpoint); + hashMap_put(fwServices, srvcName, endpoint); } } - } - hashMapIterator_destroy(shmPrpItr); + hashMapIterator_destroy(srvcItr); + + /* check for obsolete services for this uuid */ + hash_map_pt fwServices = hashMap_get(discovery->shmServices, uuid); + hash_map_iterator_pt shmServicesItr = hashMapIterator_create(fwServices); - /* check for obsolete services */ - hash_map_iterator_pt shmServicesItr = hashMapIterator_create(discovery->shmServices); + // iterate over frameworks + while (hashMapIterator_hasNext(shmServicesItr) == true) + { + hash_map_entry_pt shmService = hashMapIterator_nextEntry(shmServicesItr); + char *fwurl = hashMapEntry_getKey(shmService); - while(hashMapIterator_hasNext(shmServicesItr) == true) + if (hashMap_get(services, fwurl) == NULL) + { + printf("DISCOVERY: service with url %s from %s already unregistered", fwurl, uuid); + endpoint_description_pt endpoint = hashMap_get(fwServices, fwurl); + discovery_removeService(discovery, endpoint); + hashMap_remove(fwServices, fwurl); + } + } + hashMapIterator_destroy(shmServicesItr); + } + hashMapIterator_destroy(regDiscoveryInstancesItr); + + /* check for obsolete frameworks*/ + hash_map_iterator_pt lclFwItr = hashMapIterator_create(discovery->shmServices); + + // iterate over frameworks + while (hashMapIterator_hasNext(lclFwItr) == true) { - hash_map_entry_pt shmService = hashMapIterator_nextEntry(shmServicesItr); - char *url = hashMapEntry_getKey(shmService); + hash_map_entry_pt lclFwEntry = hashMapIterator_nextEntry(lclFwItr); + char *fwUUID = hashMapEntry_getKey(lclFwEntry); - if(hashMap_get(registeredShmServices, url) == NULL) + // whole framework not available any more + if (hashMap_get(regDiscoveryInstances, fwUUID) == NULL) { - printf("DISCOVERY : discovery_updateLocalSHMServices : service with url %s unregistered\n", url); - endpoint_description_pt endpoint = hashMap_get(discovery->shmServices, url); - discovery_removeService(discovery, endpoint); - hashMap_remove(discovery->shmServices, url); + hash_map_pt lclFwServices = NULL; + + if ((lclFwServices = (hash_map_pt) hashMapEntry_getValue(lclFwEntry)) == NULL) + { + printf("UUID %s does not have any services, but a structure\n", fwUUID); + } + else + { + // remove them all + hash_map_iterator_pt lclFwServicesItr = hashMapIterator_create(lclFwServices); + + while (hashMapIterator_hasNext(lclFwServicesItr) == true) + { + hash_map_entry_pt lclFwSrvcEntry = hashMapIterator_nextEntry(lclFwServicesItr); + + discovery_removeService(discovery, (endpoint_description_pt) hashMapEntry_getValue(lclFwSrvcEntry)); + hashMapIterator_remove(lclFwServicesItr); + } + + hashMapIterator_destroy(lclFwServicesItr); + } } } - hashMapIterator_destroy(shmServicesItr); - } + hashMapIterator_destroy(lclFwItr); + + } + hashMap_destroy(regDiscoveryInstances, false, false); discovery_unlock(shmData->semId, 0); - hashMap_destroy(registeredShmServices, false, false); } + return status; } - static void *APR_THREAD_FUNC discovery_pollSHMServices(apr_thread_t *thd, void *data) { discovery_pt discovery = data; celix_status_t status = CELIX_SUCCESS; - if ((discovery->shmId < 0) || (discovery->shmBaseAdress == NULL)) + if ((discovery->shmId < 0) || (discovery->shmBaseAdress == NULL)) { - printf("DISCOVERY : discovery_pollSHMServices : shared memory not initialized.\n"); + printf( "DISCOVERY: shared memory not initialized."); status = CELIX_BUNDLE_EXCEPTION; } else { - ipc_shmData_pt shmData = (ipc_shmData_pt) discovery->shmBaseAdress; - - printf("DISCOVERY : discovery_pollSHMServices thread started\n"); + ipc_shmData_pt shmData = (ipc_shmData_pt) discovery->shmBaseAdress; while(discovery->running == true) { - if(((status = discovery_unlock(shmData->semId, 2)) != CELIX_SUCCESS) && (discovery->running == true)) - { - printf("DISCOVERY : discovery_pollSHMServices : cannot acquire semaphore\n"); - } - else if(((status = discovery_lock(shmData->semId, 1)) != CELIX_SUCCESS) && (discovery->running == true)) + if(((status = discovery_lock(shmData->semId, 1)) != CELIX_SUCCESS) && (discovery->running == true)) { - printf("DISCOVERY : discovery_pollSHMServices : cannot acquire semaphore\n"); + printf( "DISCOVERY: cannot acquire semaphore. Breaking main poll cycle."); + break; } else { discovery_updateLocalSHMServices(discovery); - - discovery_lock(shmData->semId, 2); - discovery_unlock(shmData->semId, 1); } - - sleep(1); } } @@ -749,4 +1015,3 @@ static void *APR_THREAD_FUNC discovery_p return NULL; } -