httpd-cvs mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bri...@apache.org
Subject cvs commit: httpd-2.0/server/mpm/experimental/leader leader.c
Date Fri, 19 Apr 2002 06:33:08 GMT
brianp      02/04/18 23:33:08

  Modified:    server/mpm/experimental/leader leader.c
  Log:
  Replaced the mutex around the idle worker stack with
  atomic compare-and-swap loops
  
  Revision  Changes    Path
  1.10      +138 -97   httpd-2.0/server/mpm/experimental/leader/leader.c
  
  Index: leader.c
  ===================================================================
  RCS file: /home/cvs/httpd-2.0/server/mpm/experimental/leader/leader.c,v
  retrieving revision 1.9
  retrieving revision 1.10
  diff -u -r1.9 -r1.10
  --- leader.c	16 Apr 2002 02:04:14 -0000	1.9
  +++ leader.c	19 Apr 2002 06:33:08 -0000	1.10
  @@ -72,6 +72,7 @@
   #include "apr_thread_cond.h"
   #include "apr_thread_mutex.h"
   #include "apr_proc_mutex.h"
  +#include "apr_atomic.h"
   #define APR_WANT_STRFUNC
   #include "apr_want.h"
   
  @@ -172,6 +173,8 @@
   static int num_listensocks = 0;
   static int resource_shortage = 0;
   
  +typedef struct worker_wakeup_info worker_wakeup_info;
  +
   /* The structure used to pass unique initialization info to each thread */
   typedef struct {
       int pid;
  @@ -243,102 +246,147 @@
   
   /* Structure used to wake up an idle worker thread
    */
  -typedef struct {
  +struct worker_wakeup_info {
       apr_thread_cond_t *cond;
       apr_thread_mutex_t *mutex;
  -} worker_wakeup_info;
  +    apr_uint32_t next; /* index into worker_wakeups array,
  +                        * used to build a linked list
  +                        */
  +};
  +
  +static worker_wakeup_info *worker_wakeup_create(apr_pool_t *pool)
  +{
  +    apr_status_t rv;
  +    worker_wakeup_info *wakeup;
  +
  +    wakeup = (worker_wakeup_info *)apr_palloc(pool, sizeof(*wakeup));
  +    if ((rv = apr_thread_cond_create(&wakeup->cond, pool)) != APR_SUCCESS) {
  +        return NULL;
  +    }
  +    if ((rv = apr_thread_mutex_create(&wakeup->mutex, APR_THREAD_MUTEX_DEFAULT,
  +                                      pool)) != APR_SUCCESS) {
  +        return NULL;
  +    }
  +    /* The wakeup's mutex will be unlocked automatically when
  +     * the worker blocks on the condition variable
  +     */
  +    apr_thread_mutex_lock(wakeup->mutex);
  +    return wakeup;
  +}
  +
   
   /* Structure used to hold a stack of idle worker threads 
    */
   typedef struct {
  -    apr_thread_mutex_t *mutex;
  -    int no_listener;
  -    worker_wakeup_info **stack;
  -    apr_size_t nelts;
  -    apr_size_t nalloc;
  +    /* 'state' consists of several fields concatenated into a
  +     * single 32-bit int for use with the apr_atomic_cas() API:
  +     *   state & STACK_FIRST  is the thread ID of the first thread
  +     *                        in a linked list of idle threads
  +     *   state & STACK_TERMINATED  indicates whether the proc is shutting down
  +     *   state & STACK_NO_LISTENER indicates whether the process has
  +     *                             no current listener thread
  +     */
  +    apr_uint32_t state;
   } worker_stack;
   
  +#define STACK_FIRST  0xffff
  +#define STACK_LIST_END  0xffff
  +#define STACK_TERMINATED 0x10000
  +#define STACK_NO_LISTENER 0x20000
  +
  +static worker_wakeup_info **worker_wakeups = NULL;
  +
   static worker_stack* worker_stack_create(apr_pool_t *pool, apr_size_t max)
   {
  -    apr_status_t rv;
       worker_stack *stack = (worker_stack *)apr_palloc(pool, sizeof(*stack));
  -
  -    if ((rv = apr_thread_mutex_create(&stack->mutex, APR_THREAD_MUTEX_DEFAULT,
  -                                      pool)) != APR_SUCCESS) {
  -        return NULL;
  -    }
  -    stack->no_listener = 1;
  -    stack->nelts = 0;
  -    stack->nalloc = max;
  -    stack->stack =
  -        (worker_wakeup_info **)apr_palloc(pool, stack->nalloc *
  -                                          sizeof(worker_wakeup_info *));
  +    stack->state = STACK_NO_LISTENER | STACK_LIST_END;
       return stack;
   }
   
   static apr_status_t worker_stack_wait(worker_stack *stack,
  -                                      worker_wakeup_info *wakeup)
  +                                      apr_uint32_t worker_id)
   {
  -    apr_status_t rv;
  -    if ((rv = apr_thread_mutex_lock(stack->mutex)) != APR_SUCCESS) {
  -        return rv;
  -    }
  -    if (stack->no_listener) {
  -        /* this thread should become the new listener immediately */
  -        stack->no_listener = 0;
  -        if ((rv = apr_thread_mutex_unlock(stack->mutex)) != APR_SUCCESS) {
  -            return rv;
  -        }
  -        return APR_SUCCESS;
  -    }
  -    else {
  -        /* push this thread onto the stack of idle workers, and block
  -         * on the condition variable until awoken
  -         */
  -        if (stack->nelts == stack->nalloc) {
  -            return APR_ENOSPC;
  +    worker_wakeup_info *wakeup = worker_wakeups[worker_id];
  +
  +    while (1) {
  +        apr_uint32_t state = stack->state;
  +        if (state & (STACK_TERMINATED | STACK_NO_LISTENER)) {
  +            if (state & STACK_TERMINATED) {
  +                return APR_EINVAL;
  +            }
  +            if (apr_atomic_cas(&(stack->state), STACK_LIST_END, state) !=
  +                state) {
  +                continue;
  +            }
  +            else {
  +                return APR_SUCCESS;
  +            }
           }
  -        stack->stack[stack->nelts++] = wakeup;
  -        if ((rv = apr_thread_mutex_unlock(stack->mutex)) != APR_SUCCESS) {
  -            return rv;
  +        wakeup->next = state;
  +        if (apr_atomic_cas(&(stack->state), worker_id, state) != state) {
  +            continue;
           }
  -        if ((rv = apr_thread_cond_wait(wakeup->cond, wakeup->mutex)) !=
  -            APR_SUCCESS) {
  -            return rv;
  +        else {
  +            return apr_thread_cond_wait(wakeup->cond, wakeup->mutex);
           }
  -        return APR_SUCCESS;
  -    }
  +    }    
   }
   
   static apr_status_t worker_stack_awaken_next(worker_stack *stack)
   {
  -    apr_status_t rv;
  -    if ((rv = apr_thread_mutex_lock(stack->mutex)) != APR_SUCCESS) {
  -        return rv;
  -    }
  -    if (stack->nelts) {
  -        worker_wakeup_info *wakeup = stack->stack[--stack->nelts];
  -        if ((rv = apr_thread_mutex_unlock(stack->mutex)) != APR_SUCCESS) {
  -            return rv;
  -        }
  -        /* Acquire and release the idle worker's mutex to ensure
  -         * that it's actually waiting on its condition variable
  -         */
  -        if ((rv = apr_thread_mutex_lock(wakeup->mutex)) != APR_SUCCESS) {
  -            return rv;
  +
  +    while (1) {
  +        apr_uint32_t state = stack->state;
  +        apr_uint32_t first = state & STACK_FIRST;
  +        if (first == STACK_LIST_END) {
  +            if (apr_atomic_cas(&(stack->state), state | STACK_NO_LISTENER,
  +                               state) != state) {
  +                continue;
  +            }
  +            else {
  +                return APR_SUCCESS;
  +            }
           }
  -        if ((rv = apr_thread_mutex_unlock(wakeup->mutex)) != APR_SUCCESS) {
  -            return rv;
  +        else {
  +            worker_wakeup_info *wakeup = worker_wakeups[first];
  +            apr_uint32_t new_state = state & ~STACK_FIRST;
  +            new_state |= wakeup->next;
  +            if (apr_atomic_cas(&(stack->state), new_state, state) != state) {
  +                continue;
  +            }
  +            else {
  +                /* Acquire and release the idle worker's mutex to ensure
  +                 * that it's actually waiting on its condition variable
  +                 */
  +                apr_status_t rv;
  +                if ((rv = apr_thread_mutex_lock(wakeup->mutex)) !=
  +                    APR_SUCCESS) {
  +                    return rv;
  +                }
  +                if ((rv = apr_thread_mutex_unlock(wakeup->mutex)) !=
  +                    APR_SUCCESS) {
  +                    return rv;
  +                }
  +                return apr_thread_cond_signal(wakeup->cond);
  +            }
           }
  -        apr_thread_mutex_unlock(wakeup->mutex);
  -        if ((rv = apr_thread_cond_signal(wakeup->cond)) != APR_SUCCESS) {
  -            apr_thread_mutex_unlock(stack->mutex);
  -            return rv;
  +    }
  +}
  +
  +static apr_status_t worker_stack_term(worker_stack *stack)
  +{
  +    int i;
  +    apr_status_t rv;
  +
  +    while (1) {
  +        apr_uint32_t state = stack->state;
  +        if (apr_atomic_cas(&(stack->state), state | STACK_TERMINATED,
  +                           state) == state) {
  +            break;
           }
       }
  -    else {
  -        stack->no_listener = 1;
  -        if ((rv = apr_thread_mutex_unlock(stack->mutex)) != APR_SUCCESS) {
  +    for (i = 0; i < ap_threads_per_child; i++) {
  +        if ((rv = worker_stack_awaken_next(stack)) != APR_SUCCESS) {
               return rv;
           }
       }
  @@ -355,16 +403,12 @@
   
   static void signal_threads(int mode)
   {
  -    int i;
       if (terminate_mode == mode) {
           return;
       }
       terminate_mode = mode;
   
  -    workers_may_exit = 1;
  -    for (i = 0; i < ap_threads_per_child; i++) {
  -        (void)worker_stack_awaken_next(idle_worker_stack);
  -    }
  +    worker_stack_term(idle_worker_stack);
   }
   
   AP_DECLARE(apr_status_t) ap_mpm_query(int query_code, int *result)
  @@ -726,6 +770,7 @@
       proc_info * ti = dummy;
       int process_slot = ti->pid;
       int thread_slot = ti->tid;
  +    apr_uint32_t my_worker_num = (apr_uint32_t)(ti->tid);
       apr_pool_t *tpool = apr_thread_pool_get(thd);
       void *csd = NULL;
       apr_allocator_t *allocator;
  @@ -735,7 +780,6 @@
       apr_pollfd_t *pollset;
       apr_status_t rv;
       ap_listen_rec *lr, *last_lr = ap_listeners;
  -    worker_wakeup_info *wakeup;
       int is_listener;
   
       ap_update_child_status_from_indexes(process_slot, thread_slot, SERVER_STARTING, NULL);
  @@ -747,24 +791,6 @@
       apr_allocator_set_owner(allocator, ptrans);
       bucket_alloc = apr_bucket_alloc_create(tpool);
   
  -    wakeup = (worker_wakeup_info *)apr_palloc(tpool, sizeof(*wakeup));
  -    if ((rv = apr_thread_cond_create(&wakeup->cond, tpool)) != APR_SUCCESS) {
  -        ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
  -                     "apr_thread_cond_create failed. Attempting to shutdown "
  -                     "process gracefully.");
  -        signal_threads(ST_GRACEFUL);
  -        goto done;
  -    }
  -    if ((rv = apr_thread_mutex_create(&wakeup->mutex, APR_THREAD_MUTEX_DEFAULT,
  -                                      tpool)) != APR_SUCCESS) {
  -        ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
  -                     "apr_thread_mutex_create failed. Attempting to shutdown "
  -                     "process gracefully.");
  -        signal_threads(ST_GRACEFUL);
  -        goto done;
  -    }
  -    apr_thread_mutex_lock(wakeup->mutex);
  -
       apr_poll_setup(&pollset, num_listensocks, tpool);
       for(lr = ap_listeners ; lr != NULL ; lr = lr->next)
           apr_poll_socket_add(pollset, lr->sd, APR_POLLIN);
  @@ -778,10 +804,12 @@
                                               SERVER_READY, NULL);
           if (!is_listener) {
               /* Wait until it's our turn to become the listener */
  -            if ((rv = worker_stack_wait(idle_worker_stack, wakeup)) !=
  +            if ((rv = worker_stack_wait(idle_worker_stack, my_worker_num)) !=
                   APR_SUCCESS) {
  -                ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
  -                             "worker_stack_wait failed. Shutting down");
  +                if (rv != APR_EINVAL) {
  +                    ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
  +                                 "worker_stack_wait failed. Shutting down");
  +                }
                   break;
               }
               if (workers_may_exit) {
  @@ -902,7 +930,8 @@
           }
       }
   
  - done:
  +    workers_may_exit = 1;
  +    worker_stack_term(idle_worker_stack);
       dying = 1;
       ap_scoreboard_image->parent[process_slot].quiescing = 1;
   
  @@ -951,15 +980,27 @@
           clean_child_exit(APEXIT_CHILDFATAL);
       }
   
  +    worker_wakeups = (worker_wakeup_info **)
  +        apr_palloc(pchild, sizeof(worker_wakeup_info *) *
  +                   ap_threads_per_child);
  +
       loops = prev_threads_created = 0;
       while (1) {
           for (i = 0; i < ap_threads_per_child; i++) {
               int status = ap_scoreboard_image->servers[child_num_arg][i].status;
  +            worker_wakeup_info *wakeup;
   
               if (status != SERVER_GRACEFUL && status != SERVER_DEAD) {
                   continue;
               }
   
  +            wakeup = worker_wakeup_create(pchild);
  +            if (wakeup == NULL) {
  +                ap_log_error(APLOG_MARK, APLOG_ALERT|APLOG_NOERRNO, 0,
  +                             ap_server_conf, "worker_wakeup_create failed");
  +                clean_child_exit(APEXIT_CHILDFATAL);
  +            }
  +            worker_wakeups[threads_created] = wakeup;
               my_info = (proc_info *)malloc(sizeof(proc_info));
               if (my_info == NULL) {
                   ap_log_error(APLOG_MARK, APLOG_ALERT, errno, ap_server_conf,
  
  
  

Mime
View raw message