Date: Wed, 18 Oct 2006 15:54:51 -0700
From: Henry Jen
Sender: Henry.Jen@Sun.COM
Subject: Re: [PATCH] apr_threadpool
In-reply-to: <452AE47C.4060603@ztune.net>
To: Henry Jen
Cc: APR Development List Message-id: <4536B0BB.5010702@ztune.net> MIME-version: 1.0 Content-type: multipart/mixed; boundary="Boundary_(ID_0U6LhK63oMjE8ZDD9rRhYA)" References: <452AE47C.4060603@ztune.net> User-Agent: Thunderbird 1.5.0.7 (X11/20060915) X-Virus-Checked: Checked by ClamAV on apache.org X-Spam-Rating: minotaur.apache.org 1.6.2 0/1000/N This is a multi-part message in MIME format. --Boundary_(ID_0U6LhK63oMjE8ZDD9rRhYA) Content-type: text/plain; format=flowed; charset=ISO-8859-1 Content-transfer-encoding: 7BIT Henry Jen wrote: > Hi, > > Attached please find the patch for thread pool implementation, looking > forward to see it get committed. > I just realized that I sent the wrong patch, which did not drop the copyright notice. Attached is the correct patch. :-) I just want to make sure the consensus is that the code is ready for commit and is now simply waiting for some committer's love. Cheers, Henry --Boundary_(ID_0U6LhK63oMjE8ZDD9rRhYA) Content-type: text/x-patch; name=apr_thread_pool.diff Content-transfer-encoding: 7BIT Content-disposition: inline; filename=apr_thread_pool.diff Index: aprutil.dsp =================================================================== --- aprutil.dsp (revision 453014) +++ aprutil.dsp (working copy) @@ -240,6 +240,10 @@ # PROP Default_Filter "" # Begin Source File +SOURCE=.\misc\apr_thread_pool.c +# End Source File +# Begin Source File + SOURCE=.\misc\apr_date.c # End Source File # Begin Source File @@ -512,6 +516,10 @@ # End Source File # Begin Source File +SOURCE=.\include\apr_thread_pool.h +# End Source File +# Begin Source File + SOURCE=.\include\apr_date.h # End Source File # Begin Source File Index: include/apr_thread_pool.h =================================================================== --- include/apr_thread_pool.h (revision 0) +++ include/apr_thread_pool.h (revision 0) @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed + * with this work for additional information regarding copyright + * ownership. The ASF licenses this file to you under the Apache + * License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +#ifndef APR_THREAD_POOL_H +#define APR_THREAD_POOL_H + +#include "apr.h" +#include "apr_thread_proc.h" + +/** + * @file apr_thread_pool.h + * @brief APR Thread Pool Library + + * @remarks This library implements a thread pool using apr_thread_t. A thread + * pool is a set of threads that can be created in advance or on demand, up to a + * maximum number. When a task is scheduled, the thread pool will find an idle + * thread to handle the task. In case all existing threads are busy and the + * number of tasks in the queue is higher than the adjustable threshold, the + * pool will try to create a new thread to serve the task if the maximum number + * has not been reached. Otherwise, the task will be put into a queue based on + * priority, which can range from 0 to 255, with higher values being served + * first. In case there are tasks with the same priority, the new task is put at + * the top or the bottom depending on which function is used to add the task. + * + * @remarks There may be the case that a thread pool uses up the maximum + * number of threads at peak load, but leaves those threads idle afterwards. A + * maximum number of idle threads can be set so that extra idling threads will + * be terminated to save system resources. 
+ */ +#if APR_HAS_THREADS + +#ifdef __cplusplus +extern "C" +{ +#if 0 +}; +#endif +#endif /* __cplusplus */ + +/** Opaque Thread Pool structure. */ +typedef struct apr_thread_pool apr_thread_pool_t; + +#define APR_THREAD_TASK_PRIORITY_LOWEST 0 +#define APR_THREAD_TASK_PRIORITY_LOW 63 +#define APR_THREAD_TASK_PRIORITY_NORMAL 127 +#define APR_THREAD_TASK_PRIORITY_HIGH 191 +#define APR_THREAD_TASK_PRIORITY_HIGHEST 255 + +/** + * Create a thread pool + * @param me Address of the pointer that receives the created + * apr_thread_pool object. It is set to NULL if the thread pool could not be + * created. + * @param init_threads The number of threads to be created initially; this number + * is also used as the initial value for the maximum number of idle threads. + * @param max_threads The maximum number of threads that can be created + * @param pool The pool to use + * @return APR_SUCCESS if the thread pool was created successfully. Otherwise, + * the error code. + */ +APR_DECLARE(apr_status_t) apr_thread_pool_create(apr_thread_pool_t ** me, + apr_size_t init_threads, + apr_size_t max_threads, + apr_pool_t * pool); + +/** + * Destroy the thread pool and stop all the threads + * @return APR_SUCCESS if all threads are stopped. + */ +APR_DECLARE(apr_status_t) apr_thread_pool_destroy(apr_thread_pool_t * me); + +/** + * Schedule a task to the bottom of the tasks of same priority. + * @param me The thread pool + * @param func The task function + * @param param The parameter for the task function + * @param priority The priority of the task. + * @param owner Owner of this task. 
+ * @return APR_SUCCESS if the task had been scheduled successfully + */ +APR_DECLARE(apr_status_t) apr_thread_pool_push(apr_thread_pool_t * me, + apr_thread_start_t func, + void *param, + apr_byte_t priority, + void *owner); +/** + * Schedule a task to be run after a delay + * @param me The thread pool + * @param func The task function + * @param param The parameter for the task function + * @param time Time in microseconds + * @param owner Owner of this task. + * @return APR_SUCCESS if the task had been scheduled successfully + */ +APR_DECLARE(apr_status_t) apr_thread_pool_schedule(apr_thread_pool_t * me, + apr_thread_start_t func, + void *param, + apr_interval_time_t time, + void *owner); + +/** + * Schedule a task to the top of the tasks of same priority. + * @param me The thread pool + * @param func The task function + * @param param The parameter for the task function + * @param priority The priority of the task. + * @param owner Owner of this task. + * @return APR_SUCCESS if the task had been scheduled successfully + */ +APR_DECLARE(apr_status_t) apr_thread_pool_top(apr_thread_pool_t * me, + apr_thread_start_t func, + void *param, + apr_byte_t priority, + void *owner); + +/** + * Cancel tasks submitted by the owner. If any task from the owner is + * currently being processed, the function will spin until that task finishes. + * @param me The thread pool + * @param owner Owner of the task + * @return APR_SUCCESS if the task has been cancelled successfully + * @note The task function should not call cancel; otherwise the function + * may get stuck forever. The function asserts if it detects such a case. 
+ */ +APR_DECLARE(apr_status_t) apr_thread_pool_tasks_cancel(apr_thread_pool_t * me, + void *owner); + +/** + * Get current number of tasks waiting in the queue + * @param me The thread pool + * @return Number of tasks in the queue + */ +APR_DECLARE(apr_size_t) apr_thread_pool_tasks_count(apr_thread_pool_t * me); + +/** + * Get current number of scheduled tasks waiting in the queue + * @param me The thread pool + * @return Number of scheduled tasks in the queue + */ +APR_DECLARE(apr_size_t) + apr_thread_pool_scheduled_tasks_count(apr_thread_pool_t * me); + +/** + * Get current number of threads + * @param me The thread pool + * @return Number of total threads + */ +APR_DECLARE(apr_size_t) apr_thread_pool_threads_count(apr_thread_pool_t * me); + +/** + * Get current number of busy threads + * @param me The thread pool + * @return Number of busy threads + */ +APR_DECLARE(apr_size_t) apr_thread_pool_busy_count(apr_thread_pool_t * me); + +/** + * Get current number of idling threads + * @param me The thread pool + * @return Number of idling threads + */ +APR_DECLARE(apr_size_t) apr_thread_pool_idle_count(apr_thread_pool_t * me); + +/** + * Access function for the maximum number of idling threads. The number of + * currently idle threads will be reduced to the new limit. + * @param me The thread pool + * @param cnt The number + * @return The number of threads that were stopped. + */ +APR_DECLARE(apr_size_t) apr_thread_pool_idle_max_set(apr_thread_pool_t * me, + apr_size_t cnt); + +/** + * Access function for the maximum number of idling threads + * @param me The thread pool + * @return The current maximum number + */ +APR_DECLARE(apr_size_t) apr_thread_pool_idle_max_get(apr_thread_pool_t * me); + +/** + * Access function for the maximum number of threads. 
+ * @param me The thread pool + * @param cnt The new maximum number of threads + * @return The original maximum number of threads + */ +APR_DECLARE(apr_size_t) apr_thread_pool_thread_max_set(apr_thread_pool_t * me, + apr_size_t cnt); + +/** + * Access function for the maximum number of threads + * @param me The thread pool + * @return The current maximum number + */ +APR_DECLARE(apr_size_t) apr_thread_pool_thread_max_get(apr_thread_pool_t * + me); + +/** + * Access function for the threshold of tasks in queue to trigger a new thread. + * @param me The thread pool + * @param val The new threshold + * @return The original threshold + */ +APR_DECLARE(apr_size_t) apr_thread_pool_threshold_set(apr_thread_pool_t * me, + apr_size_t val); + +/** + * Access function for the threshold of tasks in queue to trigger a new thread. + * @param me The thread pool + * @return The current threshold + */ +APR_DECLARE(apr_size_t) apr_thread_pool_threshold_get(apr_thread_pool_t * me); + +#ifdef __cplusplus +#if 0 +{ +#endif +} +#endif + +#endif /* APR_HAS_THREADS */ + +#endif /* APR_THREAD_POOL_H */ + +/* vim: set ts=4 sw=4 et cin tw=80: */ Index: misc/apr_thread_pool.c =================================================================== --- misc/apr_thread_pool.c (revision 0) +++ misc/apr_thread_pool.c (revision 0) @@ -0,0 +1,817 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright + * ownership. The ASF licenses this file to you under the Apache + * License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. 
You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +#include <assert.h> +#include "apr_thread_pool.h" +#include "apr_ring.h" +#include "apr_thread_cond.h" +#include "apr_portable.h" + +#if APR_HAS_THREADS + +#define TASK_PRIORITY_SEGS 4 +#define TASK_PRIORITY_SEG(x) (((x)->dispatch.priority & 0xFF) / 64) + +typedef struct apr_thread_pool_task +{ + APR_RING_ENTRY(apr_thread_pool_task) link; + apr_thread_start_t func; + void *param; + void *owner; + union + { + apr_byte_t priority; + apr_time_t time; + } dispatch; +} apr_thread_pool_task_t; + +APR_RING_HEAD(apr_thread_pool_tasks, apr_thread_pool_task); + +struct apr_thread_list_elt +{ + APR_RING_ENTRY(apr_thread_list_elt) link; + apr_thread_t *thd; + volatile void *current_owner; + volatile int stop; +}; + +APR_RING_HEAD(apr_thread_list, apr_thread_list_elt); + +struct apr_thread_pool +{ + apr_pool_t *pool; + volatile apr_size_t thd_max; + volatile apr_size_t idle_max; + volatile apr_size_t thd_cnt; + volatile apr_size_t idle_cnt; + volatile apr_size_t task_cnt; + volatile apr_size_t scheduled_task_cnt; + volatile apr_size_t threshold; + struct apr_thread_pool_tasks *tasks; + struct apr_thread_pool_tasks *scheduled_tasks; + struct apr_thread_list *busy_thds; + struct apr_thread_list *idle_thds; + apr_thread_mutex_t *lock; + apr_thread_mutex_t *cond_lock; + apr_thread_cond_t *cond; + volatile int terminated; + struct apr_thread_pool_tasks *recycled_tasks; + apr_thread_pool_task_t *task_idx[TASK_PRIORITY_SEGS]; +}; + +static apr_status_t thread_pool_construct(apr_thread_pool_t * me, + apr_size_t init_threads, + apr_size_t max_threads) +{ + apr_status_t rv; 
+ int i; + + me->thd_max = max_threads; + me->idle_max = init_threads; + me->threshold = init_threads / 2; + rv = apr_thread_mutex_create(&me->lock, APR_THREAD_MUTEX_NESTED, + me->pool); + if (APR_SUCCESS != rv) { + return rv; + } + rv = apr_thread_mutex_create(&me->cond_lock, APR_THREAD_MUTEX_UNNESTED, + me->pool); + if (APR_SUCCESS != rv) { + apr_thread_mutex_destroy(me->lock); + return rv; + } + rv = apr_thread_cond_create(&me->cond, me->pool); + if (APR_SUCCESS != rv) { + apr_thread_mutex_destroy(me->lock); + apr_thread_mutex_destroy(me->cond_lock); + return rv; + } + me->tasks = apr_palloc(me->pool, sizeof(*me->tasks)); + if (!me->tasks) { + goto CATCH_ENOMEM; + } + APR_RING_INIT(me->tasks, apr_thread_pool_task, link); + me->scheduled_tasks = apr_palloc(me->pool, sizeof(*me->scheduled_tasks)); + if (!me->scheduled_tasks) { + goto CATCH_ENOMEM; + } + APR_RING_INIT(me->scheduled_tasks, apr_thread_pool_task, link); + me->recycled_tasks = apr_palloc(me->pool, sizeof(*me->recycled_tasks)); + if (!me->recycled_tasks) { + goto CATCH_ENOMEM; + } + APR_RING_INIT(me->recycled_tasks, apr_thread_pool_task, link); + me->busy_thds = apr_palloc(me->pool, sizeof(*me->busy_thds)); + if (!me->busy_thds) { + goto CATCH_ENOMEM; + } + APR_RING_INIT(me->busy_thds, apr_thread_list_elt, link); + me->idle_thds = apr_palloc(me->pool, sizeof(*me->idle_thds)); + if (!me->idle_thds) { + goto CATCH_ENOMEM; + } + APR_RING_INIT(me->idle_thds, apr_thread_list_elt, link); + me->thd_cnt = me->idle_cnt = me->task_cnt = me->scheduled_task_cnt = 0; + me->terminated = 0; + for (i = 0; i < TASK_PRIORITY_SEGS; i++) { + me->task_idx[i] = NULL; + } + goto FINAL_EXIT; + CATCH_ENOMEM: + rv = APR_ENOMEM; + apr_thread_mutex_destroy(me->lock); + apr_thread_mutex_destroy(me->cond_lock); + apr_thread_cond_destroy(me->cond); + FINAL_EXIT: + return rv; +} + +/* + * NOTE: This function is not thread safe by itself. 
Caller should hold the lock + */ +static apr_thread_pool_task_t *pop_task(apr_thread_pool_t * me) +{ + apr_thread_pool_task_t *task = NULL; + int seg; + + /* check for scheduled tasks */ + if (me->scheduled_task_cnt > 0) { + task = APR_RING_FIRST(me->scheduled_tasks); + assert(task != NULL); + assert(task != + APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task, + link)); + /* if it's time */ + if (task->dispatch.time <= apr_time_now()) { + --me->scheduled_task_cnt; + APR_RING_REMOVE(task, link); + return task; + } + } + /* check for normal tasks if we're not returning a scheduled task */ + if (me->task_cnt == 0) { + return NULL; + } + + task = APR_RING_FIRST(me->tasks); + assert(task != NULL); + assert(task != APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link)); + --me->task_cnt; + seg = TASK_PRIORITY_SEG(task); + if (task == me->task_idx[seg]) { + me->task_idx[seg] = APR_RING_NEXT(task, link); + if (me->task_idx[seg] == APR_RING_SENTINEL(me->tasks, + apr_thread_pool_task, link) + || TASK_PRIORITY_SEG(me->task_idx[seg]) != seg) { + me->task_idx[seg] = NULL; + } + } + APR_RING_REMOVE(task, link); + return task; +} + +static apr_interval_time_t waiting_time(apr_thread_pool_t * me) +{ + apr_thread_pool_task_t *task = NULL; + + task = APR_RING_FIRST(me->scheduled_tasks); + assert(task != NULL); + assert(task != + APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task, + link)); + return task->dispatch.time - apr_time_now(); +} + +/* + * The worker thread function. Take a task from the queue and perform it if + * there is any. Otherwise, put itself into the idle thread list and waiting + * for signal to wake up. + * The thread terminate directly by detach and exit when it is asked to stop + * after finishing a task. Otherwise, the thread should be in idle thread list + * and should be joined. 
+ */ +static void *APR_THREAD_FUNC thread_pool_func(apr_thread_t * t, void *param) +{ + apr_status_t rv = APR_SUCCESS; + apr_thread_pool_t *me = param; + apr_thread_pool_task_t *task = NULL; + apr_interval_time_t wait; + struct apr_thread_list_elt *elt; + + elt = apr_pcalloc(me->pool, sizeof(*elt)); + if (!elt) { + apr_thread_exit(t, APR_ENOMEM); + } + APR_RING_ELEM_INIT(elt, link); + elt->thd = t; + elt->stop = 0; + + apr_thread_mutex_lock(me->lock); + while (!me->terminated && !elt->stop) { + /* if not new element, it is awakened from idle */ + if (APR_RING_NEXT(elt, link) != elt) { + --me->idle_cnt; + APR_RING_REMOVE(elt, link); + } + APR_RING_INSERT_TAIL(me->busy_thds, elt, apr_thread_list_elt, link); + task = pop_task(me); + while (NULL != task && !me->terminated) { + elt->current_owner = task->owner; + apr_thread_mutex_unlock(me->lock); + task->func(t, task->param); + apr_thread_mutex_lock(me->lock); + APR_RING_INSERT_TAIL(me->recycled_tasks, task, + apr_thread_pool_task, link); + elt->current_owner = NULL; + if (elt->stop) { + break; + } + task = pop_task(me); + } + assert(NULL == elt->current_owner); + APR_RING_REMOVE(elt, link); + + /* busy thread been asked to stop, not joinable */ + if ((me->idle_cnt >= me->idle_max + && !(me->scheduled_task_cnt && 0 >= me->idle_max)) + || me->terminated || elt->stop) { + --me->thd_cnt; + apr_thread_mutex_unlock(me->lock); + apr_thread_detach(t); + apr_thread_exit(t, APR_SUCCESS); + return NULL; /* should not be here, safe net */ + } + + ++me->idle_cnt; + APR_RING_INSERT_TAIL(me->idle_thds, elt, apr_thread_list_elt, link); + wait = (me->scheduled_task_cnt) ? 
waiting_time(me) : -1; + apr_thread_mutex_unlock(me->lock); + apr_thread_mutex_lock(me->cond_lock); + if (wait >= 0) { + rv = apr_thread_cond_timedwait(me->cond, me->cond_lock, wait); + } + else { + rv = apr_thread_cond_wait(me->cond, me->cond_lock); + } + apr_thread_mutex_unlock(me->cond_lock); + apr_thread_mutex_lock(me->lock); + } + + /* idle thread been asked to stop, will be joined */ + --me->thd_cnt; + apr_thread_mutex_unlock(me->lock); + apr_thread_exit(t, APR_SUCCESS); + return NULL; /* should not be here, safe net */ +} + +static apr_status_t thread_pool_cleanup(void *me) +{ + apr_thread_pool_t *_self = me; + + _self->terminated = 1; + apr_thread_pool_idle_max_set(_self, 0); + while (_self->thd_cnt) { + apr_sleep(20 * 1000); /* spin lock with 20 ms */ + } + apr_thread_mutex_destroy(_self->lock); + apr_thread_mutex_destroy(_self->cond_lock); + apr_thread_cond_destroy(_self->cond); + return APR_SUCCESS; +} + +APR_DECLARE(apr_status_t) apr_thread_pool_create(apr_thread_pool_t ** me, + apr_size_t init_threads, + apr_size_t max_threads, + apr_pool_t * pool) +{ + apr_thread_t *t; + apr_status_t rv = APR_SUCCESS; + + *me = apr_pcalloc(pool, sizeof(**me)); + if (!*me) { + return APR_ENOMEM; + } + + (*me)->pool = pool; + + rv = thread_pool_construct(*me, init_threads, max_threads); + if (APR_SUCCESS != rv) { + *me = NULL; + return rv; + } + apr_pool_cleanup_register(pool, *me, thread_pool_cleanup, + apr_pool_cleanup_null); + + while (init_threads) { + rv = apr_thread_create(&t, NULL, thread_pool_func, *me, (*me)->pool); + if (APR_SUCCESS != rv) { + break; + } + ++(*me)->thd_cnt; + --init_threads; + } + + return rv; +} + +APR_DECLARE(apr_status_t) apr_thread_pool_destroy(apr_thread_pool_t * me) +{ + return apr_pool_cleanup_run(me->pool, me, thread_pool_cleanup); +} + +/* + * NOTE: This function is not thread safe by itself. 
Caller should hold the lock + */ +static apr_thread_pool_task_t *task_new(apr_thread_pool_t * me, + apr_thread_start_t func, + void *param, apr_byte_t priority, + void *owner, apr_time_t time) +{ + apr_thread_pool_task_t *t; + + if (APR_RING_EMPTY(me->recycled_tasks, apr_thread_pool_task, link)) { + t = apr_pcalloc(me->pool, sizeof(*t)); + if (NULL == t) { + return NULL; + } + } + else { + t = APR_RING_FIRST(me->recycled_tasks); + APR_RING_REMOVE(t, link); + } + + APR_RING_ELEM_INIT(t, link); + t->func = func; + t->param = param; + t->owner = owner; + if (time > 0) { + t->dispatch.time = apr_time_now() + time; + } + else { + t->dispatch.priority = priority; + } + return t; +} + +/* + * Test if the task is the only one within the priority segment. + * If it is not, return the first element with same or lower priority. + * Otherwise, add the task into the queue and return NULL. + * + * NOTE: This function is not thread safe by itself. Caller should hold the lock + */ +static apr_thread_pool_task_t *add_if_empty(apr_thread_pool_t * me, + apr_thread_pool_task_t * const t) +{ + int seg; + int next; + apr_thread_pool_task_t *t_next; + + seg = TASK_PRIORITY_SEG(t); + if (me->task_idx[seg]) { + assert(APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) != + me->task_idx[seg]); + t_next = me->task_idx[seg]; + while (t_next->dispatch.priority > t->dispatch.priority) { + t_next = APR_RING_NEXT(t_next, link); + if (APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) == + t_next) { + return t_next; + } + } + return t_next; + } + + for (next = seg - 1; next >= 0; next--) { + if (me->task_idx[next]) { + APR_RING_INSERT_BEFORE(me->task_idx[next], t, link); + break; + } + } + if (0 > next) { + APR_RING_INSERT_TAIL(me->tasks, t, apr_thread_pool_task, link); + } + me->task_idx[seg] = t; + return NULL; +} + +/* +* schedule a task to run in "time" microseconds. Find the spot in the ring where +* the time fits. 
Adjust the short_time so the thread wakes up when the time is reached. +*/ +static apr_status_t schedule_task(apr_thread_pool_t * me, + apr_thread_start_t func, void *param, + void *owner, apr_interval_time_t time) +{ + apr_thread_pool_task_t *t; + apr_thread_pool_task_t *t_loc; + apr_thread_t *thd; + apr_status_t rv = APR_SUCCESS; + apr_thread_mutex_lock(me->lock); + + t = task_new(me, func, param, 0, owner, time); + if (NULL == t) { + apr_thread_mutex_unlock(me->lock); + return APR_ENOMEM; + } + t_loc = APR_RING_FIRST(me->scheduled_tasks); + while (NULL != t_loc) { + /* if the time is less than the entry insert ahead of it */ + if (t->dispatch.time < t_loc->dispatch.time) { + ++me->scheduled_task_cnt; + APR_RING_INSERT_BEFORE(t_loc, t, link); + break; + } + else { + t_loc = APR_RING_NEXT(t_loc, link); + if (t_loc == + APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task, + link)) { + ++me->scheduled_task_cnt; + APR_RING_INSERT_TAIL(me->scheduled_tasks, t, + apr_thread_pool_task, link); + break; + } + } + } + /* there should be at least one thread for scheduled tasks */ + if (0 == me->thd_cnt) { + rv = apr_thread_create(&thd, NULL, thread_pool_func, me, me->pool); + if (APR_SUCCESS == rv) { + ++me->thd_cnt; + } + } + apr_thread_mutex_unlock(me->lock); + apr_thread_mutex_lock(me->cond_lock); + apr_thread_cond_signal(me->cond); + apr_thread_mutex_unlock(me->cond_lock); + return rv; +} + +static apr_status_t add_task(apr_thread_pool_t * me, apr_thread_start_t func, + void *param, apr_byte_t priority, int push, + void *owner) +{ + apr_thread_pool_task_t *t; + apr_thread_pool_task_t *t_loc; + apr_thread_t *thd; + apr_status_t rv = APR_SUCCESS; + + apr_thread_mutex_lock(me->lock); + + t = task_new(me, func, param, priority, owner, 0); + if (NULL == t) { + apr_thread_mutex_unlock(me->lock); + return APR_ENOMEM; + } + + t_loc = add_if_empty(me, t); + if (NULL == t_loc) { + goto FINAL_EXIT; + } + + if (push) { + while (APR_RING_SENTINEL(me->tasks, 
apr_thread_pool_task, link) != + t_loc && t_loc->dispatch.priority >= t->dispatch.priority) { + t_loc = APR_RING_NEXT(t_loc, link); + } + } + APR_RING_INSERT_BEFORE(t_loc, t, link); + if (!push) { + if (t_loc == me->task_idx[TASK_PRIORITY_SEG(t)]) { + me->task_idx[TASK_PRIORITY_SEG(t)] = t; + } + } + + FINAL_EXIT: + me->task_cnt++; + if (0 == me->thd_cnt || (0 == me->idle_cnt && me->thd_cnt < me->thd_max && + me->task_cnt > me->threshold)) { + rv = apr_thread_create(&thd, NULL, thread_pool_func, me, me->pool); + if (APR_SUCCESS == rv) { + ++me->thd_cnt; + } + } + apr_thread_mutex_unlock(me->lock); + + apr_thread_mutex_lock(me->cond_lock); + apr_thread_cond_signal(me->cond); + apr_thread_mutex_unlock(me->cond_lock); + + return rv; +} + +APR_DECLARE(apr_status_t) apr_thread_pool_push(apr_thread_pool_t * me, + apr_thread_start_t func, + void *param, + apr_byte_t priority, + void *owner) +{ + return add_task(me, func, param, priority, 1, owner); +} + +APR_DECLARE(apr_status_t) apr_thread_pool_schedule(apr_thread_pool_t * me, + apr_thread_start_t func, + void *param, + apr_interval_time_t time, + void *owner) +{ + return schedule_task(me, func, param, owner, time); +} + +APR_DECLARE(apr_status_t) apr_thread_pool_top(apr_thread_pool_t * me, + apr_thread_start_t func, + void *param, + apr_byte_t priority, + void *owner) +{ + return add_task(me, func, param, priority, 0, owner); +} + +static apr_status_t remove_scheduled_tasks(apr_thread_pool_t * me, + void *owner) +{ + apr_thread_pool_task_t *t_loc; + apr_thread_pool_task_t *next; + + t_loc = APR_RING_FIRST(me->scheduled_tasks); + while (t_loc != + APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task, + link)) { + next = APR_RING_NEXT(t_loc, link); + /* if this is the owner remove it */ + if (t_loc->owner == owner) { + --me->scheduled_task_cnt; + APR_RING_REMOVE(t_loc, link); + } + t_loc = next; + } + return APR_SUCCESS; +} + +static apr_status_t remove_tasks(apr_thread_pool_t * me, void *owner) +{ + 
apr_thread_pool_task_t *t_loc; + apr_thread_pool_task_t *next; + int seg; + + t_loc = APR_RING_FIRST(me->tasks); + while (t_loc != APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link)) { + next = APR_RING_NEXT(t_loc, link); + if (t_loc->owner == owner) { + --me->task_cnt; + seg = TASK_PRIORITY_SEG(t_loc); + if (t_loc == me->task_idx[seg]) { + me->task_idx[seg] = APR_RING_NEXT(t_loc, link); + if (me->task_idx[seg] == APR_RING_SENTINEL(me->tasks, + apr_thread_pool_task, + link) + || TASK_PRIORITY_SEG(me->task_idx[seg]) != seg) { + me->task_idx[seg] = NULL; + } + } + APR_RING_REMOVE(t_loc, link); + } + t_loc = next; + } + return APR_SUCCESS; +} + +static void wait_on_busy_threads(apr_thread_pool_t * me, void *owner) +{ + struct apr_thread_list_elt *elt; + apr_thread_mutex_lock(me->lock); + elt = APR_RING_FIRST(me->busy_thds); + while (elt != APR_RING_SENTINEL(me->busy_thds, apr_thread_list_elt, link)) { +#ifndef NDEBUG + /* make sure the thread is not the one calling tasks_cancel */ + apr_os_thread_t *os_thread; + apr_os_thread_get(&os_thread, elt->thd); +#ifdef WIN32 + /* hack for apr win32 bug */ + assert(!apr_os_thread_equal(apr_os_thread_current(), os_thread)); +#else + assert(!apr_os_thread_equal(apr_os_thread_current(), *os_thread)); +#endif +#endif + if (elt->current_owner != owner) { + elt = APR_RING_NEXT(elt, link); + continue; + } + while (elt->current_owner == owner) { + apr_thread_mutex_unlock(me->lock); + apr_sleep(200 * 1000); + apr_thread_mutex_lock(me->lock); + } + elt = APR_RING_FIRST(me->busy_thds); + } + apr_thread_mutex_unlock(me->lock); + return; +} + +APR_DECLARE(apr_status_t) apr_thread_pool_tasks_cancel(apr_thread_pool_t * me, + void *owner) +{ + apr_status_t rv = APR_SUCCESS; + + apr_thread_mutex_lock(me->lock); + if (me->task_cnt > 0) { + rv = remove_tasks(me, owner); + } + if (me->scheduled_task_cnt > 0) { + rv = remove_scheduled_tasks(me, owner); + } + apr_thread_mutex_unlock(me->lock); + wait_on_busy_threads(me, owner); + + return rv; 
+} + +APR_DECLARE(apr_size_t) apr_thread_pool_tasks_count(apr_thread_pool_t * me) +{ + return me->task_cnt; +} + +APR_DECLARE(apr_size_t) + apr_thread_pool_scheduled_tasks_count(apr_thread_pool_t * me) +{ + return me->scheduled_task_cnt; +} + +APR_DECLARE(apr_size_t) apr_thread_pool_threads_count(apr_thread_pool_t * me) +{ + return me->thd_cnt; +} + +APR_DECLARE(apr_size_t) apr_thread_pool_busy_count(apr_thread_pool_t * me) +{ + return me->thd_cnt - me->idle_cnt; +} + +APR_DECLARE(apr_size_t) apr_thread_pool_idle_count(apr_thread_pool_t * me) +{ + return me->idle_cnt; +} + +APR_DECLARE(apr_size_t) apr_thread_pool_idle_max_get(apr_thread_pool_t * me) +{ + return me->idle_max; +} + +/* + * This function stop extra idle threads to the cnt. + * @return the number of threads stopped + * NOTE: There could be busy threads become idle during this function + */ +static struct apr_thread_list_elt *trim_threads(apr_thread_pool_t * me, + apr_size_t * cnt, int idle) +{ + struct apr_thread_list *thds; + apr_size_t n, n_dbg, i; + struct apr_thread_list_elt *head, *tail, *elt; + + apr_thread_mutex_lock(me->lock); + if (idle) { + thds = me->idle_thds; + n = me->idle_cnt; + } + else { + thds = me->busy_thds; + n = me->thd_cnt - me->idle_cnt; + } + if (n <= *cnt) { + apr_thread_mutex_unlock(me->lock); + *cnt = 0; + return NULL; + } + n -= *cnt; + + head = APR_RING_FIRST(thds); + for (i = 0; i < *cnt; i++) { + head = APR_RING_NEXT(head, link); + } + tail = APR_RING_LAST(thds); + APR_RING_UNSPLICE(head, tail, link); + if (idle) { + me->idle_cnt = *cnt; + } + apr_thread_mutex_unlock(me->lock); + + n_dbg = 0; + for (elt = head; elt != tail; elt = APR_RING_NEXT(elt, link)) { + elt->stop = 1; + n_dbg++; + } + elt->stop = 1; + n_dbg++; + assert(n == n_dbg); + *cnt = n; + + APR_RING_PREV(head, link) = NULL; + APR_RING_NEXT(tail, link) = NULL; + return head; +} + +static apr_size_t trim_idle_threads(apr_thread_pool_t * me, apr_size_t cnt) +{ + apr_size_t n_dbg; + struct apr_thread_list_elt 
*elt; + apr_status_t rv; + + elt = trim_threads(me, &cnt, 1); + + apr_thread_mutex_lock(me->cond_lock); + apr_thread_cond_broadcast(me->cond); + apr_thread_mutex_unlock(me->cond_lock); + + n_dbg = 0; + while (elt) { + apr_thread_join(&rv, elt->thd); + elt = APR_RING_NEXT(elt, link); + ++n_dbg; + } + assert(cnt == n_dbg); + + return cnt; +} + +/* don't join on busy threads for performance reasons, who knows how long will + * the task takes to perform + */ +static apr_size_t trim_busy_threads(apr_thread_pool_t * me, apr_size_t cnt) +{ + trim_threads(me, &cnt, 0); + return cnt; +} + +APR_DECLARE(apr_size_t) apr_thread_pool_idle_max_set(apr_thread_pool_t * me, + apr_size_t cnt) +{ + me->idle_max = cnt; + cnt = trim_idle_threads(me, cnt); + return cnt; +} + +APR_DECLARE(apr_size_t) apr_thread_pool_thread_max_get(apr_thread_pool_t * me) +{ + return me->thd_max; +} + +/* + * This function stop extra working threads to the new limit. + * NOTE: There could be busy threads become idle during this function + */ +APR_DECLARE(apr_size_t) apr_thread_pool_thread_max_set(apr_thread_pool_t * me, + apr_size_t cnt) +{ + unsigned int n; + + me->thd_max = cnt; + if (0 == cnt || me->thd_cnt <= cnt) { + return 0; + } + + n = me->thd_cnt - cnt; + if (n >= me->idle_cnt) { + trim_busy_threads(me, n - me->idle_cnt); + trim_idle_threads(me, 0); + } + else { + trim_idle_threads(me, me->idle_cnt - n); + } + return n; +} + +APR_DECLARE(apr_size_t) apr_thread_pool_threshold_get(apr_thread_pool_t * me) +{ + return me->threshold; +} + +APR_DECLARE(apr_size_t) apr_thread_pool_threshold_set(apr_thread_pool_t * me, + apr_size_t val) +{ + apr_size_t ov; + + ov = me->threshold; + me->threshold = val; + return ov; +} + +#endif /* APR_HAS_THREADS */ + +/* vim: set ts=4 sw=4 et cin tw=80: */ --Boundary_(ID_0U6LhK63oMjE8ZDD9rRhYA)--
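
A note for readers of the patch: the task queue is indexed by four priority segments (TASK_PRIORITY_SEGS), and add_if_empty() scans downward for the nearest lower segment that already holds tasks. The standalone sketch below illustrates only that arithmetic, without APR. The helper names priority_seg and next_lower_nonempty_seg are hypothetical and do not appear in the patch, and the has_tasks array is an assumed stand-in for checking task_idx[] entries against NULL.

```c
/* Standalone illustration; not part of the patch and not linked against APR. */

#define TASK_PRIORITY_SEGS 4

/* Priority constants as declared in the patch's apr_thread_pool.h. */
#define APR_THREAD_TASK_PRIORITY_LOWEST  0
#define APR_THREAD_TASK_PRIORITY_LOW     63
#define APR_THREAD_TASK_PRIORITY_NORMAL  127
#define APR_THREAD_TASK_PRIORITY_HIGH    191
#define APR_THREAD_TASK_PRIORITY_HIGHEST 255

/* Hypothetical helper mirroring the TASK_PRIORITY_SEG macro:
 * maps a priority in 0..255 onto a segment in 0..3. */
static int priority_seg(unsigned char priority)
{
    return (priority & 0xFF) / 64;
}

/* Hypothetical helper mirroring the downward scan in add_if_empty():
 * has_tasks[i] is nonzero when segment i currently holds at least one task.
 * Returns the nearest lower non-empty segment, or -1 when there is none
 * (the patch then appends the task at the tail of the ring). */
static int next_lower_nonempty_seg(const int has_tasks[TASK_PRIORITY_SEGS],
                                   int seg)
{
    int next;

    for (next = seg - 1; next >= 0; next--) {
        if (has_tasks[next]) {
            return next;
        }
    }
    return -1;
}
```

Under this mapping LOWEST and LOW share segment 0, while NORMAL, HIGH and HIGHEST land in segments 1, 2 and 3 respectively, so the segment index only narrows the search; ordering inside a segment still depends on the full priority value.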