celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ejans...@apache.org
Subject [4/6] celix git commit: made Post event async. next step make lock and release handlers list async
Date Fri, 19 Aug 2016 13:18:46 GMT
Made postEvent asynchronous. Next step: make locking and releasing of the handlers list asynchronous.


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/fd417448
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/fd417448
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/fd417448

Branch: refs/heads/feature/event_admin
Commit: fd4174482f382e19f1f080e4c3ad08857d2b3772
Parents: 7291bef
Author: Erik Jansman <ejansman@apache.org>
Authored: Thu Aug 18 15:21:35 2016 +0200
Committer: Erik Jansman <ejansman@apache.org>
Committed: Thu Aug 18 15:21:35 2016 +0200

----------------------------------------------------------------------
 .../private/include/event_admin_impl.h          |  18 ++-
 .../private/src/event_admin_activator.c         |   3 +
 .../event_admin/private/src/event_admin_impl.c  | 127 +++++++++++++------
 .../event_admin/private/src/event_impl.c        |   3 -
 .../private/src/event_publisher_impl.c          |  15 +--
 5 files changed, 106 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/fd417448/event_admin/event_admin/private/include/event_admin_impl.h
----------------------------------------------------------------------
diff --git a/event_admin/event_admin/private/include/event_admin_impl.h b/event_admin/event_admin/private/include/event_admin_impl.h
index 20b0f1b..f161543 100644
--- a/event_admin/event_admin/private/include/event_admin_impl.h
+++ b/event_admin/event_admin/private/include/event_admin_impl.h
@@ -42,11 +42,16 @@
 #include "event_admin.h"
 #include "log_helper.h"
 
+
 struct event_admin {
-        hash_map_pt channels;
-        array_list_pt event_handlers;
-        bundle_context_pt context;
-        log_helper_pt *loghelper;
+    hash_map_pt channels;
+    array_list_pt event_handlers;
+    bundle_context_pt context;
+    log_helper_pt *loghelper;
+    linked_list_pt eventList;
+    celix_thread_t eventListProcessor;
+    celix_thread_mutex_t *eventListLock;
+    bool eventAdminRunning;
 };
 typedef struct channel *channel_t;
 struct channel {
@@ -64,6 +69,9 @@ struct channel {
  */
 celix_status_t eventAdmin_create( bundle_context_pt context, event_admin_pt *event_admin);
 
+celix_status_t eventAdmin_start(event_admin_pt *event_admin);
+
+celix_status_t eventAdmin_stop(event_admin_pt *event_admin);
 
 celix_status_t eventAdmin_destroy(event_admin_pt *event_admin);
 
@@ -165,5 +173,7 @@ celix_status_t eventAdmin_hashCode(event_pt *event, int *hashCode);
 celix_status_t eventAdmin_matches( event_pt *event);
 celix_status_t eventAdmin_toString( event_pt *event, char *eventString);
 
+celix_status_t processEvent(event_admin_pt event_admin, event_pt event);
 
+void *eventProcessor(void *handle);
 #endif /* EVENT_ADMIN_IMPL_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/fd417448/event_admin/event_admin/private/src/event_admin_activator.c
----------------------------------------------------------------------
diff --git a/event_admin/event_admin/private/src/event_admin_activator.c b/event_admin/event_admin/private/src/event_admin_activator.c
index 37ec404..f951968 100644
--- a/event_admin/event_admin/private/src/event_admin_activator.c
+++ b/event_admin/event_admin/private/src/event_admin_activator.c
@@ -101,7 +101,9 @@ celix_status_t bundleActivator_start(void * userData, bundle_context_pt context)
 		serviceTracker_open(data->tracker);
 		properties = properties_create();
 		event_admin_service = activator->event_admin_service;
+		eventAdmin_start(&event_admin_service->eventAdmin);
 		bundleContext_registerService(context, (char *) EVENT_ADMIN_NAME, event_admin_service, properties, &activator->registration);
+
 	}
 	return status;
 }
@@ -111,6 +113,7 @@ celix_status_t bundleActivator_stop(void * userData, bundle_context_pt context)
 	struct activator * data =  userData;
     serviceRegistration_unregister(data->registration);
 	serviceTracker_close(data->tracker);
+	eventAdmin_stop(&data->event_admin_service->eventAdmin);
 	return status;
 }
 

http://git-wip-us.apache.org/repos/asf/celix/blob/fd417448/event_admin/event_admin/private/src/event_admin_impl.c
----------------------------------------------------------------------
diff --git a/event_admin/event_admin/private/src/event_admin_impl.c b/event_admin/event_admin/private/src/event_admin_impl.c
index fb5b26c..e27e89c 100644
--- a/event_admin/event_admin/private/src/event_admin_impl.c
+++ b/event_admin/event_admin/private/src/event_admin_impl.c
@@ -24,6 +24,7 @@
  *  \copyright	Apache License, Version 2.0
  */
 #include <stdlib.h>
+#include <unistd.h>
 
 #include "event_admin.h"
 #include "event_admin_impl.h"
@@ -33,7 +34,7 @@
 #include "celix_log.h"
 
 
-celix_status_t eventAdmin_create(bundle_context_pt context, event_admin_pt *event_admin){
+celix_status_t eventAdmin_create(bundle_context_pt context, event_admin_pt *event_admin) {
 	celix_status_t status = CELIX_SUCCESS;
 	*event_admin = calloc(1,sizeof(**event_admin));
 	if (!*event_admin) {
@@ -41,13 +42,31 @@ celix_status_t eventAdmin_create(bundle_context_pt context, event_admin_pt
*even
     } else {
         (*event_admin)->channels = hashMap_create(utils_stringHash, utils_stringHash, utils_stringEquals, utils_stringEquals);
         (*event_admin)->context =context;
+        (*event_admin)->eventListLock = calloc(1, sizeof(celix_thread_mutex_t));
+        (*event_admin)->eventListProcessor = celix_thread_default;
+        (*event_admin)->eventAdminRunning = false;
+        linkedList_create(&(*event_admin)->eventList);//
+
         status = arrayList_create(&(*event_admin)->event_handlers);
     }
 	return status;
 }
 
-celix_status_t eventAdmin_destroy(event_admin_pt *event_admin)
+celix_status_t eventAdmin_start(event_admin_pt *event_admin) {
+    celix_status_t status = CELIX_SUCCESS;
+    status = celixThread_create(&(*event_admin)->eventListProcessor, NULL, eventProcessor, &(*event_admin));
+    celixThreadMutex_create((*event_admin)->eventListLock, NULL);
+    (*event_admin)->eventAdminRunning = true;
+    return status;
+}
+
+celix_status_t eventAdmin_stop(event_admin_pt *event_admin)
 {
+    (*event_admin)->eventAdminRunning = false;
+    return CELIX_SUCCESS;
+}
+
+celix_status_t eventAdmin_destroy(event_admin_pt *event_admin) {
 	celix_status_t status = CELIX_SUCCESS;
     arrayList_destroy((*event_admin)->event_handlers);
 	free(*event_admin);
@@ -61,45 +80,76 @@ celix_status_t eventAdmin_getEventHandlersByChannel(bundle_context_pt
context, c
 }
 
 celix_status_t eventAdmin_postEvent(event_admin_pt event_admin, event_pt event) {
-	celix_status_t status = CELIX_SUCCESS;
+    bool added = false;
+    while (event_admin->eventAdminRunning && added == false) {
+        if (celixThreadMutex_tryLock(event_admin->eventListLock) == 0) {
+            linkedList_addLast(event_admin->eventList, event);
+            celixThreadMutex_unlock(event_admin->eventListLock);
+            added = true;
+        }
+
+    }
+    return CELIX_SUCCESS;
+}
 
-	const char *topic;
+void *eventProcessor(void *handle) {
+    event_admin_pt *eventAdminPt = handle;
+    event_pt event = NULL;
+    int waitcounter = 1;
+    while ((*eventAdminPt)->eventAdminRunning) {
+        if (celixThreadMutex_tryLock((*eventAdminPt)->eventListLock) == 0) {
+            if (linkedList_isEmpty((*eventAdminPt)->eventList)) {
+                if (waitcounter < 10) {
+                    waitcounter++;
+                } else {
+                    waitcounter = 1;
+                }
+            } else {
+                event = linkedList_removeFirst((*eventAdminPt)->eventList);
+                waitcounter = 1;
 
-    eventAdmin_getTopic(&event, &topic);
+            }
+            celixThreadMutex_unlock((*eventAdminPt)->eventListLock);
+        } else {
+            if (waitcounter < 10) {
+                waitcounter++;
+            } else {
+                waitcounter = 1;
+            }
+        }
+        if (event != NULL) {
+            processEvent((*eventAdminPt), event);
+            event = NULL;
+        }
 
-	array_list_pt event_handlers;
-	arrayList_create(&event_handlers);
-	eventAdmin_lockHandlersList(event_admin, topic);
-	eventAdmin_findHandlersByTopic(event_admin, topic, event_handlers);
-    // TODO make this async!
-	array_list_iterator_pt handlers_iterator = arrayListIterator_create(event_handlers);
-	while (arrayListIterator_hasNext(handlers_iterator)) {
-		event_handler_service_pt event_handler_service = (event_handler_service_pt) arrayListIterator_next(handlers_iterator);
-		//logHelper_log(*event_admin->loghelper, OSGI_LOGSERVICE_INFO, "handler found (POST EVENT) for %s", topic);
-		event_handler_service->handle_event(&event_handler_service->event_handler, event);
-	}
-	eventAdmin_releaseHandersList(event_admin, topic);
-	return status;
+        usleep(waitcounter * 1000);
+    }
+    return CELIX_SUCCESS;
 }
 
-celix_status_t eventAdmin_sendEvent(event_admin_pt event_admin, event_pt event) {
-	celix_status_t status = CELIX_SUCCESS;
+celix_status_t processEvent(event_admin_pt event_admin, event_pt event) {
+    celix_status_t status = CELIX_SUCCESS;
 
-	const char *topic;
-	eventAdmin_getTopic(&event, &topic);
-
-	array_list_pt event_handlers;
-	arrayList_create(&event_handlers);
-	eventAdmin_lockHandlersList(event_admin, topic);
-	eventAdmin_findHandlersByTopic(event_admin, topic, event_handlers);
-	array_list_iterator_pt handlers_iterator = arrayListIterator_create(event_handlers);
-	while (arrayListIterator_hasNext(handlers_iterator)) {
-		event_handler_service_pt event_handler_service = (event_handler_service_pt) arrayListIterator_next(handlers_iterator);
-	//	logHelper_log(*event_admin->loghelper, OSGI_LOGSERVICE_INFO, "handler found (SEND EVENT) for %s", topic);
-		event_handler_service->handle_event(&event_handler_service->event_handler, event);
-	}
-	eventAdmin_releaseHandersList(event_admin, topic);
-	return status;
+    const char *topic;
+
+    eventAdmin_getTopic(&event, &topic);
+
+    array_list_pt event_handlers;
+    arrayList_create(&event_handlers);
+    eventAdmin_lockHandlersList(event_admin, topic);
+    eventAdmin_findHandlersByTopic(event_admin, topic, event_handlers);
+
+    array_list_iterator_pt handlers_iterator = arrayListIterator_create(event_handlers);
+    while (arrayListIterator_hasNext(handlers_iterator)) {
+        event_handler_service_pt event_handler_service = (event_handler_service_pt) arrayListIterator_next(
+                handlers_iterator);
+        event_handler_service->handle_event(&event_handler_service->event_handler, event);
+    }
+    eventAdmin_releaseHandersList(event_admin, topic);
+    return status;
+}
+celix_status_t eventAdmin_sendEvent(event_admin_pt event_admin, event_pt event) {
+    return processEvent(event_admin, event);
 }
 
 celix_status_t eventAdmin_findHandlersByTopic(event_admin_pt event_admin, const char *topic,
@@ -108,7 +158,6 @@ celix_status_t eventAdmin_findHandlersByTopic(event_admin_pt event_admin,
const
 	hash_map_pt channels = event_admin->channels;
     channel_t channel = hashMap_get(channels, topic);
 	if (channel != NULL) {
-		//logHelper_log(*event_admin->loghelper, OSGI_LOGSERVICE_INFO, "found channel: %s",
topic);
 		if (channel->eventHandlers != NULL && !hashMap_isEmpty(channel->eventHandlers)) {
 			// iterate throught the handlers and add them to the array list for result.
 			hash_map_iterator_pt hashmap_iterator =  hashMapIterator_create(channel->eventHandlers);
@@ -116,8 +165,6 @@ celix_status_t eventAdmin_findHandlersByTopic(event_admin_pt event_admin,
const
 				arrayList_add(event_handlers, (event_handler_service_pt) hashMapIterator_nextValue(hashmap_iterator));
 			}
 		}
-	} else {
-		//logHelper_log(*event_admin->loghelper, OSGI_LOGSERVICE_WARNING, "no such channel:
%s", topic);
 	}
 	return status;
 }
@@ -128,10 +175,6 @@ celix_status_t eventAdmin_createEventChannels(event_admin_pt *event_admin,
const
     channel_t channel = hashMap_get((*event_admin)->channels, topic);
 	if (channel == NULL) {
 		//create channel
-		printf("Creating channel: %s", topic);
-
-
-
 		channel = calloc(1, sizeof(*channel));
 		if (!channel) {
             status = CELIX_ENOMEM;

http://git-wip-us.apache.org/repos/asf/celix/blob/fd417448/event_admin/event_admin/private/src/event_impl.c
----------------------------------------------------------------------
diff --git a/event_admin/event_admin/private/src/event_impl.c b/event_admin/event_admin/private/src/event_impl.c
index 8f65c91..5a95474 100644
--- a/event_admin/event_admin/private/src/event_impl.c
+++ b/event_admin/event_admin/private/src/event_impl.c
@@ -38,15 +38,12 @@ celix_status_t eventAdmin_createEvent(event_admin_pt event_admin, const char *to
 									  event_pt *event) {
 	celix_status_t status = CELIX_SUCCESS;
 
-	//logHelper_log(*event_admin->loghelper, OSGI_LOGSERVICE_DEBUG, "create event event admin
pointer: %p",event_admin);
 
 
 	*event = calloc(1, sizeof(**event));
 	if(!*event){
 	       status = CELIX_ENOMEM;
-	   //    logHelper_log(*event_admin->loghelper, OSGI_LOGSERVICE_ERROR, "No MEM");
 	}else {
-		//logHelper_log(*event_admin->loghelper, OSGI_LOGSERVICE_INFO, "Event created : %s",
topic);
 		(*event)->topic = topic;
 		(*event)->properties = properties;
 		properties_set((*event)->properties, (char *)EVENT_TOPIC, topic);

http://git-wip-us.apache.org/repos/asf/celix/blob/fd417448/event_admin/event_publisher/private/src/event_publisher_impl.c
----------------------------------------------------------------------
diff --git a/event_admin/event_publisher/private/src/event_publisher_impl.c b/event_admin/event_publisher/private/src/event_publisher_impl.c
index 49583b6..c21aaf1 100644
--- a/event_admin/event_publisher/private/src/event_publisher_impl.c
+++ b/event_admin/event_publisher/private/src/event_publisher_impl.c
@@ -55,14 +55,6 @@ celix_status_t eventPublisherStart(event_publisher_pt *event_publisher) {
     logHelper_start((*event_publisher)->loghelper);
     status = celixThread_create(&(*event_publisher)->sender, NULL, produceEvents, &(*event_publisher));
     celixThreadMutex_create((*event_publisher)->eventAdminAvailility,NULL);
-    if(status != CELIX_SUCCESS)
-    {
-        printf("failed to create thread\n");
-    }
-    else
-    {
-       printf("thread created \n");
-    }
     return status;
 }
 
@@ -91,14 +83,15 @@ void *produceEvents(void *handle) {
             if (event_admin_service != NULL) {
                 event_pt event;
                 properties_pt props = properties_create();
-                properties_set(props, "Error ", "No Error, Just testing");
+                properties_set(props, "Error ", "No Error, Just testing ");
+
                 (*event_admin_service)->createEvent(event_admin, "log/error/eventpublishers/event", props, &event);
                 (*event_admin_service)->postEvent(event_admin, event);
-                (*event_admin_service)->sendEvent(event_admin, event);
+                //  (*event_admin_service)->sendEvent(event_admin, event);
             }
             celixThreadMutex_unlock((*event_publisher)->eventAdminAvailility);
         }
-        usleep(1*10000);
+        usleep(1 * 1000000);
     }
     return CELIX_SUCCESS;
 }


Mime
View raw message