celix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bpe...@apache.org
Subject celix git commit: CELIX-77: add threadpool support
Date Fri, 11 Dec 2015 10:56:58 GMT
Repository: celix
Updated Branches:
  refs/heads/develop b56c47d14 -> 1e73e4d1c


CELIX-77: add threadpool support


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/1e73e4d1
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/1e73e4d1
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/1e73e4d1

Branch: refs/heads/develop
Commit: 1e73e4d1cad60bb1af4cd9f31382af2cd22687db
Parents: b56c47d
Author: Bjoern Petri <bpetri@apache.org>
Authored: Fri Dec 11 11:56:34 2015 +0100
Committer: Bjoern Petri <bpetri@apache.org>
Committed: Fri Dec 11 11:56:34 2015 +0100

----------------------------------------------------------------------
 utils/CMakeLists.txt                    |  96 ++---
 utils/private/src/thpool.c              | 562 +++++++++++++++++++++++++++
 utils/private/test/thread_pool_test.cpp | 118 ++++++
 utils/public/docs/Design.md             |  52 +++
 utils/public/docs/FAQ.md                |  33 ++
 utils/public/docs/README.md             |  62 +++
 utils/public/include/thpool.h           | 164 ++++++++
 7 files changed, 1043 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/1e73e4d1/utils/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/utils/CMakeLists.txt b/utils/CMakeLists.txt
index 93ff353..62a953f 100644
--- a/utils/CMakeLists.txt
+++ b/utils/CMakeLists.txt
@@ -24,28 +24,31 @@ if (UTILS)
     include_directories("private/include")
     include_directories("public/include")
     add_library(celix_utils SHARED 
-		private/src/array_list.c
-		public/include/array_list.h 
-		private/include/array_list_private.h 
+                private/src/array_list.c
+                public/include/array_list.h 
+                private/include/array_list_private.h 
 
-		private/src/hash_map.c
-		public/include/hash_map.h 
-		private/include/hash_map_private.h
+                private/src/hash_map.c
+                public/include/hash_map.h 
+                private/include/hash_map_private.h
 
-		private/src/linked_list.c
-		private/src/linked_list_iterator.c
-		public/include/linked_list.h
-		public/include/linked_list_iterator.h 
-		private/include/linked_list_private.h
+                private/src/linked_list.c
+                private/src/linked_list_iterator.c
+                public/include/linked_list.h
+                public/include/linked_list_iterator.h 
+                private/include/linked_list_private.h
 
-		public/include/exports.h
-		
-		private/src/celix_threads.c
-		public/include/celix_threads.h
-	)
+                public/include/exports.h
+                
+                private/src/celix_threads.c
+                public/include/celix_threads.h
+
+                private/src/thpool.c
+                public/include/thpool.h
+        )
     
     IF(UNIX)
-    	target_link_libraries(celix_utils m pthread)
+        target_link_libraries(celix_utils m pthread)
     ENDIF(UNIX)
     
     install(TARGETS celix_utils DESTINATION lib COMPONENT framework)
@@ -54,34 +57,39 @@ if (UTILS)
     
     celix_subproject(UTILS-TESTS "Option to build the utilities library tests" "OFF")
 
-	if (ENABLE_TESTING AND UTILS-TESTS)
-		find_package(CppUTest REQUIRED)
+        if (ENABLE_TESTING AND UTILS-TESTS)
+            find_package(CppUTest REQUIRED)
+
+            include_directories(${CUNIT_INCLUDE_DIRS})
+            include_directories(${CPPUTEST_INCLUDE_DIR})
+            include_directories("${PROJECT_SOURCE_DIR}/utils/public/include")
+            include_directories("${PROJECT_SOURCE_DIR}/utils/private/include")
+            
+            add_executable(hash_map_test private/test/hash_map_test.cpp)
+            target_link_libraries(hash_map_test celix_utils ${CPPUTEST_LIBRARY} pthread)
+            
+            add_executable(array_list_test private/test/array_list_test.cpp)
+            target_link_libraries(array_list_test celix_utils ${CPPUTEST_LIBRARY} pthread)
+            
+            add_executable(celix_threads_test private/test/celix_threads_test.cpp)
+            target_link_libraries(celix_threads_test celix_utils ${CPPUTEST_LIBRARY} ${CPPUTEST_EXT_LIBRARY}
pthread)
+            add_executable(linked_list_test private/test/linked_list_test.cpp)
+            target_link_libraries(linked_list_test celix_utils ${CPPUTEST_LIBRARY} pthread)
+            
+            add_executable(thread_pool_test private/test/thread_pool_test.cpp)
+            target_link_libraries(thread_pool_test celix_utils ${CPPUTEST_LIBRARY} pthread)

 
-	    include_directories(${CUNIT_INCLUDE_DIRS})
-	    include_directories(${CPPUTEST_INCLUDE_DIR})
-	    include_directories("${PROJECT_SOURCE_DIR}/utils/public/include")
-	    include_directories("${PROJECT_SOURCE_DIR}/utils/private/include")
-	    
-	    add_executable(hash_map_test private/test/hash_map_test.cpp)
-	    target_link_libraries(hash_map_test celix_utils ${CPPUTEST_LIBRARY} pthread)
-	    
-	    add_executable(array_list_test private/test/array_list_test.cpp)
-	    target_link_libraries(array_list_test celix_utils ${CPPUTEST_LIBRARY} pthread)
-	    
-		add_executable(celix_threads_test private/test/celix_threads_test.cpp)
-	    target_link_libraries(celix_threads_test celix_utils ${CPPUTEST_LIBRARY} ${CPPUTEST_EXT_LIBRARY}
pthread)
-	    
-	    add_executable(linked_list_test private/test/linked_list_test.cpp)
-	    target_link_libraries(linked_list_test celix_utils ${CPPUTEST_LIBRARY} pthread)
-	    
- 		add_test(NAME run_array_list_test COMMAND array_list_test)
- 		add_test(NAME run_hash_map_test COMMAND hash_map_test)
-       	add_test(NAME run_celix_threads_test COMMAND celix_threads_test)
-       	add_test(NAME run_linked_list_test COMMAND linked_list_test)
-       	SETUP_TARGET_FOR_COVERAGE(array_list_test array_list_test ${CMAKE_BINARY_DIR}/coverage/array_list_test/array_list_test)
-		SETUP_TARGET_FOR_COVERAGE(hash_map hash_map_test ${CMAKE_BINARY_DIR}/coverage/hash_map_test/hash_map_test)
-		SETUP_TARGET_FOR_COVERAGE(celix_threads_test celix_threads_test ${CMAKE_BINARY_DIR}/coverage/celix_threads_test/celix_threads_test)
-		SETUP_TARGET_FOR_COVERAGE(linked_list_test linked_list_test ${CMAKE_BINARY_DIR}/coverage/linked_list_test/linked_list_test)
+            add_test(NAME run_array_list_test COMMAND array_list_test)
+            add_test(NAME run_hash_map_test COMMAND hash_map_test)
+            add_test(NAME run_celix_threads_test COMMAND celix_threads_test)
+            add_test(NAME run_thread_pool_test COMMAND thread_pool_test)
+            add_test(NAME run_linked_list_test COMMAND linked_list_test)
+        
+            SETUP_TARGET_FOR_COVERAGE(array_list_test array_list_test ${CMAKE_BINARY_DIR}/coverage/array_list_test/array_list_test)
+            SETUP_TARGET_FOR_COVERAGE(hash_map hash_map_test ${CMAKE_BINARY_DIR}/coverage/hash_map_test/hash_map_test)
+            SETUP_TARGET_FOR_COVERAGE(celix_threads_test celix_threads_test ${CMAKE_BINARY_DIR}/coverage/celix_threads_test/celix_threads_test)
+            SETUP_TARGET_FOR_COVERAGE(thread_pool_test thread_pool_test ${CMAKE_BINARY_DIR}/coverage/thread_pool_test/thread_pool_test)
+            SETUP_TARGET_FOR_COVERAGE(linked_list_test linked_list_test ${CMAKE_BINARY_DIR}/coverage/linked_list_test/linked_list_test)
 
    endif(ENABLE_TESTING AND UTILS-TESTS)
 endif (UTILS)

http://git-wip-us.apache.org/repos/asf/celix/blob/1e73e4d1/utils/private/src/thpool.c
----------------------------------------------------------------------
diff --git a/utils/private/src/thpool.c b/utils/private/src/thpool.c
new file mode 100644
index 0000000..f81350e
--- /dev/null
+++ b/utils/private/src/thpool.c
@@ -0,0 +1,562 @@
+/* ********************************
+ * Author:       Johan Hanssen Seferidis
+ * License:	     MIT
+ * Description:  Library providing a threading pool where you can add
+ *               work. For usage, check the thpool.h file or README.md
+ *
+ *//** @file thpool.h *//*
+ * 
+ ********************************/
+
+
+#include <unistd.h>
+#include <signal.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <pthread.h>
+#include <errno.h>
+#include <time.h> 
+#include <sys/prctl.h>
+
+#include "thpool.h"
+
+#ifdef THPOOL_DEBUG
+#define THPOOL_DEBUG 1
+#else
+#define THPOOL_DEBUG 0
+#endif
+
+#define MAX_NANOSEC 999999999
+#define CEIL(X) ((X-(int)(X)) > 0 ? (int)(X+1) : (int)(X))
+
+static volatile int threads_keepalive;
+static volatile int threads_on_hold;
+
+
+
+
+
+/* ========================== STRUCTURES ============================ */
+
+
+/* Binary semaphore */
+typedef struct bsem {
+	pthread_mutex_t mutex;
+	pthread_cond_t   cond;
+	int v;
+} bsem;
+
+
+/* Job */
+typedef struct job{
+	struct job*  prev;                   /* pointer to previous job   */
+	void*  (*function)(void* arg);       /* function pointer          */
+	void*  arg;                          /* function's argument       */
+} job;
+
+
+/* Job queue */
+typedef struct jobqueue{
+	pthread_mutex_t rwmutex;             /* used for queue r/w access */
+	job  *front;                         /* pointer to front of queue */
+	job  *rear;                          /* pointer to rear  of queue */
+	bsem *has_jobs;                      /* flag as binary semaphore  */
+	int   len;                           /* number of jobs in queue   */
+} jobqueue;
+
+
+/* Thread */
+typedef struct thread{
+	int       id;                        /* friendly id               */
+	pthread_t pthread;                   /* pointer to actual thread  */
+	struct thpool_* thpool_p;            /* access to thpool          */
+} thread;
+
+
+/* Threadpool */
+typedef struct thpool_{
+	thread**   threads;                  /* pointer to threads        */
+	volatile int num_threads_alive;      /* threads currently alive   */
+	volatile int num_threads_working;    /* threads currently working */
+	pthread_mutex_t  thcount_lock;       /* used for thread count etc */
+	jobqueue*  jobqueue_p;               /* pointer to the job queue  */    
+} thpool_;
+
+
+
+
+
+/* ========================== PROTOTYPES ============================ */
+
+
+static void  thread_init(thpool_* thpool_p, struct thread** thread_p, int id);
+static void* thread_do(struct thread* thread_p);
+static void  thread_hold();
+static void  thread_destroy(struct thread* thread_p);
+
+static int   jobqueue_init(thpool_* thpool_p);
+static void  jobqueue_clear(thpool_* thpool_p);
+static void  jobqueue_push(thpool_* thpool_p, struct job* newjob_p);
+static struct job* jobqueue_pull(thpool_* thpool_p);
+static void  jobqueue_destroy(thpool_* thpool_p);
+
+static void  bsem_init(struct bsem *bsem_p, int value);
+static void  bsem_reset(struct bsem *bsem_p);
+static void  bsem_post(struct bsem *bsem_p);
+static void  bsem_post_all(struct bsem *bsem_p);
+static void  bsem_wait(struct bsem *bsem_p);
+
+
+
+
+
+/* ========================== THREADPOOL ============================ */
+
+
+/* Initialise thread pool with num_threads workers (negative counts as 0).
+ * Returns the new pool, or NULL if memory could not be allocated.
+ */
+struct thpool_* thpool_init(int num_threads){
+	threads_on_hold   = 0;
+	threads_keepalive = 1;
+
+	if ( num_threads < 0){
+		num_threads = 0;
+	}
+
+	/* Make new thread pool */
+	thpool_* thpool_p = (struct thpool_*)malloc(sizeof(struct thpool_));
+	if (thpool_p == NULL){
+		fprintf(stderr, "thpool_init(): Could not allocate memory for thread pool\n");
+		return NULL;
+	}
+	thpool_p->num_threads_alive   = 0;
+	thpool_p->num_threads_working = 0;
+
+	/* Initialise the job queue */
+	if (jobqueue_init(thpool_p) == -1){
+		fprintf(stderr, "thpool_init(): Could not allocate memory for job queue\n");
+		free(thpool_p);
+		return NULL;
+	}
+
+	/* Make threads in pool: the array stores pointers, so size it by pointer */
+	thpool_p->threads = (struct thread**)malloc(num_threads * sizeof(struct thread*));
+	if (thpool_p->threads == NULL){
+		fprintf(stderr, "thpool_init(): Could not allocate memory for threads\n");
+		jobqueue_destroy(thpool_p);
+		free(thpool_p->jobqueue_p);
+		free(thpool_p);
+		return NULL;
+	}
+
+	pthread_mutex_init(&(thpool_p->thcount_lock), NULL);
+
+	/* Thread init */
+	int n;
+	for (n=0; n<num_threads; n++){
+		thread_init(thpool_p, &thpool_p->threads[n], n);
+		if (THPOOL_DEBUG)
+			printf("THPOOL_DEBUG: Created thread %d in pool \n", n);
+	}
+
+	/* Wait for threads to initialize */
+	while (thpool_p->num_threads_alive != num_threads) {}
+
+	return thpool_p;
+}
+
+
+/* Add work to the thread pool */
+int thpool_add_work(thpool_* thpool_p, void *(*function_p)(void*), void* arg_p){
+	job* newjob;
+
+	newjob=(struct job*)malloc(sizeof(struct job));
+	if (newjob==NULL){
+		fprintf(stderr, "thpool_add_work(): Could not allocate memory for new job\n");
+		return -1;
+	}
+
+	/* add function and argument */
+	newjob->function=function_p;
+	newjob->arg=arg_p;
+
+	/* add job to queue */
+	pthread_mutex_lock(&thpool_p->jobqueue_p->rwmutex);
+	jobqueue_push(thpool_p, newjob);
+	pthread_mutex_unlock(&thpool_p->jobqueue_p->rwmutex);
+
+	return 0;
+}
+
+
+/* Wait until all jobs have finished */
+void thpool_wait(thpool_* thpool_p){
+
+	/* Continuous polling */
+	double timeout = 1.0;
+	time_t start, end;
+	double tpassed = 0.0;
+	time (&start);
+	while (tpassed < timeout && 
+			(thpool_p->jobqueue_p->len || thpool_p->num_threads_working))
+	{
+		time (&end);
+		tpassed = difftime(end,start);
+	}
+
+	/* Exponential polling */
+	long init_nano =  1; /* MUST be above 0 */
+	long new_nano;
+	double multiplier =  1.01;
+	int  max_secs   = 20;
+	
+	struct timespec polling_interval;
+	polling_interval.tv_sec  = 0;
+	polling_interval.tv_nsec = init_nano;
+	
+	while (thpool_p->jobqueue_p->len || thpool_p->num_threads_working)
+	{
+		nanosleep(&polling_interval, NULL);
+		if ( polling_interval.tv_sec < max_secs ){
+			new_nano = CEIL(polling_interval.tv_nsec * multiplier);
+			polling_interval.tv_nsec = new_nano % MAX_NANOSEC;
+			if ( new_nano > MAX_NANOSEC ) {
+				polling_interval.tv_sec ++;
+			}
+		}
+		else break;
+	}
+	
+	/* Fall back to max polling */
+	while (thpool_p->jobqueue_p->len || thpool_p->num_threads_working){
+		sleep(max_secs);
+	}
+}
+
+
+/* Destroy the threadpool */
+void thpool_destroy(thpool_* thpool_p){
+	
+	volatile int threads_total = thpool_p->num_threads_alive;
+
+	/* End each thread 's infinite loop */
+	threads_keepalive = 0;
+	
+	/* Give one second to kill idle threads */
+	double TIMEOUT = 1.0;
+	time_t start, end;
+	double tpassed = 0.0;
+	time (&start);
+	while (tpassed < TIMEOUT && thpool_p->num_threads_alive){
+		bsem_post_all(thpool_p->jobqueue_p->has_jobs);
+		time (&end);
+		tpassed = difftime(end,start);
+	}
+	
+	/* Poll remaining threads */
+	while (thpool_p->num_threads_alive){
+		bsem_post_all(thpool_p->jobqueue_p->has_jobs);
+		sleep(1);
+	}
+
+	/* Job queue cleanup */
+	jobqueue_destroy(thpool_p);
+	free(thpool_p->jobqueue_p);
+	
+	/* Deallocs */
+	int n;
+	for (n=0; n < threads_total; n++){
+		thread_destroy(thpool_p->threads[n]);
+	}
+	free(thpool_p->threads);
+	free(thpool_p);
+}
+
+
+/* Pause all threads in threadpool */
+void thpool_pause(thpool_* thpool_p) {
+	int n;
+	for (n=0; n < thpool_p->num_threads_alive; n++){
+		pthread_kill(thpool_p->threads[n]->pthread, SIGUSR1);
+	}
+}
+
+
+/* Resume all threads in threadpool */
+void thpool_resume(thpool_* thpool_p) {
+	threads_on_hold = 0;
+}
+
+
+
+
+
+/* ============================ THREAD ============================== */
+
+
+/* Initialize a thread in the thread pool
+ *
+ * @param thpool_p      pool that the new thread belongs to
+ * @param thread_p      address to the pointer of the thread to be created
+ * @param id            id to be given to the thread
+ *
+ * Exits the process if the thread structure cannot be allocated.
+ */
+static void thread_init (thpool_* thpool_p, struct thread** thread_p, int id){
+	*thread_p = (struct thread*)malloc(sizeof(struct thread));
+	if (*thread_p == NULL){ /* test the allocation, not the out-parameter */
+		fprintf(stderr, "thpool_init(): Could not allocate memory for thread\n");
+		exit(1);
+	}
+
+	(*thread_p)->thpool_p = thpool_p;
+	(*thread_p)->id       = id;
+
+	pthread_create(&(*thread_p)->pthread, NULL, (void *(*)(void *))thread_do, (*thread_p));
+	pthread_detach((*thread_p)->pthread);
+}
+
+
+/* Sets the calling thread on hold */
+static void thread_hold () {
+	threads_on_hold = 1;
+	while (threads_on_hold){
+		sleep(1);
+	}
+}
+
+
+/* What each thread is doing
+*
+* In principle this is an endless loop. The only time this loop gets interrupted is once
+* thpool_destroy() is invoked or the program exits.
+*
+* @param  thread_p      thread that will run this function
+* @return always NULL
+*/
+static void* thread_do(struct thread* thread_p){
+	/* Set thread name for profiling and debugging */
+	char thread_name[128] = {0};
+	snprintf(thread_name, sizeof(thread_name), "thread-pool-%d", thread_p->id);
+	prctl(PR_SET_NAME, thread_name);
+
+	/* Shortcut to the pool this thread belongs to */
+	thpool_* thpool_p = thread_p->thpool_p;
+	
+	/* Register signal handler */
+	struct sigaction act;
+	sigemptyset(&act.sa_mask); /* do not pass uninitialised mask/flags to sigaction() */
+	act.sa_flags = 0;
+	act.sa_handler = thread_hold;
+	if (sigaction(SIGUSR1, &act, NULL) == -1) {
+		fprintf(stderr, "thread_do(): cannot handle SIGUSR1");
+	}
+	
+	/* Mark thread as alive (initialized) */
+	pthread_mutex_lock(&thpool_p->thcount_lock);
+	thpool_p->num_threads_alive += 1;
+	pthread_mutex_unlock(&thpool_p->thcount_lock);
+
+	while(threads_keepalive){
+		bsem_wait(thpool_p->jobqueue_p->has_jobs);
+
+		if (threads_keepalive){
+			pthread_mutex_lock(&thpool_p->thcount_lock);
+			thpool_p->num_threads_working++;
+			pthread_mutex_unlock(&thpool_p->thcount_lock);
+			
+			/* Read job from queue and execute it */
+			void*(*func_buff)(void* arg);
+			void*  arg_buff;
+			job* job_p;
+			pthread_mutex_lock(&thpool_p->jobqueue_p->rwmutex);
+			job_p = jobqueue_pull(thpool_p);
+			pthread_mutex_unlock(&thpool_p->jobqueue_p->rwmutex);
+			if (job_p) {
+				func_buff = job_p->function;
+				arg_buff  = job_p->arg;
+				func_buff(arg_buff);
+				free(job_p);
+			}
+			
+			pthread_mutex_lock(&thpool_p->thcount_lock);
+			thpool_p->num_threads_working--;
+			pthread_mutex_unlock(&thpool_p->thcount_lock);
+
+		}
+	}
+	pthread_mutex_lock(&thpool_p->thcount_lock);
+	thpool_p->num_threads_alive --;
+	pthread_mutex_unlock(&thpool_p->thcount_lock);
+
+	return NULL;
+}
+
+
+/* Frees a thread  */
+static void thread_destroy (thread* thread_p){
+	free(thread_p);
+}
+
+
+
+
+
+/* ============================ JOB QUEUE =========================== */
+
+
+/* Initialize queue
+ * @return 0 on success, -1 if an allocation failed (nothing stays allocated)
+ */
+static int jobqueue_init(thpool_* thpool_p){
+	thpool_p->jobqueue_p = (struct jobqueue*)malloc(sizeof(struct jobqueue));
+	if (thpool_p->jobqueue_p == NULL){
+		return -1;
+	}
+	thpool_p->jobqueue_p->len = 0;
+	thpool_p->jobqueue_p->front = NULL;
+	thpool_p->jobqueue_p->rear  = NULL;
+	thpool_p->jobqueue_p->has_jobs = (struct bsem*)malloc(sizeof(struct bsem));
+	if (thpool_p->jobqueue_p->has_jobs == NULL){
+		free(thpool_p->jobqueue_p); /* the caller only frees the pool itself */
+		thpool_p->jobqueue_p = NULL;
+		return -1;
+	}
+	pthread_mutex_init(&(thpool_p->jobqueue_p->rwmutex), NULL);
+	bsem_init(thpool_p->jobqueue_p->has_jobs, 0);
+	return 0;
+}
+
+
+/* Clear the queue */
+static void jobqueue_clear(thpool_* thpool_p){
+
+	while(thpool_p->jobqueue_p->len){
+		free(jobqueue_pull(thpool_p));
+	}
+
+	thpool_p->jobqueue_p->front = NULL;
+	thpool_p->jobqueue_p->rear  = NULL;
+	bsem_reset(thpool_p->jobqueue_p->has_jobs);
+	thpool_p->jobqueue_p->len = 0;
+
+}
+
+
+/* Add (allocated) job to queue
+ *
+ * Notice: Caller MUST hold a mutex
+ */
+static void jobqueue_push(thpool_* thpool_p, struct job* newjob){
+
+	newjob->prev = NULL;
+
+	switch(thpool_p->jobqueue_p->len){
+
+		case 0:  /* if no jobs in queue */
+					thpool_p->jobqueue_p->front = newjob;
+					thpool_p->jobqueue_p->rear  = newjob;
+					break;
+
+		default: /* if jobs in queue */
+					thpool_p->jobqueue_p->rear->prev = newjob;
+					thpool_p->jobqueue_p->rear = newjob;
+					
+	}
+	thpool_p->jobqueue_p->len++;
+	
+	bsem_post(thpool_p->jobqueue_p->has_jobs);
+}
+
+
+/* Get first job from queue(removes it from queue)
+ * 
+ * Notice: Caller MUST hold a mutex
+ */
+static struct job* jobqueue_pull(thpool_* thpool_p){
+
+	job* job_p;
+	job_p = thpool_p->jobqueue_p->front;
+
+	switch(thpool_p->jobqueue_p->len){
+		
+		case 0:  /* if no jobs in queue */
+		  			break;
+		
+		case 1:  /* if one job in queue */
+					thpool_p->jobqueue_p->front = NULL;
+					thpool_p->jobqueue_p->rear  = NULL;
+					thpool_p->jobqueue_p->len = 0;
+					break;
+		
+		default: /* if >1 jobs in queue */
+					thpool_p->jobqueue_p->front = job_p->prev;
+					thpool_p->jobqueue_p->len--;
+					/* more than one job in queue -> post it */
+					bsem_post(thpool_p->jobqueue_p->has_jobs);
+					
+	}
+	
+	return job_p;
+}
+
+
+/* Free all queue resources back to the system */
+static void jobqueue_destroy(thpool_* thpool_p){
+	jobqueue_clear(thpool_p);
+	free(thpool_p->jobqueue_p->has_jobs);
+}
+
+
+
+
+
+/* ======================== SYNCHRONISATION ========================= */
+
+
+/* Init semaphore to 1 or 0 */
+static void bsem_init(bsem *bsem_p, int value) {
+	if (value < 0 || value > 1) {
+		fprintf(stderr, "bsem_init(): Binary semaphore can take only values 1 or 0");
+		exit(1);
+	}
+	pthread_mutex_init(&(bsem_p->mutex), NULL);
+	pthread_cond_init(&(bsem_p->cond), NULL);
+	bsem_p->v = value;
+}
+
+
+/* Reset semaphore to 0 */
+static void bsem_reset(bsem *bsem_p) {
+	bsem_init(bsem_p, 0);
+}
+
+
+/* Post to at least one thread */
+static void bsem_post(bsem *bsem_p) {
+	pthread_mutex_lock(&bsem_p->mutex);
+	bsem_p->v = 1;
+	pthread_cond_signal(&bsem_p->cond);
+	pthread_mutex_unlock(&bsem_p->mutex);
+}
+
+
+/* Post to all threads */
+static void bsem_post_all(bsem *bsem_p) {
+	pthread_mutex_lock(&bsem_p->mutex);
+	bsem_p->v = 1;
+	pthread_cond_broadcast(&bsem_p->cond);
+	pthread_mutex_unlock(&bsem_p->mutex);
+}
+
+
+/* Wait until semaphore has value 1, then reset it to 0 */
+static void bsem_wait(bsem* bsem_p) {
+	pthread_mutex_lock(&bsem_p->mutex);
+	while (bsem_p->v != 1) {
+		pthread_cond_wait(&bsem_p->cond, &bsem_p->mutex);
+	}
+	bsem_p->v = 0;
+	pthread_mutex_unlock(&bsem_p->mutex);
+}

http://git-wip-us.apache.org/repos/asf/celix/blob/1e73e4d1/utils/private/test/thread_pool_test.cpp
----------------------------------------------------------------------
diff --git a/utils/private/test/thread_pool_test.cpp b/utils/private/test/thread_pool_test.cpp
new file mode 100644
index 0000000..5dae4c8
--- /dev/null
+++ b/utils/private/test/thread_pool_test.cpp
@@ -0,0 +1,118 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * thread_pool_test.cpp
+ *
+ * 	\date       Sep 15, 2015
+ *  \author    	Menno van der Graaf & Alexander
+ *  \copyright	Apache License, Version 2.0
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+
+#include "CppUTest/TestHarness.h"
+#include "CppUTest/TestHarness_c.h"
+#include "CppUTest/CommandLineTestRunner.h"
+
+extern "C" {
+#include "celix_threads.h"
+#include "thpool.h"
+}
+
+celix_thread_mutex_t mutex;
+int sum=0;
+
+
+void * increment(void *) {
+	celixThreadMutex_lock(&mutex);
+	sum ++;
+	celixThreadMutex_unlock(&mutex);
+	return NULL;
+}
+
+int main(int argc, char** argv) {
+	return RUN_ALL_TESTS(argc, argv);
+}
+
+
+//----------------------TEST THREAD FUNCTION DECLARATIONS----------------------
+
+//----------------------TESTGROUP DEFINES----------------------
+
+TEST_GROUP(thread_pool) {
+	threadpool	myPool;
+
+	void setup(void) {
+	}
+
+	void teardown(void) {
+	}
+};
+
+
+//----------------------THREAD_POOL TESTS----------------------
+
+TEST(thread_pool, create) {
+
+	myPool = thpool_init(5);	// pool of 5 threads
+	CHECK((myPool != NULL));
+	thpool_destroy(myPool);
+}
+
+TEST(thread_pool, do_work) {
+
+	myPool = thpool_init(5);	// pool of 5 threads
+	celixThreadMutex_create(&mutex, NULL);
+	CHECK((myPool != NULL));
+	int n;
+	sum = 0;
+	int num_jobs = 1000;
+	for (n = 0; n < num_jobs; n++){
+		thpool_add_work(myPool, increment, NULL);
+	}
+	thpool_wait(myPool);
+	thpool_destroy(myPool);
+	CHECK_EQUAL(1000, sum);
+	celixThreadMutex_destroy(&mutex);
+}
+
+TEST(thread_pool, do_work_with_pause) {
+
+	myPool = thpool_init(5);	// pool of 5 threads
+	celixThreadMutex_create(&mutex, NULL);
+	CHECK((myPool != NULL));
+	int n;
+	sum = 0;
+	int num_jobs = 500000;
+	for (n = 0; n < num_jobs; n++){
+		thpool_add_work(myPool, increment, NULL);
+	}
+	sleep(1);
+	thpool_pause(myPool);
+	for (n = 0; n < num_jobs; n++){
+		thpool_add_work(myPool, increment, NULL);
+	}
+	thpool_resume(myPool);
+	thpool_wait(myPool);
+	thpool_destroy(myPool);
+	CHECK_EQUAL(1000000, sum);
+	celixThreadMutex_destroy(&mutex);
+}

http://git-wip-us.apache.org/repos/asf/celix/blob/1e73e4d1/utils/public/docs/Design.md
----------------------------------------------------------------------
diff --git a/utils/public/docs/Design.md b/utils/public/docs/Design.md
new file mode 100644
index 0000000..00fe1b4
--- /dev/null
+++ b/utils/public/docs/Design.md
@@ -0,0 +1,52 @@
+## High level
+	
+	Description: Library providing a threading pool where you can add work on the fly. The number
+	             of threads in the pool is adjustable when creating the pool. In most cases
+	             this should equal the number of threads supported by your cpu.
+	         
+	             For an example on how to use the threadpool, check the main.c file or just
read
+	             the documentation found in the README.md file.
+	
+	             In this header file a detailed overview of the functions and the threadpool's
logical
+	             scheme is presented in case you wish to tweak or alter something. 
+	
+	
+	
+	              _______________________________________________________        
+	            /                                                       \
+	            |   JOB QUEUE        | job1 | job2 | job3 | job4 | ..   |
+	            |                                                       |
+	            |   threadpool      | thread1 | thread2 | ..            |
+	            \_______________________________________________________/
+	
+	
+	   Description:       Jobs are added to the job queue. Once a thread in the pool
+	                      is idle, it is assigned with the first job from the queue(and
+	                      erased from the queue). It's each thread's job to read from 
+	                      the queue serially(using lock) and executing each job
+	                      until the queue is empty.
+	
+	
+	   Scheme:
+	
+	   thpool______                jobqueue____                      ______ 
+	   |           |               |           |       .----------->|_job0_| Newly added
job
+	   |           |               |  rear  ----------'             |_job1_|
+	   | jobqueue----------------->|           |                    |_job2_|
+	   |           |               |  front ----------.             |__..__| 
+	   |___________|               |___________|       '----------->|_jobn_| Job for thread
to take
+	
+	
+	   job0________ 
+	   |           |
+	   | function---->
+	   |           |
+	   |   arg------->
+	   |           |         job1________ 
+	   |  next-------------->|           |
+	   |___________|         |           |..
+
+
+## Synchronisation
+
+*Mutexes* and *binary semaphores* are the main tools to achieve synchronisation between threads.

http://git-wip-us.apache.org/repos/asf/celix/blob/1e73e4d1/utils/public/docs/FAQ.md
----------------------------------------------------------------------
diff --git a/utils/public/docs/FAQ.md b/utils/public/docs/FAQ.md
new file mode 100644
index 0000000..584a699
--- /dev/null
+++ b/utils/public/docs/FAQ.md
@@ -0,0 +1,33 @@
+
+### Why isn't pthread_exit() used to exit a thread?
+`thread_do` used to use pthread_exit(). However that made it
+hard to test for memory leaks. The reason is that on pthread_exit()
+not all memory is freed by pthread (probably for future threads or false
+belief that the application is terminating). For these reasons a simple return
+is used.
+
+Interestingly using `pthread_exit()` results in much more memory being allocated.
+
+
+### Why do you use sleep() after calling thpool_destroy()?
+This is needed only in the tests. The reason is that if you call thpool_destroy
+and then exit immediately, maybe the program will exit before all the threads
+had the time to deallocate. In that way it is impossible to check for memory
+leaks.
+
+In production you don't have to worry about this since if you call exit
+immediately after you destroyed the pool, the threads will be freed
+anyway by the OS. If you instead destroy the pool in the middle of your
+program, it again doesn't matter since the program will not exit immediately
+and thus threads will have more than enough time to terminate.
+
+
+
+### Why does wait() use all my CPU?
+Normally `wait()` will spike CPU usage to full when called. This is normal as long as it
doesn't last for more than 1 second. The reason this happens is that `wait()` goes through
various phases of polling (what is called smart polling).
+
+ * Initially there is no interval between polling and hence the 100% use of your CPU.
+ * After that the polling interval grows exponentially.
+ * Finally after x seconds, if there is still work, polling falls back to a very big interval.
+ 
+The reason `wait()` works in this way, is that the function is mostly used when someone wants
to wait for some calculation to finish. So if the calculation is assumed to take a long time
then we don't want to poll too often. Still we want to poll fast in case the calculation is
a simple one. To solve these two problems, this seemingly awkward behaviour is present.

http://git-wip-us.apache.org/repos/asf/celix/blob/1e73e4d1/utils/public/docs/README.md
----------------------------------------------------------------------
diff --git a/utils/public/docs/README.md b/utils/public/docs/README.md
new file mode 100644
index 0000000..0a07ebc
--- /dev/null
+++ b/utils/public/docs/README.md
@@ -0,0 +1,62 @@
+![Build status](http://178.62.170.124:3000/pithikos/c-thread-pool/badge/?branch=master)
+
+# C Thread Pool
+
+This is a minimal but fully functional threadpool implementation.
+
+  * ANSI C and POSIX compliant
+  * Number of threads can be chosen on initialization
+  * Minimal but powerful interface
+  * Full documentation
+
+The threadpool is under the MIT license. Notice that this project took a considerable amount of work and sacrifice of my free time and the reason I give it for free (even for commercial use) is so when you become rich and wealthy you don't forget about us open-source creatures of the night. Cheers!
+
+
+## v2 Changes
+
+This is an updated and heavily refactored version of my original threadpool. The main things taken into consideration in this new version are:
+
+  * Synchronisation control from the user (pause/resume/wait)
+  * Thorough testing for memory leaks and race conditions
+  * Cleaner and more opaque API
+  * Smart polling - polling interval changes on-the-fly
+
+
+## Run an example
+
+The library is not precompiled so you have to compile it with your project. The thread pool
+uses POSIX threads so if you compile with gcc on Linux you have to use the flag `-pthread` like this:
+
+    gcc example.c thpool.c -D THPOOL_DEBUG -pthread -o example
+
+
+Then run the executable like this:
+
+    ./example
+
+
+## Basic usage
+
+1. Include the header in your source file: `#include "thpool.h"`
+2. Create a thread pool with the number of threads you want: `threadpool thpool = thpool_init(4);`
+3. Add work to the pool: `thpool_add_work(thpool, (void*)function_p, (void*)arg_p);`
+
+The workers (threads) will start their work automatically as soon as there is new work
+in the pool. If you want to wait for all added work to be finished before continuing
+you can use `thpool_wait(thpool);`. If you want to destroy the pool you can use
+`thpool_destroy(thpool);`.
+
+
+
+## API
+
+For a deeper look into the documentation check the [thpool.h](https://github.com/Pithikos/C-Thread-Pool/blob/master/thpool.h) file. Below is a fast practical overview.
+
+| Function example                | Description                                                         |
+|---------------------------------|---------------------------------------------------------------------|
+| ***thpool_init(4)***            | Will return a new threadpool with `4` threads.                        |
+| ***thpool_add_work(thpool, (void&#42;)function_p, (void&#42;)arg_p)*** | Will add new work to the pool. Work is simply a function. You can pass a single argument to the function if you wish. If not, `NULL` should be passed. |
+| ***thpool_wait(thpool)***       | Will wait for all jobs (both in queue and currently running) to finish. |
+| ***thpool_destroy(thpool)***    | This will destroy the threadpool. If jobs are currently being executed, then it will wait for them to finish. |
+| ***thpool_pause(thpool)***      | All threads in the threadpool will pause no matter if they are idle or executing work. |
+| ***thpool_resume(thpool)***      | If the threadpool is paused, then all threads will resume from where they were.   |

http://git-wip-us.apache.org/repos/asf/celix/blob/1e73e4d1/utils/public/include/thpool.h
----------------------------------------------------------------------
diff --git a/utils/public/include/thpool.h b/utils/public/include/thpool.h
new file mode 100644
index 0000000..ab3063b
--- /dev/null
+++ b/utils/public/include/thpool.h
@@ -0,0 +1,164 @@
+/********************************** 
+ * @author      Johan Hanssen Seferidis
+ * License:     MIT
+ * 
+ **********************************/
+
+#ifndef _THPOOL_
+#define _THPOOL_
+
+
+
+
+
+/* =================================== API ======================================= */
+
+
+typedef struct thpool_* threadpool;
+
+
+/**
+ * @brief  Initialize threadpool
+ * 
+ * Initializes a threadpool. This function will not return until all
+ * threads have initialized successfully.
+ * 
+ * @example
+ * 
+ *    ..
+ *    threadpool thpool;                     //First we declare a threadpool
+ *    thpool = thpool_init(4);               //then we initialize it to 4 threads
+ *    ..
+ * 
+ * @param  num_threads   number of threads to be created in the threadpool
+ * @return threadpool    created threadpool on success,
+ *                       NULL on error
+ */
+threadpool thpool_init(int num_threads);
+
+
+/**
+ * @brief Add work to the job queue
+ * 
+ * Takes an action and its argument and adds it to the threadpool's job queue.
+ * If you want to add as work a function that takes more than one argument,
+ * one way to do this is to pass a pointer to a structure.
+ * 
+ * NOTICE: You have to cast both the function and argument to not get warnings.
+ * 
+ * @example
+ * 
+ *    void print_num(int num){
+ *       printf("%d\n", num);
+ *    }
+ * 
+ *    int main() {
+ *       ..
+ *       int a = 10;
+ *       thpool_add_work(thpool, (void*)print_num, (void*)a);
+ *       ..
+ *    }
+ * 
+ * @param  threadpool    threadpool to which the work will be added
+ * @param  function_p    pointer to function to add as work
+ * @param  arg_p         pointer to an argument
+ * @return int status code (presumably 0 on success, non-zero on error; confirm against thpool.c)
+ */
+int thpool_add_work(threadpool, void *(*function_p)(void*), void* arg_p);
+
+
+/**
+ * @brief Wait for all queued jobs to finish
+ * 
+ * Will wait for all jobs - both queued and currently running to finish.
+ * Once the queue is empty and all work has completed, the calling thread
+ * (probably the main program) will continue.
+ * 
+ * Smart polling is used in wait. The polling interval is initially 0 - meaning
+ * that there is virtually no delay between polls. If after 1 second the threads
+ * haven't finished, the polling interval starts growing exponentially
+ * until it reaches max_secs seconds. Then it jumps down to a maximum polling
+ * interval assuming that heavy processing is being used in the threadpool.
+ *
+ * @example
+ * 
+ *    ..
+ *    threadpool thpool = thpool_init(4);
+ *    ..
+ *    // Add a bunch of work
+ *    ..
+ *    thpool_wait(thpool);
+ *    puts("All added work has finished");
+ *    ..
+ * 
+ * @param threadpool     the threadpool to wait for
+ * @return nothing
+ */
+void thpool_wait(threadpool);
+
+
+/**
+ * @brief Pauses all threads immediately
+ * 
+ * The threads will be paused no matter if they are idle or working.
+ * The threads return to their previous states once thpool_resume
+ * is called.
+ * 
+ * While the threads are paused, new work can be added.
+ * 
+ * @example
+ * 
+ *    threadpool thpool = thpool_init(4);
+ *    thpool_pause(thpool);
+ *    ..
+ *    // Add a bunch of work
+ *    ..
+ *    thpool_resume(thpool); // Let the threads start their magic
+ * 
+ * @param threadpool    the threadpool where the threads should be paused
+ * @return nothing
+ */
+void thpool_pause(threadpool);
+
+
+/**
+ * @brief Unpauses all threads if they are paused
+ * 
+ * @example
+ *    ..
+ *    thpool_pause(thpool);
+ *    sleep(10);              // Delay execution 10 seconds
+ *    thpool_resume(thpool);
+ *    ..
+ * 
+ * @param threadpool     the threadpool where the threads should be unpaused
+ * @return nothing
+ */
+void thpool_resume(threadpool);
+
+
+/**
+ * @brief Destroy the threadpool
+ * 
+ * This will wait for the currently active threads to finish and then 'kill'
+ * the whole threadpool to free up memory.
+ * 
+ * @example
+ * int main() {
+ *    threadpool thpool1 = thpool_init(2);
+ *    threadpool thpool2 = thpool_init(2);
+ *    ..
+ *    thpool_destroy(thpool1);
+ *    ..
+ *    return 0;
+ * }
+ * 
+ * @param threadpool     the threadpool to destroy
+ * @return nothing
+ */
+void thpool_destroy(threadpool);
+
+
+
+
+#endif


Mime
View raw message