httpd-cvs mailing list archives

From r..@apache.org
Subject cvs commit: httpd-2.0/server/mpm/worker fdqueue.c fdqueue.h worker.c
Date Fri, 24 Aug 2001 16:49:39 GMT
rbb         01/08/24 09:49:39

  Modified:    .        CHANGES
               server/mpm/worker fdqueue.c fdqueue.h worker.c
  Log:
  Make the worker MPM shut down and restart cleanly.  This also
  cleans up some race conditions and gets the worker using
  pools more cleanly.
  
  Submitted by:	[Aaron Bannert <aaron@clove.org>]
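
  For context on the race conditions mentioned above: the new
  ap_queue_interrupt_all() (below) broadcasts on the condition variable
  while holding the queue mutex.  A minimal sketch of the lost-wakeup
  hazard that ordering avoids (hypothetical names, not httpd code):

      #include <pthread.h>

      static pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
      static pthread_cond_t  c = PTHREAD_COND_INITIALIZER;
      static int stop = 0;

      static void request_stop(void)
      {
          pthread_mutex_lock(&m);      /* without the lock, a waiter could  */
          stop = 1;                    /* test the flag, lose the CPU, miss */
          pthread_cond_broadcast(&c);  /* this broadcast, and block forever */
          pthread_mutex_unlock(&m);
      }

      static void wait_for_stop(void)
      {
          pthread_mutex_lock(&m);
          while (!stop)
              pthread_cond_wait(&c, &m);
          pthread_mutex_unlock(&m);
      }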
  
  Revision  Changes    Path
  1.326     +7 -2      httpd-2.0/CHANGES
  
  Index: CHANGES
  ===================================================================
  RCS file: /home/cvs/httpd-2.0/CHANGES,v
  retrieving revision 1.325
  retrieving revision 1.326
  diff -u -r1.325 -r1.326
  --- CHANGES	2001/08/24 04:08:04	1.325
  +++ CHANGES	2001/08/24 16:49:34	1.326
  @@ -1,6 +1,11 @@
   Changes with Apache 2.0.25-dev
  -  *)  Implement CRYPTO_set_locking_callback() in terms of apr_lock
  -      for mod_ssl
  +
  +  *) Make the worker MPM shut down and restart cleanly.  This also
  +     cleans up some race conditions and gets the worker using
  +     pools more cleanly.  [Aaron Bannert <aaron@clove.org>]
  +
  +  *) Implement CRYPTO_set_locking_callback() in terms of apr_lock
  +     for mod_ssl
        [Madhusudan Mathihalli <madhusudan_mathihalli@hp.com>]
   
     *) Fix for mod_include. Ryan's patch to check error
  
  
  
  1.4       +101 -42   httpd-2.0/server/mpm/worker/fdqueue.c
  
  Index: fdqueue.c
  ===================================================================
  RCS file: /home/cvs/httpd-2.0/server/mpm/worker/fdqueue.c,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- fdqueue.c	2001/08/05 18:41:38	1.3
  +++ fdqueue.c	2001/08/24 16:49:39	1.4
  @@ -60,7 +60,11 @@
   
   /* Assumption: increment and decrement are atomic on int */
   
  -int ap_increase_blanks(FDQueue *queue) 
  +/**
  + * Threadsafe way to increment the number of empty slots ("blanks")
  + * in the resource queue.
  + */
  +int ap_increase_blanks(fd_queue_t *queue) 
   {
       if (pthread_mutex_lock(&queue->one_big_mutex) != 0) {
           return FD_QUEUE_FAILURE;
  @@ -69,61 +73,129 @@
       if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
           return FD_QUEUE_FAILURE;
       }
  +
       return FD_QUEUE_SUCCESS;
   }
   
  +/**
  + * Detects when the fd_queue_t is full. This utility function is expected
  + * to be called from within critical sections, and is not threadsafe.
  + */
  +static int ap_queue_full(fd_queue_t *queue)
  +{
  +    return (queue->blanks <= 0);
  +}
  +
  +/**
  + * Detects when the fd_queue_t is empty. This utility function is expected
  + * to be called from within critical sections, and is not threadsafe.
  + */
  +static int ap_queue_empty(fd_queue_t *queue)
  +{
  +    /*return (queue->head == queue->tail);*/
  +    return (queue->blanks >= queue->bounds - 1);
  +}
  +
  +/**
  + * Callback routine that destroys this
  + * fd_queue_t when its pool is destroyed.
  + */
   static apr_status_t ap_queue_destroy(void *data) 
   {
  -    FDQueue *queue = data;
  -    /* Ignore errors here, we can't do anything about them anyway */
  -    pthread_cond_destroy(&(queue->not_empty));
  -    pthread_cond_destroy(&(queue->not_full));
  +    fd_queue_t *queue = data;
  +
  +    /* Ignore errors here, we can't do anything about them anyway.
  +     * XXX: We should at least try to signal an error here, it is
  +     * indicative of a programmer error. -aaron */
  +    pthread_cond_destroy(&queue->not_empty);
  +    pthread_cond_destroy(&queue->not_full);
       pthread_mutex_destroy(&queue->one_big_mutex);
  +
       return FD_QUEUE_SUCCESS;
   }
   
  -int ap_queue_init(FDQueue *queue, int queue_capacity, apr_pool_t *a) 
  +/**
  + * Initialize the fd_queue_t.
  + */
  +int ap_queue_init(fd_queue_t *queue, int queue_capacity, apr_pool_t *a) 
   {
       int i;
  -    int bounds = queue_capacity + 1;
  -    pthread_cond_t not_empty = PTHREAD_COND_INITIALIZER;
  -    pthread_cond_t not_full = PTHREAD_COND_INITIALIZER;
  -    queue->not_empty = not_empty;
  -    queue->not_full = not_full;
  -    pthread_mutex_init(&queue->one_big_mutex, NULL);
  +    int bounds;
  +
  +    if (pthread_mutex_init(&queue->one_big_mutex, NULL) != 0)
  +        return FD_QUEUE_FAILURE;
  +    if (pthread_cond_init(&queue->not_empty, NULL) != 0)
  +        return FD_QUEUE_FAILURE;
  +    if (pthread_cond_init(&queue->not_full, NULL) != 0)
  +        return FD_QUEUE_FAILURE;
  +
  +    bounds = queue_capacity + 1;
       queue->head = queue->tail = 0;
  -    queue->data = apr_palloc(a, bounds * sizeof(FDQueueElement));
  +    queue->data = apr_palloc(a, bounds * sizeof(fd_queue_elem_t));
       queue->bounds = bounds;
  -    queue->blanks = 0;
  -    apr_pool_cleanup_register(a, queue, ap_queue_destroy, apr_pool_cleanup_null);
  -    for (i=0; i < bounds; ++i)
  +    queue->blanks = queue_capacity;
  +
  +    /* Set all the sockets in the queue to NULL */
  +    for (i = 0; i < bounds; ++i)
           queue->data[i].sd = NULL;
  +
  +    apr_pool_cleanup_register(a, queue, ap_queue_destroy, apr_pool_cleanup_null);
  +
       return FD_QUEUE_SUCCESS;
   }
   
  -int ap_queue_push(FDQueue *queue, apr_socket_t *sd, apr_pool_t *p) 
  +/**
  + * Push a new socket onto the queue. Blocks if the queue is full. Once
  + * the push operation has completed, it signals other threads waiting
  + * in ap_queue_pop() that they may continue consuming sockets.
  + */
  +int ap_queue_push(fd_queue_t *queue, apr_socket_t *sd, apr_pool_t *p) 
   {
       if (pthread_mutex_lock(&queue->one_big_mutex) != 0) {
           return FD_QUEUE_FAILURE;
       }
  +
  +    /* Keep waiting until we wake up and find that the queue is not full. */
  +    while (ap_queue_full(queue)) {
  +        pthread_cond_wait(&queue->not_full, &queue->one_big_mutex);
  +    }
  +
       queue->data[queue->tail].sd = sd;
       queue->data[queue->tail].p = p;
       queue->tail = (queue->tail + 1) % queue->bounds;
       queue->blanks--;
  +
  +    pthread_cond_signal(&queue->not_empty);
  +
       if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
           return FD_QUEUE_FAILURE;
       }
  -    pthread_cond_signal(&(queue->not_empty));
  +
       return FD_QUEUE_SUCCESS;
   }
   
  -apr_status_t ap_queue_pop(FDQueue *queue, apr_socket_t **sd, apr_pool_t **p) 
  +/**
  + * Retrieves the next available socket from the queue. If there are no
  + * sockets available, it will block until one becomes available.
  + * Once retrieved, the socket is placed into the address specified by
  + * 'sd'.
  + */
  +apr_status_t ap_queue_pop(fd_queue_t *queue, apr_socket_t **sd, apr_pool_t **p) 
   {
       if (pthread_mutex_lock(&queue->one_big_mutex) != 0) {
           return FD_QUEUE_FAILURE;
       }
  -    if (queue->head == queue->tail) {
  -        pthread_cond_wait(&(queue->not_empty), &queue->one_big_mutex);
  +
  +    /* Wait (once) until the queue becomes non-empty. */
  +    if (ap_queue_empty(queue)) {
  +        pthread_cond_wait(&queue->not_empty, &queue->one_big_mutex);
  +        /* If we wake up and it's still empty, then we were interrupted */
  +        if (ap_queue_empty(queue)) {
  +            if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
  +                return FD_QUEUE_FAILURE;
  +            }
  +            return FD_QUEUE_EINTR;
  +        }
       } 
       
       *sd = queue->data[queue->head].sd;
  @@ -133,40 +205,27 @@
       if (sd != NULL) {
           queue->head = (queue->head + 1) % queue->bounds;
       }
  +    queue->blanks++;
  +
  +    /* we just consumed an element, so we're no longer full */
  +    pthread_cond_signal(&queue->not_full);
  +
       if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
           return FD_QUEUE_FAILURE;
       }
  -    if (queue->blanks > 0) {
  -        pthread_cond_signal(&(queue->not_full));
  -    }
  -    return APR_SUCCESS;
  -}
  -
  -int ap_queue_size(FDQueue *queue) 
  -{
  -    return ((queue->tail - queue->head + queue->bounds) % queue->bounds);
  -}
   
  -int ap_queue_full(FDQueue *queue) 
  -{
  -    return(queue->blanks <= 0);
  +    return APR_SUCCESS;
   }
   
  -int ap_block_on_queue(FDQueue *queue) 
  +apr_status_t ap_queue_interrupt_all(fd_queue_t *queue)
   {
       if (pthread_mutex_lock(&queue->one_big_mutex) != 0) {
           return FD_QUEUE_FAILURE;
  -    }
  -    if (ap_queue_full(queue)) {
  -        pthread_cond_wait(&(queue->not_full), &queue->one_big_mutex);
       }
  +    pthread_cond_broadcast(&queue->not_empty);
       if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
           return FD_QUEUE_FAILURE;
       }
       return FD_QUEUE_SUCCESS;
   }
   
  -void ap_queue_signal_all_wakeup(FDQueue *queue)
  -{
  -    pthread_cond_broadcast(&(queue->not_empty));
  -}
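
  For reference, the full/empty tests above hinge on a small invariant set
  up in ap_queue_init(): bounds = capacity + 1 (one spare ring slot) and
  blanks starts at capacity, so the queue is empty exactly when
  blanks == bounds - 1 and full exactly when blanks == 0.  A standalone
  sketch of that arithmetic (CAPACITY is a hypothetical stand-in for
  ap_threads_per_child):

      #include <assert.h>

      #define CAPACITY 3

      int main(void)
      {
          int bounds = CAPACITY + 1;        /* as set up in ap_queue_init() */
          int blanks = CAPACITY;            /* all slots free               */

          assert(blanks >= bounds - 1);     /* ap_queue_empty() holds       */

          blanks--;                         /* one ap_queue_push()          */
          assert(!(blanks >= bounds - 1));  /* no longer empty              */
          assert(!(blanks <= 0));           /* not yet full either          */

          blanks -= 2;                      /* two more pushes              */
          assert(blanks <= 0);              /* ap_queue_full() holds        */

          blanks++;                         /* one ap_queue_pop()           */
          assert(!(blanks <= 0));           /* room again                   */
          return 0;
      }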
  
  
  
  1.5       +23 -22    httpd-2.0/server/mpm/worker/fdqueue.h
  
  Index: fdqueue.h
  ===================================================================
  RCS file: /home/cvs/httpd-2.0/server/mpm/worker/fdqueue.h,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- fdqueue.h	2001/08/22 15:40:28	1.4
  +++ fdqueue.h	2001/08/24 16:49:39	1.5
  @@ -66,34 +66,35 @@
   #include <pthread.h>
   #include <sys/types.h>
   #include <sys/socket.h>
  +#include <apr_errno.h>
   
   #define FD_QUEUE_SUCCESS 0
   #define FD_QUEUE_FAILURE -1 /* Needs to be an invalid file descriptor because
                                  of queue_pop semantics */
  +#define FD_QUEUE_EINTR APR_EINTR
   
  -typedef struct fd_queue_elem {
  -    apr_socket_t *sd;
  -    apr_pool_t *p;
  -} FDQueueElement;
  +struct fd_queue_elem_t {
  +    apr_socket_t      *sd;
  +    apr_pool_t        *p;
  +};
  +typedef struct fd_queue_elem_t fd_queue_elem_t;
   
  -typedef struct fd_queue {
  -    int head;
  -    int tail;
  -    FDQueueElement *data;
  -    int bounds;
  -    int blanks;
  -    pthread_mutex_t one_big_mutex;
  -    pthread_cond_t not_empty;
  -    pthread_cond_t not_full;
  -} FDQueue;
  +struct fd_queue_t {
  +    int                head;
  +    int                tail;
  +    fd_queue_elem_t   *data;
  +    int                bounds;
  +    int                blanks;
  +    pthread_mutex_t    one_big_mutex;
  +    pthread_cond_t     not_empty;
  +    pthread_cond_t     not_full;
  +    int                cancel_state;
  +};
  +typedef struct fd_queue_t fd_queue_t;
   
  -int ap_queue_init(FDQueue *queue, int queue_size, apr_pool_t *a);
  -int ap_queue_push(FDQueue *queue, apr_socket_t *sd, apr_pool_t *p);
  -apr_status_t ap_queue_pop(FDQueue *queue, apr_socket_t **sd, apr_pool_t **p);
  -int ap_queue_size(FDQueue *queue);
  -int ap_queue_full(FDQueue *queue);
  -int ap_block_on_queue(FDQueue *queue);
  -void ap_queue_signal_all_wakeup(FDQueue *queue);
  -int ap_increase_blanks(FDQueue *queue);
  +int ap_queue_init(fd_queue_t *queue, int queue_capacity, apr_pool_t *a);
  +int ap_queue_push(fd_queue_t *queue, apr_socket_t *sd, apr_pool_t *p);
  +apr_status_t ap_queue_pop(fd_queue_t *queue, apr_socket_t **sd, apr_pool_t **p);
  +apr_status_t ap_queue_interrupt_all(fd_queue_t *queue);
   
   #endif /* FDQUEUE_H */
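
  The header now exposes just four entry points.  A sketch of how a
  listener and a pool of workers might drive them (only the ap_queue_*
  declarations and FD_QUEUE_EINTR come from this header; the rest is
  hypothetical glue):

      #include <apr_network_io.h>          /* apr_socket_t */
      #include <apr_pools.h>               /* apr_pool_t   */
      #include "fdqueue.h"

      static fd_queue_t *queue;            /* ap_queue_init()'d elsewhere */
      static volatile int shutting_down;

      /* producer side: blocks while the queue is full */
      static void hand_off(apr_socket_t *csd, apr_pool_t *ptrans)
      {
          ap_queue_push(queue, csd, ptrans);
      }

      /* consumer side: blocks while the queue is empty */
      static void *worker_loop(void *unused)
      {
          apr_socket_t *csd;
          apr_pool_t *ptrans;

          while (!shutting_down) {
              apr_status_t rv = ap_queue_pop(queue, &csd, &ptrans);
              if (rv == FD_QUEUE_EINTR || csd == NULL)
                  continue;                /* woken for shutdown; re-check */
              /* ... process csd, then clean up ptrans ... */
          }
          return NULL;
      }

      /* shutdown: raise the flag first, then wake every blocked consumer */
      static void stop_all(void)
      {
          shutting_down = 1;
          ap_queue_interrupt_all(queue);
      }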
  
  
  
  1.15      +34 -22    httpd-2.0/server/mpm/worker/worker.c
  
  Index: worker.c
  ===================================================================
  RCS file: /home/cvs/httpd-2.0/server/mpm/worker/worker.c,v
  retrieving revision 1.14
  retrieving revision 1.15
  diff -u -r1.14 -r1.15
  --- worker.c	2001/08/16 13:59:14	1.14
  +++ worker.c	2001/08/24 16:49:39	1.15
  @@ -124,14 +124,13 @@
   static int requests_this_child;
   static int num_listensocks = 0;
   static apr_socket_t **listensocks;
  -static FDQueue *worker_queue;
  +static fd_queue_t *worker_queue;
   
   /* The structure used to pass unique initialization info to each thread */
   typedef struct {
       int pid;
       int tid;
       int sd;
  -    apr_pool_t *tpool; /* "pthread" would be confusing */
   } proc_info;
   
   /* Structure used to pass information to the thread responsible for 
  @@ -201,7 +200,9 @@
   static void signal_workers(void)
   {
       workers_may_exit = 1;
  -    ap_queue_signal_all_wakeup(worker_queue);
  +    /* XXX: This will happen naturally on a graceful restart, and we don't care otherwise.
  +    ap_queue_signal_all_wakeup(worker_queue); */
  +    ap_queue_interrupt_all(worker_queue);
   }
   
   AP_DECLARE(apr_status_t) ap_mpm_query(int query_code, int *result)
  @@ -553,7 +554,7 @@
       proc_info * ti = dummy;
       int process_slot = ti->pid;
       int thread_slot = ti->tid;
  -    apr_pool_t *tpool = ti->tpool;
  +    apr_pool_t *tpool = apr_thread_pool_get(thd);
       apr_socket_t *csd = NULL;
       apr_pool_t *ptrans;		/* Pool for per-transaction stuff */
       apr_socket_t *sd = NULL;
  @@ -564,8 +565,6 @@
   
       free(ti);
   
  -    apr_pool_create(&ptrans, tpool);
  -
       apr_lock_acquire(worker_thread_count_mutex);
       worker_thread_count++;
       apr_lock_release(worker_thread_count_mutex);
  @@ -574,12 +573,10 @@
       for(n=0 ; n <= num_listensocks ; ++n)
   	apr_poll_socket_add(pollset, listensocks[n], APR_POLLIN);
   
  -    worker_queue = apr_pcalloc(pchild, sizeof(*worker_queue));
  -    ap_queue_init(worker_queue, ap_threads_per_child, pchild);
  -
       /* TODO: Switch to a system where threads reuse the results from earlier
          poll calls - manoj */
       while (1) {
  +        /* TODO: requests_this_child should be synchronized - aaron */
           if (requests_this_child <= 0) {
               check_infinite_requests();
           }
  @@ -644,6 +641,9 @@
           }
       got_fd:
           if (!workers_may_exit) {
  +            /* create a new transaction pool for each accepted socket */
  +            apr_pool_create(&ptrans, tpool);
  +
               if ((rv = apr_accept(&csd, sd, ptrans)) != APR_SUCCESS) {
                   csd = NULL;
                   ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, 
  @@ -658,7 +658,6 @@
               }
               if (csd != NULL) {
                   ap_queue_push(worker_queue, csd, ptrans);
  -                ap_block_on_queue(worker_queue);
               }
           }
           else {
  @@ -673,13 +672,15 @@
           }
       }
   
  -    apr_pool_destroy(tpool);
       ap_update_child_status(process_slot, thread_slot, (dying) ? SERVER_DEAD : SERVER_GRACEFUL,
           (request_rec *) NULL);
       dying = 1;
       ap_scoreboard_image->parent[process_slot].quiescing = 1;
       kill(ap_my_pid, SIGTERM);
   
  +/* this will be uncommented when we make a pool-pool
  +    apr_thread_exit(thd, APR_SUCCESS);
  +*/
       return NULL;
   }
   
  @@ -688,30 +689,35 @@
       proc_info * ti = dummy;
       int process_slot = ti->pid;
       int thread_slot = ti->tid;
  -    apr_pool_t *tpool = ti->tpool;
       apr_socket_t *csd = NULL;
       apr_pool_t *ptrans;		/* Pool for per-transaction stuff */
  +    apr_status_t rv;
   
       free(ti);
   
  +    /* apr_pool_create(&ptrans, tpool); */
  +
       while (!workers_may_exit) {
  -        ap_queue_pop(worker_queue, &csd, &ptrans);
  -        if (!csd) {
  +        rv = ap_queue_pop(worker_queue, &csd, &ptrans);
  +        /* We get FD_QUEUE_EINTR whenever ap_queue_pop() has been interrupted
  +         * by an explicit call to ap_queue_interrupt_all(). This allows
  +         * us to unblock threads stuck in ap_queue_pop() when a shutdown
  +         * is pending. */
  +        if (rv == FD_QUEUE_EINTR || !csd) {
               continue;
           }
  -        ap_increase_blanks(worker_queue);
           process_socket(ptrans, csd, process_slot, thread_slot);
  -        requests_this_child--;
  -        apr_pool_clear(ptrans);
  +        requests_this_child--; /* FIXME: should be synchronized - aaron */
  +        apr_pool_destroy(ptrans);
       }
   
  -    apr_pool_destroy(tpool);
       ap_update_child_status(process_slot, thread_slot, (dying) ? SERVER_DEAD : SERVER_GRACEFUL,
           (request_rec *) NULL);
       apr_lock_acquire(worker_thread_count_mutex);
       worker_thread_count--;
       apr_lock_release(worker_thread_count_mutex);
   
  +    apr_thread_exit(thd, APR_SUCCESS);
       return NULL;
   }
   
  @@ -738,14 +744,20 @@
       int threads_created = 0;
       apr_thread_t *listener;
   
  +    /* We must create the fd queue before we start up the listener
  +     * and worker threads. */
  +    worker_queue = apr_pcalloc(pchild, sizeof(*worker_queue));
  +    ap_queue_init(worker_queue, ap_threads_per_child, pchild);
  +
       my_info = (proc_info *)malloc(sizeof(proc_info));
       my_info->pid = my_child_num;
       my_info->tid = i;
       my_info->sd = 0;
  -    apr_pool_create(&my_info->tpool, pchild);
       apr_thread_create(&listener, thread_attr, listener_thread, my_info, pchild);
       while (1) {
  -        for (i=1; i < ap_threads_per_child; i++) {
  +        /* Does ap_threads_per_child include the listener thread?
  +         * Why does this for loop start at 1? -aaron */
  +        for (i = 1; i < ap_threads_per_child; i++) {
               int status = ap_scoreboard_image->servers[child_num_arg][i].status;
   
               if (status != SERVER_GRACEFUL && status != SERVER_DEAD) {
  @@ -761,7 +773,6 @@
   	    my_info->pid = my_child_num;
               my_info->tid = i;
   	    my_info->sd = 0;
  -	    apr_pool_create(&my_info->tpool, pchild);
   	
     	    /* We are creating threads right now */
   	    (void) ap_update_child_status(my_child_num, i, SERVER_STARTING, 
  @@ -794,6 +805,7 @@
        *  "life_status" is almost right, but it's in the worker's structure, and 
        *  the name could be clearer.   gla
        */
  +    apr_thread_exit(thd, APR_SUCCESS);
       return NULL;
   }
   
  @@ -870,7 +882,7 @@
       apr_lock_create(&pipe_of_death_mutex, APR_MUTEX, APR_INTRAPROCESS, 
                       NULL, pchild);
   
  -    ts = apr_palloc(pchild, sizeof(*ts));
  +    ts = (thread_starter *)apr_palloc(pchild, sizeof(*ts));
   
       apr_threadattr_create(&thread_attr, pchild);
       apr_threadattr_detach_set(thread_attr, 0);    /* 0 means PTHREAD_CREATE_JOINABLE */
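
  The other structural change above is the per-transaction pool handoff:
  the listener now creates ptrans once per accepted connection and the
  worker destroys it after process_socket(), rather than each thread
  clearing one long-lived pool.  In outline (condensed from the listener
  and worker loops above; accept_one()/serve_one() are hypothetical
  wrappers, while worker_queue, process_socket(), and the apr_* calls are
  as in the diff; error handling omitted):

      /* listener side, once per accepted connection */
      static void accept_one(apr_thread_t *thd, apr_socket_t *sd)
      {
          apr_pool_t *tpool = apr_thread_pool_get(thd);
          apr_pool_t *ptrans;
          apr_socket_t *csd;

          apr_pool_create(&ptrans, tpool);     /* one pool per connection */
          if (apr_accept(&csd, sd, ptrans) == APR_SUCCESS && csd != NULL)
              ap_queue_push(worker_queue, csd, ptrans); /* pool rides along */
      }

      /* worker side: pop, serve, then free all per-request memory at once */
      static void serve_one(int process_slot, int thread_slot)
      {
          apr_pool_t *ptrans;
          apr_socket_t *csd;

          if (ap_queue_pop(worker_queue, &csd, &ptrans) != FD_QUEUE_EINTR
              && csd != NULL) {
              process_socket(ptrans, csd, process_slot, thread_slot);
              apr_pool_destroy(ptrans);
          }
      }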
  
  
  
