celix-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [celix] abroekhuis commented on a change in pull request #165: Feature/framework racecondition
Date Fri, 13 Mar 2020 18:44:19 GMT
abroekhuis commented on a change in pull request #165: Feature/framework racecondition
URL: https://github.com/apache/celix/pull/165#discussion_r390855760
 
 

 ##########
 File path: libs/framework/src/service_registry.c
 ##########
 @@ -972,4 +1032,249 @@ bool celix_serviceRegistry_getServiceInfo(
     celixThreadRwlock_unlock(&registry->lock);
 
     return found;
-}
\ No newline at end of file
+}
+/**
+ * Adds a service listener to the registry and replays REGISTERED events for the
+ * already registered services that match the listener filter.
+ *
+ * Flow, as visible in this function:
+ *  1. stringFilter, when not NULL, is parsed with celix_filter_create. A parse
+ *     failure is logged and CELIX_ILLEGAL_ARGUMENT is returned.
+ *  2. A listener entry with use count 1 is allocated and, under the registry
+ *     write lock, appended to registry->serviceListeners.
+ *  3. Still under that lock, every matching registration is retained, collected
+ *     and its pending registered event count is increased.
+ *  4. After the lock is released the listener callback is invoked once per
+ *     collected registration; afterwards the registration is released and its
+ *     pending count is decreased again.
+ *  5. The listener hooks are called and the initial use count is dropped.
+ *
+ * @param registry      The service registry the listener is added to.
+ * @param bundle        Stored in the entry and used to obtain the service references.
+ * @param stringFilter  Optional filter string; when NULL the entry filter stays NULL.
+ * @param listener      Listener whose serviceChanged callback receives the events.
+ * @return CELIX_ILLEGAL_ARGUMENT when stringFilter cannot be parsed, otherwise CELIX_SUCCESS.
+ */
+celix_status_t celix_serviceRegistry_addServiceListener(celix_service_registry_t *registry,
celix_bundle_t *bundle, const char *stringFilter, celix_service_listener_t *listener) {
+
+    celix_filter_t *filter = NULL;
+    if (stringFilter != NULL) {
+        filter = celix_filter_create(stringFilter);
+        if (filter == NULL) {
+            fw_log(logger, OSGI_FRAMEWORK_LOG_ERROR, "Cannot add service listener filter
'%s' is invalid", stringFilter);
+            return CELIX_ILLEGAL_ARGUMENT;
+        }
+    }
+    // NOTE(review): the calloc result below is dereferenced without a NULL check; confirm the out-of-memory policy.
+    celix_service_registry_service_listener_entry_t *entry = calloc(1, sizeof(*entry));
+    entry->bundle = bundle;
+    entry->filter = filter;
+    entry->listener = listener;
+    entry->useCount = 1; //new entry starts with a use count of 1, which is dropped at the end of this function
+    celixThreadMutex_create(&entry->mutex, NULL);
+    celixThreadCondition_init(&entry->cond, NULL);
+    // Holds the retained matching registrations so the callbacks can run after the registry lock is released.
+    celix_array_list_t *registrations =  celix_arrayList_create();
+
+    celixThreadRwlock_writeLock(&registry->lock);
+    celix_arrayList_add(registry->serviceListeners, entry); //use count 1
+
+    //find already registered services
+    hash_map_iterator_t iter = hashMapIterator_construct(registry->serviceRegistrations);
+    while (hashMapIterator_hasNext(&iter)) {
+        celix_array_list_t *regs = (array_list_pt) hashMapIterator_nextValue(&iter);
+        for (int regIdx = 0; (regs != NULL) && regIdx < celix_arrayList_size(regs);
++regIdx) {
+            service_registration_pt registration = celix_arrayList_get(regs, regIdx);
+            properties_pt props = NULL;
+            serviceRegistration_getProperties(registration, &props);
+            if (celix_filter_match(filter, props)) { // NOTE(review): filter is NULL when stringFilter is NULL; presumably that counts as a match - confirm
+                serviceRegistration_retain(registration);
+                long svcId = serviceRegistration_getServiceId(registration);
+                celix_arrayList_add(registrations, registration);
+                //update pending register event count
+                celix_increasePendingRegisteredEvent(registry, svcId);
+            }
+        }
+    }
+    celixThreadRwlock_unlock(&registry->lock);
+
+    //NOTE there is a race condition with serviceRegistry_registerServiceInternal, as result
+    //a REGISTERED event can be triggered twice instead of once. The service tracker can
deal with this.
+    //The handling of pending registered events is to ensure that the UNREGISTERING event
is always
+    //after the 1 or 2 REGISTERED events.
+    // The replayed events below are delivered without holding the registry lock.
+    for (int i = 0; i < celix_arrayList_size(registrations); ++i) {
+        service_registration_pt reg = celix_arrayList_get(registrations, i);
+        long svcId = serviceRegistration_getServiceId(reg);
+        service_reference_pt ref = NULL;
+        serviceRegistry_getServiceReference_internal(registry, bundle, reg, &ref);
+        celix_service_event_t event;
+        event.reference = ref;
+        event.type = OSGI_FRAMEWORK_SERVICE_EVENT_REGISTERED;
+        listener->serviceChanged(listener->handle, &event);
+        serviceReference_release(ref, NULL);
+        serviceRegistration_release(reg);
+
+        //update pending register event count
+        celix_decreasePendingRegisteredEvent(registry, svcId);
+    }
+    celix_arrayList_destroy(registrations);
+
+    serviceRegistry_callHooksForListenerFilter(registry, bundle, entry->filter, false);
+
+    celix_decreaseCountServiceListener(entry); //use count decreased, can be 0
+    return CELIX_SUCCESS;
+}
+/**
+ * Removes a service listener from the registry.
+ *
+ * Under the registry write lock the first entry whose listener pointer equals
+ * the given listener is taken out of registry->serviceListeners. When an entry
+ * was found, the listener hooks are called for its bundle and filter and the
+ * entry is handed to celix_waitAndDestroyServiceListener (presumably this blocks
+ * until the entry use count reaches 0 - confirm against that helper).
+ *
+ * @param registry  The service registry the listener was added to.
+ * @param listener  The listener to remove; compared by pointer.
+ * @return CELIX_SUCCESS when the listener was found and removed, otherwise
+ *         CELIX_ILLEGAL_ARGUMENT after logging an error.
+ */
+celix_status_t celix_serviceRegistry_removeServiceListener(celix_service_registry_t *registry,
celix_service_listener_t *listener) {
+    celix_service_registry_service_listener_entry_t *entry = NULL;
+
+    celixThreadRwlock_writeLock(&registry->lock);
+    for (int i = 0; i < celix_arrayList_size(registry->serviceListeners); ++i) {
+        celix_service_registry_service_listener_entry_t *visit = celix_arrayList_get(registry->serviceListeners,
i);
+        if (visit->listener == listener) {
+            entry = visit;
+            celix_arrayList_removeAt(registry->serviceListeners, i);
+            break;
+        }
+    }
+    celixThreadRwlock_unlock(&registry->lock);
+    // The hooks and the destruction of the entry run after the registry lock has been released.
+    if (entry != NULL) {
+        serviceRegistry_callHooksForListenerFilter(registry, entry->bundle, entry->filter,
true);
+        celix_waitAndDestroyServiceListener(entry);
+    } else {
+        fw_log(logger, OSGI_FRAMEWORK_LOG_ERROR, "Cannot remove service listener, listener
not found");
+        return CELIX_ILLEGAL_ARGUMENT;
+    }
+    return CELIX_SUCCESS;
+}
+/**
+ * Delivers a service event for the given registration to every service
+ * listener whose filter matches the registration properties.
+ *
+ * The work is done in three phases:
+ *  1. Under the registry read lock every listener entry is copied into a local
+ *     list and its use count is increased.
+ *  2. Without the lock each copied entry is matched against the registration
+ *     properties; an entry without a filter always matches. Entries that do
+ *     not match get their use count decreased again.
+ *  3. For each matched entry a service reference is obtained for the entry
+ *     bundle, the listener callback is invoked, the reference is given back
+ *     and the use count is decreased.
+ *
+ * @param registry      The service registry holding the listeners.
+ * @param eventType     The event type placed in the delivered event.
+ * @param registration  The registration the event is about.
+ */
+static void celix_serviceRegistry_serviceChanged(celix_service_registry_t *registry, celix_service_event_type_t
eventType, service_registration_pt registration) {
+    celix_service_registry_service_listener_entry_t *entry;
+
+    celix_array_list_t* retainedEntries = celix_arrayList_create();
+    celix_array_list_t* matchedEntries = celix_arrayList_create();
+    // Phase 1: snapshot and retain all listener entries while holding the read lock.
+    celixThreadRwlock_readLock(&registry->lock);
+    for (int i = 0; i < celix_arrayList_size(registry->serviceListeners); ++i) {
+        entry = celix_arrayList_get(registry->serviceListeners, i);
+        celix_arrayList_add(retainedEntries, entry);
+        celix_increaseCountServiceListener(entry); //ensure that use count > 0, so that
the listener cannot be destroyed until all pending event are handled.
+    }
+    celixThreadRwlock_unlock(&registry->lock);
+    // Phase 2: evaluate the filters outside the lock and keep only the matching entries.
+    for (int i = 0; i < celix_arrayList_size(retainedEntries); ++i) {
+        entry = celix_arrayList_get(retainedEntries, i);
+        int matched = 0;
+        celix_properties_t *props = NULL;
+        bool matchResult = false;
+        serviceRegistration_getProperties(registration, &props);
+        if (entry->filter != NULL) {
+            filter_match(entry->filter, props, &matchResult);
+        }
+        matched = (entry->filter == NULL) || matchResult;
+        if (matched) {
+            celix_arrayList_add(matchedEntries, entry);
+        } else {
+            celix_decreaseCountServiceListener(entry); //Not a match -> release entry
+        }
+    }
+    celix_arrayList_destroy(retainedEntries);
+
+    /*
+     * TODO FIXME, A deadlock can happen when (e.g.) a service is deregistered, triggering
+     * this fw_serviceChanged, and one of the matching service listener callbacks tries to
+     * remove another matched service listener.
+     * The remove service listener will call the listener_waitForDestroy and the
+     * fw_serviceChanged part keeps the usageCount on > 0.
+     *
+     * Not sure how to prevent/handle this.
+     */
+    // Phase 3: invoke the callbacks of the matched listeners and release each entry afterwards.
+    for (int i = 0; i < celix_arrayList_size(matchedEntries); ++i) {
+        entry = celix_arrayList_get(matchedEntries, i);
+        service_reference_pt reference = NULL;
+        celix_service_event_t event;
+        serviceRegistry_getServiceReference(registry, entry->bundle, registration, &reference);
+        event.type = eventType;
+        event.reference = reference;
+        entry->listener->serviceChanged(entry->listener->handle, &event);
+        serviceRegistry_ungetServiceReference(registry, entry->bundle, reference);
+        celix_decreaseCountServiceListener(entry); //decrease usage, so that the listener
can be destroyed (if use count is now 0)
+    }
+    celix_arrayList_destroy(matchedEntries);
+}
+
+/**
+ * Increases the pending registered event count of a service by one.
+ *
+ * The count is kept in registry->pendingRegisterEvents.map, keyed by the
+ * service id, with both key and count stored as pointer-sized values. The
+ * read-modify-write is done while holding pendingRegisterEvents.mutex.
+ * Presumably a service id without an entry reads as 0 - confirm with hashMap_get.
+ *
+ * @param registry  The service registry owning the pending event map.
+ * @param svcId     The service id whose count is increased.
+ */
+static void celix_increasePendingRegisteredEvent(celix_service_registry_t *registry, long
svcId) {
+    celixThreadMutex_lock(&registry->pendingRegisterEvents.mutex);
+    long count = (long)hashMap_get(registry->pendingRegisterEvents.map, (void*)svcId);
+    count += 1;
+    hashMap_put(registry->pendingRegisterEvents.map, (void*)svcId, (void*)count);
+    celixThreadMutex_unlock(&registry->pendingRegisterEvents.mutex);
+}
+/**
+ * Decreases the pending registered event count of a service by one and
+ * signals the pending event condition.
+ *
+ * The count must be at least 1 on entry (asserted). When it drops to 0 the
+ * map entry for the service id is removed instead of being stored as 0.
+ * All of this happens while holding pendingRegisterEvents.mutex.
+ *
+ * NOTE(review): one condition is shared by all service ids and it is signalled,
+ * not broadcast. If several threads can wait for different service ids at the
+ * same time, verify that the intended waiter is always woken.
+ *
+ * @param registry  The service registry owning the pending event map.
+ * @param svcId     The service id whose count is decreased.
+ */
+static void celix_decreasePendingRegisteredEvent(celix_service_registry_t *registry, long
svcId) {
+    celixThreadMutex_lock(&registry->pendingRegisterEvents.mutex);
+    long count = (long)hashMap_get(registry->pendingRegisterEvents.map, (void*)svcId);
+    assert(count >= 1);
+    count -= 1;
+    if (count > 0) {
+        hashMap_put(registry->pendingRegisterEvents.map, (void *)svcId, (void *)count);
+    } else {
+        hashMap_remove(registry->pendingRegisterEvents.map, (void*)svcId);
+    }
+    celixThreadCondition_signal(&registry->pendingRegisterEvents.cond);
+    celixThreadMutex_unlock(&registry->pendingRegisterEvents.mutex);
+}
+/**
+ * Blocks the caller until the pending registered event count of a service
+ * is no longer greater than 0.
+ *
+ * The count is read while holding pendingRegisterEvents.mutex and re-read
+ * after every return from the condition wait, so the loop only ends once the
+ * map no longer reports a positive count for the service id.
+ *
+ * @param registry  The service registry owning the pending event map.
+ * @param svcId     The service id to wait for.
+ */
+static void celix_waitForPendingRegisteredEvents(celix_service_registry_t *registry, long
svcId) {
+    celixThreadMutex_lock(&registry->pendingRegisterEvents.mutex);
+    long count = (long)hashMap_get(registry->pendingRegisterEvents.map, (void*)svcId);
+    while (count > 0) {
+        celixThreadCondition_wait(&registry->pendingRegisterEvents.cond, &registry->pendingRegisterEvents.mutex);
+        count = (long)hashMap_get(registry->pendingRegisterEvents.map, (void*)svcId);
+    }
+    celixThreadMutex_unlock(&registry->pendingRegisterEvents.mutex);
+}
+
+//static void celix_serviceRegistry_triggerListenerHooks(celix_service_registry_t *registry,
const char* serviceName, const celix_properties_t *properties) {
 
 Review comment:
   Commented out code?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message