apr-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "William A. Rowe, Jr." <wr...@rowe-clan.net>
Subject Re: [PATCH] apr_threadpool
Date Fri, 08 Sep 2006 05:25:12 GMT
I'm +1 on adopting this in trunk so that the developers can spend some
time with the API, and we can begin duking out the details.

I suspect it could use a bit of tweaking, but it's easier to do that in
svn - the email thread has already yielded a cleaner API.

Henry Jen wrote:
> Hi,
> 
> Attached please find the latest patch to support thread_pool(the last
> one had a bug and cannot be merged cleanly), which has two enhancement
> from earlier patch:
> 
> 1. Ownership support: Now when submit a task to the thread pool, an
> owner identity can be specified. Which can be used to remove all tasks
> belongs to that owner afterwards.
> 
> 2. Timer: It is possible to schedule a task to be executed after a
> certain time interval. By doing another schedule within a task, that is
> basically a timer. Noted that scheduled tasks are with highest priority,
> and due tasks are always picked up first.
> 
> Comments are always welcome, and hopefully soon this can be committed.
> 
> Cheers,
> Henry
> 
> 
> 
> ------------------------------------------------------------------------
> 
> Index: aprutil.dsp
> ===================================================================
> --- aprutil.dsp	(revision 433720)
> +++ aprutil.dsp	(working copy)
> @@ -240,6 +240,10 @@
>  # PROP Default_Filter ""
>  # Begin Source File
>  
> +SOURCE=.\misc\apr_thread_pool.c
> +# End Source File
> +# Begin Source File
> +
>  SOURCE=.\misc\apr_date.c
>  # End Source File
>  # Begin Source File
> @@ -512,6 +516,10 @@
>  # End Source File
>  # Begin Source File
>  
> +SOURCE=.\include\apr_thread_pool.h
> +# End Source File
> +# Begin Source File
> +
>  SOURCE=.\include\apr_date.h
>  # End Source File
>  # Begin Source File
> Index: include/apr_thread_pool.h
> ===================================================================
> --- include/apr_thread_pool.h	(revision 0)
> +++ include/apr_thread_pool.h	(revision 0)
> @@ -0,0 +1,237 @@
> +/* Copyright 2006 Sun Microsystems, Inc.
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License");
> + * you may not use this file except in compliance with the License.
> + * You may obtain a copy of the License at
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +#ifndef APR_THREAD_POOL_H
> +#define APR_THREAD_POOL_H
> +
> +#include "apr.h"
> +#include "apr_thread_proc.h"
> +
> +/**
> + * @file apr_thread_pool.h
> + * @brief APR Thread Pool Library
> +
> + * @remarks This library implements a thread pool using apr_thread_t. A thread
> + * pool is a set of threads that can be created in advance or on demand until a
> + * maximum number. When a task is scheduled, the thread pool will find an idle
> + * thread to handle the task. In case all existing threads are busy and the
> + * number of tasks in the queue is higher than the adjustable threshold, the
> + * pool will try to create a new thread to serve the task if the maximum number
> + * has not been reached. Otherwise, the task will be put into a queue based on
> + * priority, which can be valued from 0 to 255, with higher value been served
> + * first. In case there are tasks with the same priority, the new task is put at
> + * the top or the bottom depeneds on which function is used to put the task.
> + *
> + * @remarks There may be the case that a thread pool can use up the maximum
> + * number of threads at peak load, but having those threads idle afterwards. A
> + * maximum number of idle threads can be set so that extra idling threads will
> + * be terminated to save system resrouces. 
> + */
> +#if APR_HAS_THREADS
> +
> +#ifdef __cplusplus
> +extern "C"
> +{
> +#if 0
> +};
> +#endif
> +#endif /* __cplusplus */
> +
> +/** Opaque Thread Pool structure. */
> +typedef struct apr_thread_pool apr_thread_pool_t;
> +
> +#define APR_THREAD_TASK_PRIORITY_LOWEST 0
> +#define APR_THREAD_TASK_PRIORITY_LOW 63
> +#define APR_THREAD_TASK_PRIORITY_NORMAL 127
> +#define APR_THREAD_TASK_PRIORITY_HIGH 191
> +#define APR_THREAD_TASK_PRIORITY_HIGHEST 255
> +
> +/**
> + * Create a thread pool
> + * @param me A pointer points to the pointer receives the created
> + * apr_thread_pool object. The returned value will be NULL if failed to create
> + * the thread pool.
> + * @param init_threads The number of threads to be created initially, the number
> + * will also be used as the initial value for maximum number of idle threads. 
> + * @param max_threads The maximum number of threads that can be created
> + * @param pool The pool to use
> + * @return APR_SUCCESS if the thread pool was created successfully. Otherwise,
> + * the error code.
> + */
> +APR_DECLARE(apr_status_t) apr_thread_pool_create(apr_thread_pool_t ** me,
> +                                                 apr_size_t init_threads,
> +                                                 apr_size_t max_threads,
> +                                                 apr_pool_t * pool);
> +
> +/**
> + * Destroy the thread pool and stop all the threads
> + * @return APR_SUCCESS if all threads are stopped.
> + */
> +APR_DECLARE(apr_status_t) apr_thread_pool_destroy(apr_thread_pool_t * me);
> +
> +/**
> + * Schedule a task to the bottom of the tasks of same priority.
> + * @param me The thread pool
> + * @param func The task function
> + * @param param The parameter for the task function
> + * @param priority The priority of the task.
> + * @param owner Owner of this task.
> + * @return APR_SUCCESS if the task had been scheduled successfully
> + */
> +APR_DECLARE(apr_status_t) apr_thread_pool_push(apr_thread_pool_t * me,
> +                                               apr_thread_start_t func,
> +                                               void *param,
> +                                               apr_byte_t priority,
> +                                               void *owner);
> +/**
> + * Schedule a task to be run after a delay
> + * @param me The thread pool
> + * @param func The task function
> + * @param param The parameter for the task function
> + * @param time Time in microseconds
> + * @param owner Owner of this task.
> + * @return APR_SUCCESS if the task had been scheduled successfully
> + */
> +APR_DECLARE(apr_status_t) apr_thread_pool_schedule(apr_thread_pool_t * me,
> +                                                   apr_thread_start_t func,
> +                                                   void *param,
> +                                                   apr_interval_time_t time,
> +                                                   void *owner);
> +
> +/**
> + * Schedule a task to the top of the tasks of same priority.
> + * @param me The thread pool
> + * @param func The task function
> + * @param param The parameter for the task function
> + * @param priority The priority of the task.
> + * @param owner Owner of this task.
> + * @return APR_SUCCESS if the task had been scheduled successfully
> + */
> +APR_DECLARE(apr_status_t) apr_thread_pool_top(apr_thread_pool_t * me,
> +                                              apr_thread_start_t func,
> +                                              void *param,
> +                                              apr_byte_t priority,
> +                                              void *owner);
> +
> +/**
> + * Cancel tasks submitted by the owner. If there is any task from the owner is
> + * currently under process, the function will spin until the task finished.
> + * @param me The thread pool
> + * @param owner Owner of the task
> + * @return APR_SUCCESS if the task has been cancelled successfully
> + * @note The task function should not be calling cancel, otherwise the function
> + * may get stuck forever. The function assert if it detect such a case.
> + */
> +APR_DECLARE(apr_status_t) apr_thread_pool_tasks_cancel(apr_thread_pool_t * me,
> +                                                       void *owner);
> +
> +/**
> + * Get current number of tasks waiting in the queue
> + * @param me The thread pool
> + * @return Number of tasks in the queue
> + */
> +APR_DECLARE(apr_size_t) apr_thread_pool_tasks_count(apr_thread_pool_t * me);
> +
> +/**
> + * Get current number of scheduled tasks waiting in the queue
> + * @param me The thread pool
> + * @return Number of scheduled tasks in the queue
> + */
> +APR_DECLARE(apr_size_t)
> +    apr_thread_pool_scheduled_tasks_count(apr_thread_pool_t * me);
> +
> +/**
> + * Get current number of threads
> + * @param me The thread pool
> + * @return Number of total threads
> + */
> +APR_DECLARE(apr_size_t) apr_thread_pool_threads_count(apr_thread_pool_t * me);
> +
> +/**
> + * Get current number of busy threads
> + * @param me The thread pool
> + * @return Number of busy threads
> + */
> +APR_DECLARE(apr_size_t) apr_thread_pool_busy_count(apr_thread_pool_t * me);
> +
> +/**
> + * Get current number of idling thread
> + * @param me The thread pool
> + * @return Number of idling threads
> + */
> +APR_DECLARE(apr_size_t) apr_thread_pool_idle_count(apr_thread_pool_t * me);
> +
> +/**
> + * Access function for the maximum number of idling thread. Number of current
> + * idle threads will be reduced to the new limit.
> + * @param me The thread pool
> + * @param cnt The number
> + * @return The number of threads were stopped.
> + */
> +APR_DECLARE(apr_size_t) apr_thread_pool_idle_max_set(apr_thread_pool_t * me,
> +                                                     apr_size_t cnt);
> +
> +/**
> + * Access function for the maximum number of idling thread
> + * @param me The thread pool
> + * @return The current maximum number
> + */
> +APR_DECLARE(apr_size_t) apr_thread_pool_idle_max_get(apr_thread_pool_t * me);
> +
> +/**
> + * Access function for the maximum number of thread. 
> + * @param me The thread pool
> + * @param cnt The number
> + * @return The original maximum number of threads
> + */
> +APR_DECLARE(apr_size_t) apr_thread_pool_thread_max_set(apr_thread_pool_t * me,
> +                                                       apr_size_t cnt);
> +
> +/**
> + * Access function for the maximum number of threads
> + * @param me The thread pool
> + * @return The current maximum number
> + */
> +APR_DECLARE(apr_size_t) apr_thread_pool_thread_max_get(apr_thread_pool_t *
> +                                                       me);
> +
> +/**
> + * Access function for the threshold of tasks in queue to trigger a new thread. 
> + * @param me The thread pool
> + * @param cnt The new threshold
> + * @return The original threshold
> + */
> +APR_DECLARE(apr_size_t) apr_thread_pool_threshold_set(apr_thread_pool_t * me,
> +                                                      apr_size_t val);
> +
> +/**
> + * Access function for the threshold of tasks in queue to trigger a new thread. 
> + * @param me The thread pool
> + * @return The current threshold
> + */
> +APR_DECLARE(apr_size_t) apr_thread_pool_threshold_get(apr_thread_pool_t * me);
> +
> +#ifdef __cplusplus
> +#if 0
> +{
> +#endif
> +}
> +#endif
> +
> +#endif /* APR_HAS_THREADS */
> +
> +#endif /* APR_THREAD_POOL_H */
> +
> +/* vim: set ts=4 sw=4 et cin tw=80: */
> Index: misc/apr_thread_pool.c
> ===================================================================
> --- misc/apr_thread_pool.c	(revision 0)
> +++ misc/apr_thread_pool.c	(revision 0)
> @@ -0,0 +1,809 @@
> +/* Copyright 2006 Sun Microsystems, Inc.
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License");
> + * you may not use this file except in compliance with the License.
> + * You may obtain a copy of the License at
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +#include <assert.h>
> +#include "apr_thread_pool.h"
> +#include "apr_ring.h"
> +#include "apr_thread_cond.h"
> +#include "apr_portable.h"
> +
> +#if APR_HAS_THREADS
> +
> +#define TASK_PRIORITY_SEGS 4
> +#define TASK_PRIORITY_SEG(x) (((x)->dispatch.priority & 0xFF) / 64)
> +
> +typedef struct apr_thread_pool_task
> +{
> +    APR_RING_ENTRY(apr_thread_pool_task) link;
> +    apr_thread_start_t func;
> +    void *param;
> +    void *owner;
> +    union
> +    {
> +        apr_byte_t priority;
> +        apr_time_t time;
> +    } dispatch;
> +} apr_thread_pool_task_t;
> +
> +APR_RING_HEAD(apr_thread_pool_tasks, apr_thread_pool_task);
> +
> +struct apr_thread_list_elt
> +{
> +    APR_RING_ENTRY(apr_thread_list_elt) link;
> +    apr_thread_t *thd;
> +    volatile void *current_owner;
> +    volatile int stop;
> +};
> +
> +APR_RING_HEAD(apr_thread_list, apr_thread_list_elt);
> +
> +struct apr_thread_pool
> +{
> +    apr_pool_t *pool;
> +    volatile apr_size_t thd_max;
> +    volatile apr_size_t idle_max;
> +    volatile apr_size_t thd_cnt;
> +    volatile apr_size_t idle_cnt;
> +    volatile apr_size_t task_cnt;
> +    volatile apr_size_t scheduled_task_cnt;
> +    volatile apr_size_t threshold;
> +    struct apr_thread_pool_tasks *tasks;
> +    struct apr_thread_pool_tasks *scheduled_tasks;
> +    struct apr_thread_list *busy_thds;
> +    struct apr_thread_list *idle_thds;
> +    apr_thread_mutex_t *lock;
> +    apr_thread_mutex_t *cond_lock;
> +    apr_thread_cond_t *cond;
> +    volatile int terminated;
> +    struct apr_thread_pool_tasks *recycled_tasks;
> +    apr_thread_pool_task_t *task_idx[TASK_PRIORITY_SEGS];
> +};
> +
> +static apr_status_t thread_pool_construct(apr_thread_pool_t * me,
> +                                          apr_size_t init_threads,
> +                                          apr_size_t max_threads)
> +{
> +    apr_status_t rv;
> +    int i;
> +
> +    me->thd_max = max_threads;
> +    me->idle_max = init_threads;
> +    me->threshold = init_threads / 2;
> +    rv = apr_thread_mutex_create(&me->lock, APR_THREAD_MUTEX_NESTED,
> +                                 me->pool);
> +    if (APR_SUCCESS != rv) {
> +        return rv;
> +    }
> +    rv = apr_thread_mutex_create(&me->cond_lock, APR_THREAD_MUTEX_UNNESTED,
> +                                 me->pool);
> +    if (APR_SUCCESS != rv) {
> +        apr_thread_mutex_destroy(me->lock);
> +        return rv;
> +    }
> +    rv = apr_thread_cond_create(&me->cond, me->pool);
> +    if (APR_SUCCESS != rv) {
> +        apr_thread_mutex_destroy(me->lock);
> +        apr_thread_mutex_destroy(me->cond_lock);
> +        return rv;
> +    }
> +    me->tasks = apr_palloc(me->pool, sizeof(*me->tasks));
> +    if (!me->tasks) {
> +        goto CATCH_ENOMEM;
> +    }
> +    APR_RING_INIT(me->tasks, apr_thread_pool_task, link);
> +    me->scheduled_tasks = apr_palloc(me->pool, sizeof(*me->scheduled_tasks));
> +    if (!me->scheduled_tasks) {
> +        goto CATCH_ENOMEM;
> +    }
> +    APR_RING_INIT(me->scheduled_tasks, apr_thread_pool_task, link);
> +    me->recycled_tasks = apr_palloc(me->pool, sizeof(*me->recycled_tasks));
> +    if (!me->recycled_tasks) {
> +        goto CATCH_ENOMEM;
> +    }
> +    APR_RING_INIT(me->recycled_tasks, apr_thread_pool_task, link);
> +    me->busy_thds = apr_palloc(me->pool, sizeof(*me->busy_thds));
> +    if (!me->busy_thds) {
> +        goto CATCH_ENOMEM;
> +    }
> +    APR_RING_INIT(me->busy_thds, apr_thread_list_elt, link);
> +    me->idle_thds = apr_palloc(me->pool, sizeof(*me->idle_thds));
> +    if (!me->idle_thds) {
> +        goto CATCH_ENOMEM;
> +    }
> +    APR_RING_INIT(me->idle_thds, apr_thread_list_elt, link);
> +    me->thd_cnt = me->idle_cnt = me->task_cnt = me->scheduled_task_cnt =
0;
> +    me->terminated = 0;
> +    for (i = 0; i < TASK_PRIORITY_SEGS; i++) {
> +        me->task_idx[i] = NULL;
> +    }
> +    goto FINAL_EXIT;
> +  CATCH_ENOMEM:
> +    rv = APR_ENOMEM;
> +    apr_thread_mutex_destroy(me->lock);
> +    apr_thread_mutex_destroy(me->cond_lock);
> +    apr_thread_cond_destroy(me->cond);
> +  FINAL_EXIT:
> +    return rv;
> +}
> +
> +/*
> + * NOTE: This function is not thread safe by itself. Caller should hold the lock
> + */
> +static apr_thread_pool_task_t *pop_task(apr_thread_pool_t * me)
> +{
> +    apr_thread_pool_task_t *task = NULL;
> +    int seg;
> +
> +    /* check for scheduled tasks */
> +    if (me->scheduled_task_cnt > 0) {
> +        task = APR_RING_FIRST(me->scheduled_tasks);
> +        assert(task != NULL);
> +        assert(task !=
> +               APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
> +                                 link));
> +        /* if it's time */
> +        if (task->dispatch.time <= apr_time_now()) {
> +            --me->scheduled_task_cnt;
> +            APR_RING_REMOVE(task, link);
> +            return task;
> +        }
> +    }
> +    /* check for normal tasks if we're not returning a scheduled task */
> +    if (me->task_cnt == 0) {
> +        return NULL;
> +    }
> +
> +    task = APR_RING_FIRST(me->tasks);
> +    assert(task != NULL);
> +    assert(task != APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link));
> +    --me->task_cnt;
> +    seg = TASK_PRIORITY_SEG(task);
> +    if (task == me->task_idx[seg]) {
> +        me->task_idx[seg] = APR_RING_NEXT(task, link);
> +        if (me->task_idx[seg] == APR_RING_SENTINEL(me->tasks,
> +                                                   apr_thread_pool_task, link)
> +            || TASK_PRIORITY_SEG(me->task_idx[seg]) != seg) {
> +            me->task_idx[seg] = NULL;
> +        }
> +    }
> +    APR_RING_REMOVE(task, link);
> +    return task;
> +}
> +
> +static apr_interval_time_t waiting_time(apr_thread_pool_t * me)
> +{
> +    apr_thread_pool_task_t *task = NULL;
> +
> +    task = APR_RING_FIRST(me->scheduled_tasks);
> +    assert(task != NULL);
> +    assert(task !=
> +           APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
> +                             link));
> +    return task->dispatch.time - apr_time_now();
> +}
> +
> +/*
> + * The worker thread function. Take a task from the queue and perform it if
> + * there is any. Otherwise, put itself into the idle thread list and waiting
> + * for signal to wake up.
> + * The thread terminate directly by detach and exit when it is asked to stop
> + * after finishing a task. Otherwise, the thread should be in idle thread list
> + * and should be joined.
> + */
> +static void *APR_THREAD_FUNC thread_pool_func(apr_thread_t * t, void *param)
> +{
> +    apr_status_t rv = APR_SUCCESS;
> +    apr_thread_pool_t *me = param;
> +    apr_thread_pool_task_t *task = NULL;
> +    apr_interval_time_t wait;
> +    struct apr_thread_list_elt *elt;
> +
> +    elt = apr_pcalloc(me->pool, sizeof(*elt));
> +    if (!elt) {
> +        apr_thread_exit(t, APR_ENOMEM);
> +    }
> +    APR_RING_ELEM_INIT(elt, link);
> +    elt->thd = t;
> +    elt->stop = 0;
> +
> +    apr_thread_mutex_lock(me->lock);
> +    while (!me->terminated && !elt->stop) {
> +        /* if not new element, it is awakened from idle */
> +        if (APR_RING_NEXT(elt, link) != elt) {
> +            --me->idle_cnt;
> +            APR_RING_REMOVE(elt, link);
> +        }
> +        APR_RING_INSERT_TAIL(me->busy_thds, elt, apr_thread_list_elt, link);
> +        task = pop_task(me);
> +        while (NULL != task && !me->terminated) {
> +            elt->current_owner = task->owner;
> +            apr_thread_mutex_unlock(me->lock);
> +            task->func(t, task->param);
> +            apr_thread_mutex_lock(me->lock);
> +            APR_RING_INSERT_TAIL(me->recycled_tasks, task,
> +                                 apr_thread_pool_task, link);
> +            elt->current_owner = NULL;
> +            if (elt->stop) {
> +                break;
> +            }
> +            task = pop_task(me);
> +        }
> +        assert(NULL == elt->current_owner);
> +        APR_RING_REMOVE(elt, link);
> +
> +        /* busy thread been asked to stop, not joinable */
> +        if ((me->idle_cnt >= me->idle_max
> +             && !(me->scheduled_task_cnt && 0 >= me->idle_max))
> +            || me->terminated || elt->stop) {
> +            --me->thd_cnt;
> +            apr_thread_mutex_unlock(me->lock);
> +            apr_thread_detach(t);
> +            apr_thread_exit(t, APR_SUCCESS);
> +            return NULL;        /* should not be here, safe net */
> +        }
> +
> +        ++me->idle_cnt;
> +        APR_RING_INSERT_TAIL(me->idle_thds, elt, apr_thread_list_elt, link);
> +        wait = (me->scheduled_task_cnt) ? waiting_time(me) : -1;
> +        apr_thread_mutex_unlock(me->lock);
> +        apr_thread_mutex_lock(me->cond_lock);
> +        if (wait >= 0) {
> +            rv = apr_thread_cond_timedwait(me->cond, me->cond_lock, wait);
> +        }
> +        else {
> +            rv = apr_thread_cond_wait(me->cond, me->cond_lock);
> +        }
> +        apr_thread_mutex_unlock(me->cond_lock);
> +        apr_thread_mutex_lock(me->lock);
> +    }
> +
> +    /* idle thread been asked to stop, will be joined */
> +    --me->thd_cnt;
> +    apr_thread_mutex_unlock(me->lock);
> +    apr_thread_exit(t, APR_SUCCESS);
> +    return NULL;                /* should not be here, safe net */
> +}
> +
> +static apr_status_t thread_pool_cleanup(void *me)
> +{
> +    apr_thread_pool_t *_self = me;
> +
> +    _self->terminated = 1;
> +    apr_thread_pool_idle_max_set(_self, 0);
> +    while (_self->thd_cnt) {
> +        apr_sleep(20 * 1000);   /* spin lock with 20 ms */
> +    }
> +    apr_thread_mutex_destroy(_self->lock);
> +    apr_thread_mutex_destroy(_self->cond_lock);
> +    apr_thread_cond_destroy(_self->cond);
> +    return APR_SUCCESS;
> +}
> +
> +APR_DECLARE(apr_status_t) apr_thread_pool_create(apr_thread_pool_t ** me,
> +                                                 apr_size_t init_threads,
> +                                                 apr_size_t max_threads,
> +                                                 apr_pool_t * pool)
> +{
> +    apr_thread_t *t;
> +    apr_status_t rv = APR_SUCCESS;
> +
> +    *me = apr_pcalloc(pool, sizeof(**me));
> +    if (!*me) {
> +        return APR_ENOMEM;
> +    }
> +
> +    (*me)->pool = pool;
> +
> +    rv = thread_pool_construct(*me, init_threads, max_threads);
> +    if (APR_SUCCESS != rv) {
> +        *me = NULL;
> +        return rv;
> +    }
> +    apr_pool_cleanup_register(pool, *me, thread_pool_cleanup,
> +                              apr_pool_cleanup_null);
> +
> +    while (init_threads) {
> +        rv = apr_thread_create(&t, NULL, thread_pool_func, *me, (*me)->pool);
> +        if (APR_SUCCESS != rv) {
> +            break;
> +        }
> +        ++(*me)->thd_cnt;
> +        --init_threads;
> +    }
> +
> +    return rv;
> +}
> +
> +APR_DECLARE(apr_status_t) apr_thread_pool_destroy(apr_thread_pool_t * me)
> +{
> +    return apr_pool_cleanup_run(me->pool, me, thread_pool_cleanup);
> +}
> +
> +/*
> + * NOTE: This function is not thread safe by itself. Caller should hold the lock
> + */
> +static apr_thread_pool_task_t *task_new(apr_thread_pool_t * me,
> +                                        apr_thread_start_t func,
> +                                        void *param, apr_byte_t priority,
> +                                        void *owner, apr_time_t time)
> +{
> +    apr_thread_pool_task_t *t;
> +
> +    if (APR_RING_EMPTY(me->recycled_tasks, apr_thread_pool_task, link)) {
> +        t = apr_pcalloc(me->pool, sizeof(*t));
> +        if (NULL == t) {
> +            return NULL;
> +        }
> +    }
> +    else {
> +        t = APR_RING_FIRST(me->recycled_tasks);
> +        APR_RING_REMOVE(t, link);
> +    }
> +
> +    APR_RING_ELEM_INIT(t, link);
> +    t->func = func;
> +    t->param = param;
> +    t->owner = owner;
> +    if (time > 0) {
> +        t->dispatch.time = apr_time_now() + time;
> +    }
> +    else {
> +        t->dispatch.priority = priority;
> +    }
> +    return t;
> +}
> +
> +/*
> + * Test it the task is the only one within the priority segment. 
> + * If it is not, return the first element with same or lower priority. 
> + * Otherwise, add the task into the queue and return NULL.
> + *
> + * NOTE: This function is not thread safe by itself. Caller should hold the lock
> + */
> +static apr_thread_pool_task_t *add_if_empty(apr_thread_pool_t * me,
> +                                            apr_thread_pool_task_t * const t)
> +{
> +    int seg;
> +    int next;
> +    apr_thread_pool_task_t *t_next;
> +
> +    seg = TASK_PRIORITY_SEG(t);
> +    if (me->task_idx[seg]) {
> +        assert(APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) !=
> +               me->task_idx[seg]);
> +        t_next = me->task_idx[seg];
> +        while (t_next->dispatch.priority > t->dispatch.priority) {
> +            t_next = APR_RING_NEXT(t_next, link);
> +            if (APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) ==
> +                t_next) {
> +                return t_next;
> +            }
> +        }
> +        return t_next;
> +    }
> +
> +    for (next = seg - 1; next >= 0; next--) {
> +        if (me->task_idx[next]) {
> +            APR_RING_INSERT_BEFORE(me->task_idx[next], t, link);
> +            break;
> +        }
> +    }
> +    if (0 > next) {
> +        APR_RING_INSERT_TAIL(me->tasks, t, apr_thread_pool_task, link);
> +    }
> +    me->task_idx[seg] = t;
> +    return NULL;
> +}
> +
> +/*
> +*   schedule a task to run in "time" milliseconds. Find the spot in the ring where
> +*   the time fits. Adjust the short_time so the thread wakes up when the time is reached.
> +*/
> +static apr_status_t schedule_task(apr_thread_pool_t * me,
> +                                  apr_thread_start_t func, void *param,
> +                                  void *owner, apr_interval_time_t time)
> +{
> +    apr_thread_pool_task_t *t;
> +    apr_thread_pool_task_t *t_loc;
> +    apr_thread_t *thd;
> +    apr_status_t rv = APR_SUCCESS;
> +    apr_thread_mutex_lock(me->lock);
> +
> +    t = task_new(me, func, param, 0, owner, time);
> +    if (NULL == t) {
> +        apr_thread_mutex_unlock(me->lock);
> +        return APR_ENOMEM;
> +    }
> +    t_loc = APR_RING_FIRST(me->scheduled_tasks);
> +    while (NULL != t_loc) {
> +        /* if the time is less than the entry insert ahead of it */
> +        if (t->dispatch.time < t_loc->dispatch.time) {
> +            ++me->scheduled_task_cnt;
> +            APR_RING_INSERT_BEFORE(t_loc, t, link);
> +            break;
> +        }
> +        else {
> +            t_loc = APR_RING_NEXT(t_loc, link);
> +            if (t_loc ==
> +                APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
> +                                  link)) {
> +                ++me->scheduled_task_cnt;
> +                APR_RING_INSERT_TAIL(me->scheduled_tasks, t,
> +                                     apr_thread_pool_task, link);
> +                break;
> +            }
> +        }
> +    }
> +    /* there should be at least one thread for scheduled tasks */
> +    if (0 == me->thd_cnt) {
> +        rv = apr_thread_create(&thd, NULL, thread_pool_func, me, me->pool);
> +        if (APR_SUCCESS == rv) {
> +            ++me->thd_cnt;
> +        }
> +    }
> +    apr_thread_mutex_unlock(me->lock);
> +    apr_thread_mutex_lock(me->cond_lock);
> +    apr_thread_cond_signal(me->cond);
> +    apr_thread_mutex_unlock(me->cond_lock);
> +    return rv;
> +}
> +
> +static apr_status_t add_task(apr_thread_pool_t * me, apr_thread_start_t func,
> +                             void *param, apr_byte_t priority, int push,
> +                             void *owner)
> +{
> +    apr_thread_pool_task_t *t;
> +    apr_thread_pool_task_t *t_loc;
> +    apr_thread_t *thd;
> +    apr_status_t rv = APR_SUCCESS;
> +
> +    apr_thread_mutex_lock(me->lock);
> +
> +    t = task_new(me, func, param, priority, owner, 0);
> +    if (NULL == t) {
> +        apr_thread_mutex_unlock(me->lock);
> +        return APR_ENOMEM;
> +    }
> +
> +    t_loc = add_if_empty(me, t);
> +    if (NULL == t_loc) {
> +        goto FINAL_EXIT;
> +    }
> +
> +    if (push) {
> +        while (APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) !=
> +               t_loc && t_loc->dispatch.priority >= t->dispatch.priority)
{
> +            t_loc = APR_RING_NEXT(t_loc, link);
> +        }
> +    }
> +    APR_RING_INSERT_BEFORE(t_loc, t, link);
> +    if (!push) {
> +        if (t_loc == me->task_idx[TASK_PRIORITY_SEG(t)]) {
> +            me->task_idx[TASK_PRIORITY_SEG(t)] = t;
> +        }
> +    }
> +
> +  FINAL_EXIT:
> +    me->task_cnt++;
> +    if (0 == me->thd_cnt || (0 == me->idle_cnt && me->thd_cnt <
me->thd_max &&
> +                             me->task_cnt > me->threshold)) {
> +        rv = apr_thread_create(&thd, NULL, thread_pool_func, me, me->pool);
> +        if (APR_SUCCESS == rv) {
> +            ++me->thd_cnt;
> +        }
> +    }
> +    apr_thread_mutex_unlock(me->lock);
> +
> +    apr_thread_mutex_lock(me->cond_lock);
> +    apr_thread_cond_signal(me->cond);
> +    apr_thread_mutex_unlock(me->cond_lock);
> +
> +    return rv;
> +}
> +
> +APR_DECLARE(apr_status_t) apr_thread_pool_push(apr_thread_pool_t * me,
> +                                               apr_thread_start_t func,
> +                                               void *param,
> +                                               apr_byte_t priority,
> +                                               void *owner)
> +{
> +    return add_task(me, func, param, priority, 1, owner);
> +}
> +
> +APR_DECLARE(apr_status_t) apr_thread_pool_schedule(apr_thread_pool_t * me,
> +                                                   apr_thread_start_t func,
> +                                                   void *param,
> +                                                   apr_interval_time_t time,
> +                                                   void *owner)
> +{
> +    return schedule_task(me, func, param, owner, time);
> +}
> +
> +APR_DECLARE(apr_status_t) apr_thread_pool_top(apr_thread_pool_t * me,
> +                                              apr_thread_start_t func,
> +                                              void *param,
> +                                              apr_byte_t priority,
> +                                              void *owner)
> +{
> +    return add_task(me, func, param, priority, 0, owner);
> +}
> +
> +static apr_status_t remove_scheduled_tasks(apr_thread_pool_t * me,
> +                                           void *owner)
> +{
> +    apr_thread_pool_task_t *t_loc;
> +    apr_thread_pool_task_t *next;
> +
> +    t_loc = APR_RING_FIRST(me->scheduled_tasks);
> +    while (t_loc !=
> +           APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
> +                             link)) {
> +        next = APR_RING_NEXT(t_loc, link);
> +        /* if this is the owner remove it */
> +        if (t_loc->owner == owner) {
> +            --me->scheduled_task_cnt;
> +            APR_RING_REMOVE(t_loc, link);
> +        }
> +        t_loc = next;
> +    }
> +    return APR_SUCCESS;
> +}
> +
> +static apr_status_t remove_tasks(apr_thread_pool_t * me, void *owner)
> +{
> +    apr_thread_pool_task_t *t_loc;
> +    apr_thread_pool_task_t *next;
> +    int seg;
> +
> +    t_loc = APR_RING_FIRST(me->tasks);
> +    while (t_loc != APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link)) {
> +        next = APR_RING_NEXT(t_loc, link);
> +        if (t_loc->owner == owner) {
> +            --me->task_cnt;
> +            seg = TASK_PRIORITY_SEG(t_loc);
> +            if (t_loc == me->task_idx[seg]) {
> +                me->task_idx[seg] = APR_RING_NEXT(t_loc, link);
> +                if (me->task_idx[seg] == APR_RING_SENTINEL(me->tasks,
> +                                                           apr_thread_pool_task,
> +                                                           link)
> +                    || TASK_PRIORITY_SEG(me->task_idx[seg]) != seg) {
> +                    me->task_idx[seg] = NULL;
> +                }
> +            }
> +            APR_RING_REMOVE(t_loc, link);
> +        }
> +        t_loc = next;
> +    }
> +    return APR_SUCCESS;
> +}
> +
> +static void wait_on_busy_threads(apr_thread_pool_t * me, void *owner)
> +{
> +    struct apr_thread_list_elt *elt;
> +    apr_thread_mutex_lock(me->lock);
> +    elt = APR_RING_FIRST(me->busy_thds);
> +    while (elt != APR_RING_SENTINEL(me->busy_thds, apr_thread_list_elt, link)) {
> +#ifndef NDEBUG
> +        /* make sure the thread is not the one calling tasks_cancel */
> +        apr_os_thread_t *os_thread;
> +        apr_os_thread_get(&os_thread, elt->thd);
> +        assert(!apr_os_thread_equal(apr_os_thread_current(), *os_thread));
> +#endif
> +        if (elt->current_owner != owner) {
> +            elt = APR_RING_NEXT(elt, link);
> +            continue;
> +        }
> +        while (elt->current_owner == owner) {
> +            apr_thread_mutex_unlock(me->lock);
> +            apr_sleep(200 * 1000);
> +            apr_thread_mutex_lock(me->lock);
> +        }
> +        elt = APR_RING_FIRST(me->busy_thds);
> +    }
> +    apr_thread_mutex_unlock(me->lock);
> +    return;
> +}
> +
> +APR_DECLARE(apr_status_t) apr_thread_pool_tasks_cancel(apr_thread_pool_t * me,
> +                                                       void *owner)
> +{
> +    apr_status_t rv = APR_SUCCESS;
> +
> +    apr_thread_mutex_lock(me->lock);
> +    if (me->task_cnt > 0) {
> +        rv = remove_tasks(me, owner);
> +    }
> +    if (me->scheduled_task_cnt > 0) {
> +        rv = remove_scheduled_tasks(me, owner);
> +    }
> +    apr_thread_mutex_unlock(me->lock);
> +    wait_on_busy_threads(me, owner);
> +
> +    return rv;
> +}
> +
> +APR_DECLARE(apr_size_t) apr_thread_pool_tasks_count(apr_thread_pool_t * me)
> +{
> +    return me->task_cnt;
> +}
> +
> +APR_DECLARE(apr_size_t)
> +    apr_thread_pool_scheduled_tasks_count(apr_thread_pool_t * me)
> +{
> +    return me->scheduled_task_cnt;
> +}
> +
> +APR_DECLARE(apr_size_t) apr_thread_pool_threads_count(apr_thread_pool_t * me)
> +{
> +    return me->thd_cnt;
> +}
> +
> +APR_DECLARE(apr_size_t) apr_thread_pool_busy_count(apr_thread_pool_t * me)
> +{
> +    return me->thd_cnt - me->idle_cnt;
> +}
> +
> +APR_DECLARE(apr_size_t) apr_thread_pool_idle_count(apr_thread_pool_t * me)
> +{
> +    return me->idle_cnt;
> +}
> +
> +APR_DECLARE(apr_size_t) apr_thread_pool_idle_max_get(apr_thread_pool_t * me)
> +{
> +    return me->idle_max;
> +}
> +
> +/*
> + * This function stop extra idle threads to the cnt.
> + * @return the number of threads stopped
> + * NOTE: There could be busy threads become idle during this function
> + */
> +static struct apr_thread_list_elt *trim_threads(apr_thread_pool_t * me,
> +                                                apr_size_t * cnt, int idle)
> +{
> +    struct apr_thread_list *thds;
> +    apr_size_t n, n_dbg, i;
> +    struct apr_thread_list_elt *head, *tail, *elt;
> +
> +    apr_thread_mutex_lock(me->lock);
> +    if (idle) {
> +        thds = me->idle_thds;
> +        n = me->idle_cnt;
> +    }
> +    else {
> +        thds = me->busy_thds;
> +        n = me->thd_cnt - me->idle_cnt;
> +    }
> +    if (n <= *cnt) {
> +        apr_thread_mutex_unlock(me->lock);
> +        *cnt = 0;
> +        return NULL;
> +    }
> +    n -= *cnt;
> +
> +    head = APR_RING_FIRST(thds);
> +    for (i = 0; i < *cnt; i++) {
> +        head = APR_RING_NEXT(head, link);
> +    }
> +    tail = APR_RING_LAST(thds);
> +    APR_RING_UNSPLICE(head, tail, link);
> +    if (idle) {
> +        me->idle_cnt = *cnt;
> +    }
> +    apr_thread_mutex_unlock(me->lock);
> +
> +    n_dbg = 0;
> +    for (elt = head; elt != tail; elt = APR_RING_NEXT(elt, link)) {
> +        elt->stop = 1;
> +        n_dbg++;
> +    }
> +    elt->stop = 1;
> +    n_dbg++;
> +    assert(n == n_dbg);
> +    *cnt = n;
> +
> +    APR_RING_PREV(head, link) = NULL;
> +    APR_RING_NEXT(tail, link) = NULL;
> +    return head;
> +}
> +
> +static apr_size_t trim_idle_threads(apr_thread_pool_t * me, apr_size_t cnt)
> +{
> +    apr_size_t n_dbg;
> +    struct apr_thread_list_elt *elt;
> +    apr_status_t rv;
> +
> +    elt = trim_threads(me, &cnt, 1);
> +
> +    apr_thread_mutex_lock(me->cond_lock);
> +    apr_thread_cond_broadcast(me->cond);
> +    apr_thread_mutex_unlock(me->cond_lock);
> +
> +    n_dbg = 0;
> +    while (elt) {
> +        apr_thread_join(&rv, elt->thd);
> +        elt = APR_RING_NEXT(elt, link);
> +        ++n_dbg;
> +    }
> +    assert(cnt == n_dbg);
> +
> +    return cnt;
> +}
> +
> +/* don't join on busy threads for performance reasons, who knows how long will
> + * the task takes to perform
> + */
> +static apr_size_t trim_busy_threads(apr_thread_pool_t * me, apr_size_t cnt)
> +{
> +    trim_threads(me, &cnt, 0);
> +    return cnt;
> +}
> +
> +APR_DECLARE(apr_size_t) apr_thread_pool_idle_max_set(apr_thread_pool_t * me,
> +                                                     apr_size_t cnt)
> +{
> +    me->idle_max = cnt;
> +    cnt = trim_idle_threads(me, cnt);
> +    return cnt;
> +}
> +
> +APR_DECLARE(apr_size_t) apr_thread_pool_thread_max_get(apr_thread_pool_t * me)
> +{
> +    return me->thd_max;
> +}
> +
> +/*
> + * This function stop extra working threads to the new limit.
> + * NOTE: There could be busy threads become idle during this function
> + */
> +APR_DECLARE(apr_size_t) apr_thread_pool_thread_max_set(apr_thread_pool_t * me,
> +                                                       apr_size_t cnt)
> +{
> +    unsigned int n;
> +
> +    me->thd_max = cnt;
> +    if (0 == cnt || me->thd_cnt <= cnt) {
> +        return 0;
> +    }
> +
> +    n = me->thd_cnt - cnt;
> +    if (n >= me->idle_cnt) {
> +        trim_busy_threads(me, n - me->idle_cnt);
> +        trim_idle_threads(me, 0);
> +    }
> +    else {
> +        trim_idle_threads(me, me->idle_cnt - n);
> +    }
> +    return n;
> +}
> +
> +APR_DECLARE(apr_size_t) apr_thread_pool_threshold_get(apr_thread_pool_t * me)
> +{
> +    return me->threshold;
> +}
> +
> +APR_DECLARE(apr_size_t) apr_thread_pool_threshold_set(apr_thread_pool_t * me,
> +                                                      apr_size_t val)
> +{
> +    apr_size_t ov;
> +
> +    ov = me->threshold;
> +    me->threshold = val;
> +    return ov;
> +}
> +
> +#endif /* APR_HAS_THREADS */
> +
> +/* vim: set ts=4 sw=4 et cin tw=80: */


Mime
View raw message