celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pnol...@apache.org
Subject [1/2] celix git commit: CELIX-237: Forward the remove endpoint call from the RSA to import_registration. Removed the status check for the mutex unlock in the topology manager. That check caused deadlocks when an error occurred
Date Wed, 16 Sep 2015 11:03:30 GMT
Repository: celix
Updated Branches:
  refs/heads/feature/CELIX-237_rsa-ffi 313452d34 -> e4d3ea6e8


CELIX-237: Forward the remove endpoint call from the RSA to import_registration. Removed the status check
for the mutex unlock in the topology manager. That check caused deadlocks when an error occurred


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/a477ab96
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/a477ab96
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/a477ab96

Branch: refs/heads/feature/CELIX-237_rsa-ffi
Commit: a477ab96cf01896d360b71823e6302b942271840
Parents: 313452d
Author: Pepijn Noltes <pepijnnoltes@gmail.com>
Authored: Wed Sep 16 12:45:19 2015 +0200
Committer: Pepijn Noltes <pepijnnoltes@gmail.com>
Committed: Wed Sep 16 12:45:19 2015 +0200

----------------------------------------------------------------------
 .../private/include/import_registration_dfi.h   |   3 +-
 .../rsa/private/src/import_registration_dfi.c   |  40 ++++---
 .../rsa/private/src/remote_service_admin_dfi.c  |  52 ++--------
 .../private/src/topology_manager.c              | 104 +++++++++----------
 4 files changed, 86 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/a477ab96/remote_services/remote_service_admin_dfi/rsa/private/include/import_registration_dfi.h
----------------------------------------------------------------------
diff --git a/remote_services/remote_service_admin_dfi/rsa/private/include/import_registration_dfi.h
b/remote_services/remote_service_admin_dfi/rsa/private/include/import_registration_dfi.h
index 6f2a232..6aa1b26 100644
--- a/remote_services/remote_service_admin_dfi/rsa/private/include/import_registration_dfi.h
+++ b/remote_services/remote_service_admin_dfi/rsa/private/include/import_registration_dfi.h
@@ -10,8 +10,7 @@
 
 typedef void (*send_func_type)(void *handle, endpoint_description_pt endpointDescription,
char *request, char **reply, int* replyStatus);
 
-celix_status_t importRegistration_create(bundle_context_pt context, void (*rsaCallback)(void
*, import_registration_pt),
-                                         void *rsaHandle, endpoint_description_pt description,
const char *classObject,
+celix_status_t importRegistration_create(bundle_context_pt context, endpoint_description_pt
description, const char *classObject,
                                          import_registration_pt *import);
 void importRegistration_destroy(import_registration_pt import);
 

http://git-wip-us.apache.org/repos/asf/celix/blob/a477ab96/remote_services/remote_service_admin_dfi/rsa/private/src/import_registration_dfi.c
----------------------------------------------------------------------
diff --git a/remote_services/remote_service_admin_dfi/rsa/private/src/import_registration_dfi.c
b/remote_services/remote_service_admin_dfi/rsa/private/src/import_registration_dfi.c
index d192e29..56144d9 100644
--- a/remote_services/remote_service_admin_dfi/rsa/private/src/import_registration_dfi.c
+++ b/remote_services/remote_service_admin_dfi/rsa/private/src/import_registration_dfi.c
@@ -10,6 +10,9 @@ struct import_registration {
     bundle_context_pt context;
     endpoint_description_pt  endpoint; //TODO owner? -> free when destroyed
     const char *classObject; //NOTE owned by endpoint
+
+    celix_thread_mutex_t mutex; //protects send & sendhandle
+
     send_func_type send;
     void *sendHandle;
 
@@ -17,9 +20,6 @@ struct import_registration {
     service_registration_pt factoryReg;
 
     hash_map_pt proxies; //key -> bundle, value -> service_proxy
-
-    void (*rsaCloseImportCallback)(void *, import_registration_pt);
-    void *rsaHandle;
 };
 
 struct service_proxy {
@@ -33,8 +33,7 @@ static celix_status_t importRegistration_createProxy(import_registration_pt
impo
 static void importRegistration_proxyFunc(void *userData, void *args[], void *returnVal);
 static void importRegistration_destroyProxy(struct service_proxy *proxy);
 
-celix_status_t importRegistration_create(bundle_context_pt context, void (*rsaCallback)(void
*, import_registration_pt),
-                                         void *rsaHandle, endpoint_description_pt endpoint,
const char *classObject,
+celix_status_t importRegistration_create(bundle_context_pt context, endpoint_description_pt
endpoint, const char *classObject,
                                          import_registration_pt *out) {
     celix_status_t status = CELIX_SUCCESS;
     import_registration_pt reg = calloc(1, sizeof(*reg));
@@ -45,11 +44,10 @@ celix_status_t importRegistration_create(bundle_context_pt context, void
(*rsaCa
 
     if (reg != NULL && reg->factory != NULL) {
         reg->context = context;
-        reg->rsaCloseImportCallback = rsaCallback;
-        reg->rsaHandle = rsaHandle;
         reg->endpoint = endpoint;
         reg->classObject = classObject;
         reg->proxies = hashMap_create(NULL, NULL, NULL, NULL);
+        celixThreadMutex_create(&reg->mutex, NULL);
 
         reg->factory->factory = reg;
         reg->factory->getService = (void *)importRegistration_getService;
@@ -70,8 +68,10 @@ celix_status_t importRegistration_create(bundle_context_pt context, void
(*rsaCa
 celix_status_t importRegistration_setSendFn(import_registration_pt reg,
                                             send_func_type send,
                                             void *handle) {
+    celixThreadMutex_lock(&reg->mutex);
     reg->send = send;
     reg->sendHandle = handle;
+    celixThreadMutex_unlock(&reg->mutex);
 
     return CELIX_SUCCESS;
 }
@@ -79,7 +79,12 @@ celix_status_t importRegistration_setSendFn(import_registration_pt reg,
 void importRegistration_destroy(import_registration_pt import) {
     if (import != NULL) {
         if (import->proxies != NULL) {
-            //TODO destroy proxies
+            hash_map_iterator_pt  iter = hashMapIterator_create(import->proxies);
+            while (hashMapIterator_hasNext(iter)) {
+                struct service_proxy *proxy = hashMapIterator_nextEntry(iter);
+                importRegistration_destroyProxy(proxy);
+            }
+            hashMapIterator_destroy(iter);
             hashMap_destroy(import->proxies, false, false);
             import->proxies = NULL;
         }
@@ -121,6 +126,7 @@ celix_status_t importRegistration_getService(import_registration_pt import,
bund
     printf("getting service for bundle '%s'\n", name);
      */
 
+
     struct service_proxy *proxy = hashMap_get(import->proxies, bundle); //TODO lock
     if (proxy == NULL) {
         status = importRegistration_createProxy(import, bundle, &proxy);
@@ -207,13 +213,8 @@ static celix_status_t importRegistration_createProxy(import_registration_pt
impo
             dynInterface_destroy(proxy->intf);
             proxy->intf = NULL;
         }
-        if (proxy->service != NULL) {
-            free(proxy->service);
-            proxy->service = NULL;
-        }
-        if (proxy != NULL) {
-            free(proxy);
-        }
+        free(proxy->service);
+        free(proxy);
     }
 
     return status;
@@ -235,11 +236,16 @@ static void importRegistration_proxyFunc(void *userData, void *args[],
void *ret
         //printf("Need to send following json '%s'\n", invokeRequest);
     }
 
+
     if (status == CELIX_SUCCESS) {
         char *reply = NULL;
         int rc = 0;
         //printf("sending request\n");
-        import->send(import->sendHandle, import->endpoint, invokeRequest, &reply,
&rc);
+        celixThreadMutex_lock(&import->mutex);
+        if (import->send != NULL) {
+            import->send(import->sendHandle, import->endpoint, invokeRequest, &reply,
&rc);
+        }
+        celixThreadMutex_unlock(&import->mutex);
         //printf("request sended. got reply '%s' with status %i\n", reply, rc);
 
         if (rc == 0) {
@@ -295,7 +301,7 @@ static void importRegistration_destroyProxy(struct service_proxy *proxy)
{
 
 celix_status_t importRegistration_close(import_registration_pt registration) {
     celix_status_t status = CELIX_SUCCESS;
-    registration->rsaCloseImportCallback(registration->rsaHandle, registration);
+    importRegistration_stop(registration);
     return status;
 }
 

http://git-wip-us.apache.org/repos/asf/celix/blob/a477ab96/remote_services/remote_service_admin_dfi/rsa/private/src/remote_service_admin_dfi.c
----------------------------------------------------------------------
diff --git a/remote_services/remote_service_admin_dfi/rsa/private/src/remote_service_admin_dfi.c
b/remote_services/remote_service_admin_dfi/rsa/private/src/remote_service_admin_dfi.c
index 8eadbdb..c12d0aa 100644
--- a/remote_services/remote_service_admin_dfi/rsa/private/src/remote_service_admin_dfi.c
+++ b/remote_services/remote_service_admin_dfi/rsa/private/src/remote_service_admin_dfi.c
@@ -173,7 +173,7 @@ celix_status_t remoteServiceAdmin_create(bundle_context_pt context, remote_servi
 
         do {
             char newPort[10];
-            const char *options[] = { "listening_ports", port, NULL};
+            const char *options[] = { "listening_ports", port, "num_threads", "5", NULL};
 
             (*admin)->ctx = mg_start(&callbacks, (*admin), options);
 
@@ -581,7 +581,7 @@ celix_status_t remoteServiceAdmin_importService(remote_service_admin_pt
admin, e
 
     import_registration_pt import = NULL;
     if (objectClass != NULL) {
-        status = importRegistration_create(admin->context, NULL, NULL, endpointDescription,
objectClass, &import);
+        status = importRegistration_create(admin->context, endpointDescription, objectClass,
&import);
     }
     if (status == CELIX_SUCCESS) {
         importRegistration_setSendFn(import, remoteServiceAdmin_send, admin);
@@ -591,9 +591,9 @@ celix_status_t remoteServiceAdmin_importService(remote_service_admin_pt
admin, e
         status = importRegistration_start(import);
     }
 
-    //celixThreadMutex_lock(&admin->importedServicesLock);
-    //TODO add to list
-    //celixThreadMutex_unlock(&admin->importedServicesLock);
+    celixThreadMutex_lock(&admin->importedServicesLock);
+    hashMap_put(admin->importedServices, import, import);
+    celixThreadMutex_unlock(&admin->importedServicesLock);
 
     if (status == CELIX_SUCCESS) {
         *out = import;
@@ -605,44 +605,14 @@ celix_status_t remoteServiceAdmin_importService(remote_service_admin_pt
admin, e
 
 celix_status_t remoteServiceAdmin_removeImportedService(remote_service_admin_pt admin, import_registration_pt
registration) {
     celix_status_t status = CELIX_SUCCESS;
-    return status;
-    /*
-
-      endpoint_description_pt endpointDescription = (endpoint_description_pt) registration->endpointDescription;
-      import_registration_factory_pt registration_factory = NULL;
-
-      celixThreadMutex_lock(&admin->importedServicesLock);
-
-      registration_factory = (import_registration_factory_pt) hashMap_get(admin->importedServices,
endpointDescription->service);
-
-      // factory available
-      if ((registration_factory == NULL) || (registration_factory->trackedFactory == NULL))
-      {
-          logHelper_log(admin->loghelper, OSGI_LOGSERVICE_ERROR, "RSA: Error while retrieving
registration factory for imported service %s", endpointDescription->service);
-      }
-      else
-      {
-          registration_factory->trackedFactory->unregisterProxyService(registration_factory->trackedFactory->factory,
endpointDescription);
-          arrayList_removeElement(registration_factory->registrations, registration);
-          importRegistration_destroy(registration);
 
-          if (arrayList_isEmpty(registration_factory->registrations))
-          {
-              logHelper_log(admin->loghelper, OSGI_LOGSERVICE_INFO, "RSA: closing proxy.");
-
-              serviceTracker_close(registration_factory->proxyFactoryTracker);
-              importRegistrationFactory_close(registration_factory);
-
-              hashMap_remove(admin->importedServices, endpointDescription->service);
-
-              importRegistrationFactory_destroy(&registration_factory);
-          }
-      }
-
-      celixThreadMutex_unlock(&admin->importedServicesLock);
+    celixThreadMutex_lock(&admin->importedServicesLock);
+    importRegistration_close(registration);
+    //importRegistration_destroy(registration); TODO enable & debug -> segfault
+    hashMap_remove(admin->importedServices, registration);
+    celixThreadMutex_unlock(&admin->importedServicesLock);
 
-      return status;
-    */
+    return status;
 }
 
 

http://git-wip-us.apache.org/repos/asf/celix/blob/a477ab96/remote_services/topology_manager/private/src/topology_manager.c
----------------------------------------------------------------------
diff --git a/remote_services/topology_manager/private/src/topology_manager.c b/remote_services/topology_manager/private/src/topology_manager.c
index 3b9b62f..f78bde5 100644
--- a/remote_services/topology_manager/private/src/topology_manager.c
+++ b/remote_services/topology_manager/private/src/topology_manager.c
@@ -624,60 +624,59 @@ celix_status_t topologyManager_endpointListenerRemoved(void * handle,
service_re
     celix_status_t status;
     topology_manager_pt manager = handle;
 
-    status = celixThreadMutex_lock(&manager->listenerListLock);
-
-    if (status == CELIX_SUCCESS) {
-        if (hashMap_remove(manager->listenerList, reference)) {
-            logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "EndpointListener
Removed");
-        }
+    celixThreadMutex_lock(&manager->listenerListLock);
 
-        status = celixThreadMutex_unlock(&manager->listenerListLock);
+    if (hashMap_remove(manager->listenerList, reference)) {
+        logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "EndpointListener Removed");
     }
 
+
+    celixThreadMutex_unlock(&manager->listenerListLock);
+
+
     return status;
 }
 
 celix_status_t topologyManager_notifyListenersEndpointAdded(topology_manager_pt manager,
remote_service_admin_service_pt rsa, array_list_pt registrations) {
     celix_status_t status;
 
-    status = celixThreadMutex_lock(&manager->listenerListLock);
+    celixThreadMutex_lock(&manager->listenerListLock);
 
-    if (status == CELIX_SUCCESS) {
-        hash_map_iterator_pt iter = hashMapIterator_create(manager->listenerList);
-        while (hashMapIterator_hasNext(iter)) {
-            char *scope = NULL;
-            endpoint_listener_pt epl = NULL;
-            service_reference_pt reference = hashMapIterator_nextKey(iter);
 
-            serviceReference_getProperty(reference, (char *) OSGI_ENDPOINT_LISTENER_SCOPE,
&scope);
+    hash_map_iterator_pt iter = hashMapIterator_create(manager->listenerList);
+    while (hashMapIterator_hasNext(iter)) {
+        char *scope = NULL;
+        endpoint_listener_pt epl = NULL;
+        service_reference_pt reference = hashMapIterator_nextKey(iter);
 
-            status = bundleContext_getService(manager->context, reference, (void **) &epl);
-            if (status == CELIX_SUCCESS) {
-                filter_pt filter = filter_create(scope);
-
-                int regSize = arrayList_size(registrations);
-                for (int regIt = 0; regIt < regSize; regIt++) {
-                    export_registration_pt export = arrayList_get(registrations, regIt);
-
-                    endpoint_description_pt endpoint = NULL;
-                    status = topologyManager_getEndpointDescriptionForExportRegistration(rsa,
export, &endpoint);
-                    if (status == CELIX_SUCCESS) {
-                        bool matchResult = false;
-                        filter_match(filter, endpoint->properties, &matchResult);
-                        if (matchResult) {
-                            status = epl->endpointAdded(epl->handle, endpoint, scope);
-                        }
+        serviceReference_getProperty(reference, (char *) OSGI_ENDPOINT_LISTENER_SCOPE, &scope);
+
+        status = bundleContext_getService(manager->context, reference, (void **) &epl);
+        if (status == CELIX_SUCCESS) {
+            filter_pt filter = filter_create(scope);
+
+            int regSize = arrayList_size(registrations);
+            for (int regIt = 0; regIt < regSize; regIt++) {
+                export_registration_pt export = arrayList_get(registrations, regIt);
+
+                endpoint_description_pt endpoint = NULL;
+                status = topologyManager_getEndpointDescriptionForExportRegistration(rsa,
export, &endpoint);
+                if (status == CELIX_SUCCESS) {
+                    bool matchResult = false;
+                    filter_match(filter, endpoint->properties, &matchResult);
+                    if (matchResult) {
+                        status = epl->endpointAdded(epl->handle, endpoint, scope);
                     }
                 }
-
-                filter_destroy(filter);
             }
+
+            filter_destroy(filter);
         }
+    }
 
-        hashMapIterator_destroy(iter);
+    hashMapIterator_destroy(iter);
 
-        status = celixThreadMutex_unlock(&manager->listenerListLock);
-    }
+    celixThreadMutex_unlock(&manager->listenerListLock);
 
     return status;
 }
@@ -686,35 +685,34 @@ celix_status_t topologyManager_notifyListenersEndpointRemoved(topology_manager_p
 
     celix_status_t status;
 
-    status = celixThreadMutex_lock(&manager->listenerListLock);
+    celixThreadMutex_lock(&manager->listenerListLock);
 
-    if (status == CELIX_SUCCESS) {
-        hash_map_iterator_pt iter = hashMapIterator_create(manager->listenerList);
-        while (hashMapIterator_hasNext(iter)) {
-            endpoint_description_pt endpoint = NULL;
-            endpoint_listener_pt epl = NULL;
-            char *scope = NULL;
+    hash_map_iterator_pt iter = hashMapIterator_create(manager->listenerList);
+    while (hashMapIterator_hasNext(iter)) {
+        endpoint_description_pt endpoint = NULL;
+        endpoint_listener_pt epl = NULL;
+        char *scope = NULL;
 
-            service_reference_pt reference = hashMapIterator_nextKey(iter);
-            serviceReference_getProperty(reference, (char *) OSGI_ENDPOINT_LISTENER_SCOPE,
&scope);
+        service_reference_pt reference = hashMapIterator_nextKey(iter);
+        serviceReference_getProperty(reference, (char *) OSGI_ENDPOINT_LISTENER_SCOPE, &scope);
 
-            status = bundleContext_getService(manager->context, reference, (void **) &epl);
+        status = bundleContext_getService(manager->context, reference, (void **) &epl);
 
-            if (status == CELIX_SUCCESS) {
-                status = topologyManager_getEndpointDescriptionForExportRegistration(rsa,
export, &endpoint);
-            }
+        if (status == CELIX_SUCCESS) {
+            status = topologyManager_getEndpointDescriptionForExportRegistration(rsa, export,
&endpoint);
+        }
 
-            if (status == CELIX_SUCCESS) {
-                status = epl->endpointRemoved(epl->handle, endpoint, NULL);
-            }
+        if (status == CELIX_SUCCESS) {
+            status = epl->endpointRemoved(epl->handle, endpoint, NULL);
         }
 
         hashMapIterator_destroy(iter);
 
-        status = celixThreadMutex_unlock(&manager->listenerListLock);
 
     }
 
+    celixThreadMutex_unlock(&manager->listenerListLock);
+
     return status;
 }
 


Mime
View raw message