httpd-dev mailing list archives

From: Aaron Bannert <aa...@clove.org>
Subject: [PATCH] time-space tradeoff (reuse tpool, one CV per worker thread)
Date: Fri, 23 Nov 2001 23:01:38 GMT
On Fri, Nov 23, 2001 at 11:46:25AM -0800, Brian Pane wrote:
> Sounds good.  I think the "apr_pool_create_for_thread()" function that
> I proposed earlier this morning will work well in combination with the
> "time-space-tradeoff" worker design, so I'll continue with the prototyping
> on the former.

Here's an updated version of my worker redesign. The "queue" is really a
stack, but I didn't change the name for the sake of having a readable
patch -- if we end up going with this patch I'll rename everything to
"stack".

Some preliminary results: uniprocessor sol8/intel hitting /index.html.en
(ab -c 10 -n 10000):

new code 633.6 r/s (this patch)
mpstat 5:
CPU minf mjf xcal  intr ithr  csw icsw migr smtx  srw syscl  usr sys  wt idl
  0    6   0    0  4032 3932 6401 2856    0   45    0 21235   31  61   0   8
  0    7   0    0  3878 3778 6296 2763    0   41    0 20404   32  57   0  10

old code 629.6 r/s (cvs HEAD)
mpstat 5:
CPU minf mjf xcal  intr ithr  csw icsw migr smtx  srw syscl  usr sys  wt idl
  0    4   0    0  4073 3973 6302 2526    0   57    0 25240   33  59   0   8
  0    7   0    0  3704 3604 5757 2421    0   43    2 22985   28  55   0  17

I don't expect this small sampling to predict future performance, but at
least it proves that I didn't make any mistakes in bringing this patch
back up to date. It also shows that, at least under these conditions,
this design performs on par with the current worker code (633.6 vs.
629.6 r/s, a difference of well under 1%).

-aaron


Index: server/mpm/worker/worker.c
===================================================================
RCS file: /home/cvs/httpd-2.0/server/mpm/worker/worker.c,v
retrieving revision 1.43
diff -u -r1.43 worker.c
--- server/mpm/worker/worker.c	2001/11/22 05:13:29	1.43
+++ server/mpm/worker/worker.c	2001/11/23 22:32:47
@@ -68,9 +68,10 @@
 #include "apr_strings.h"
 #include "apr_file_io.h"
 #include "apr_thread_proc.h"
-#include "apr_signal.h"
 #include "apr_thread_mutex.h"
+#include "apr_thread_cond.h"
 #include "apr_proc_mutex.h"
+#include "apr_signal.h"
 #define APR_WANT_STRFUNC
 #include "apr_want.h"
 
@@ -141,6 +142,25 @@
     apr_threadattr_t *threadattr;
 } thread_starter;
 
+/* State of a particular worker.
+ */
+typedef enum {
+    WORKER_ELEM_IDLE,    /* 0 - idle (ready for another connection) */
+    WORKER_ELEM_BUSY,    /* 1 - busy (currently processing a connection) */
+    WORKER_ELEM_QUIT     /* 2 - time to quit */
+} worker_elem_state_e;
+
+/* Structure used to keep track of the current state of a particular
+ * worker thread.
+ */
+typedef struct {
+    apr_pool_t          *pool;  /* pool to use when calling accept() */
+    apr_socket_t        *sd;    /* socket returned from accept() */
+    worker_elem_state_e  state;
+    apr_thread_mutex_t  *mutex;
+    apr_thread_cond_t   *cond;
+} worker_elem_t;
+
 /*
  * The max child slot ever assigned, preserved across restarts.  Necessary
  * to deal with MaxClients changes across AP_SIG_GRACEFUL restarts.  We 
@@ -202,8 +222,6 @@
 static void signal_workers(void)
 {
     workers_may_exit = 1;
-    /* XXX: This will happen naturally on a graceful, and we don't care otherwise.
-    ap_queue_signal_all_wakeup(worker_queue); */
     ap_queue_interrupt_all(worker_queue);
 }
 
@@ -562,9 +580,8 @@
     int process_slot = ti->pid;
     int thread_slot = ti->tid;
     apr_pool_t *tpool = apr_thread_pool_get(thd);
-    void *csd = NULL;
-    apr_pool_t *ptrans;		/* Pool for per-transaction stuff */
-    int n;
+    int n, numalive;
+    worker_elem_t *a_worker;
     apr_pollfd_t *pollset;
     apr_status_t rv;
     ap_listen_rec *lr, *last_lr = ap_listeners;
@@ -639,10 +656,23 @@
         }
     got_fd:
         if (!workers_may_exit) {
-            /* create a new transaction pool for each accepted socket */
-            apr_pool_create(&ptrans, tpool);
-
-            rv = lr->accept_func(&csd, lr, ptrans);
+            if ((rv = ap_queue_pop(worker_queue, (void **)&a_worker))
+                != APR_SUCCESS) {
+                signal_workers();
+            }
+            if ((rv = apr_thread_mutex_lock(a_worker->mutex))
+                != APR_SUCCESS) {
+                ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
+                             "apr_thread_mutex_lock failed. Attempting "
+                             "to shutdown process gracefully.");
+                signal_workers();
+            }
+            if ((rv = lr->accept_func(&a_worker->sd, lr, a_worker->pool))
+                != APR_SUCCESS) {
+                a_worker->sd = NULL;
+                ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf,
+                             "apr_accept");
+            }
 
             if (rv == APR_EGENERAL) {
                 signal_workers();
@@ -654,17 +684,17 @@
                              "shutdown process gracefully.");
                 signal_workers();
             }
-            if (csd != NULL) {
-                rv = ap_queue_push(worker_queue, csd, ptrans);
-                if (rv) {
-                    /* trash the connection; we couldn't queue the connected
-                     * socket to a worker 
-                     */
-                    apr_socket_close(csd);
-                    ap_log_error(APLOG_MARK, APLOG_CRIT, 0, ap_server_conf,
-                                 "ap_queue_push failed with error code %d",
-                                 rv);
-                }
+
+            /* Wake the worker and hand off the connection. */
+            a_worker->state = WORKER_ELEM_BUSY;
+            apr_thread_cond_signal(a_worker->cond);
+
+            if ((rv = apr_thread_mutex_unlock(a_worker->mutex))
+                != APR_SUCCESS) {
+                ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
+                             "apr_proc_mutex_unlock failed. Attempting to "
+                             "shutdown process gracefully.");
+                signal_workers();
             }
         }
         else {
@@ -679,6 +709,24 @@
         }
     }
 
+    /* Kill off the workers in a nice way. */
+    numalive = ap_threads_per_child;
+    while (numalive > 0) {
+        if ((rv = ap_queue_pop(worker_queue, (void *)&a_worker))
+            != APR_SUCCESS) {
+            ap_log_error(APLOG_MARK, APLOG_EMERG, 0, ap_server_conf,
+                         "ap_queue_pop failed during shutdown with error "
+                         "code %d", rv);
+        }
+        else {
+            apr_thread_mutex_lock(a_worker->mutex);
+            a_worker->state = WORKER_ELEM_QUIT;
+            apr_thread_cond_signal(a_worker->cond);
+            apr_thread_mutex_unlock(a_worker->mutex);
+        }
+        --numalive;
+    }
+
     ap_update_child_status(process_slot, thread_slot, (dying) ? SERVER_DEAD : SERVER_GRACEFUL,
         (request_rec *) NULL);
     dying = 1;
@@ -691,35 +739,81 @@
     return NULL;
 }
 
-static void *worker_thread(apr_thread_t *thd, void * dummy)
+static void *worker_thread(apr_thread_t *thd, void *dummy)
 {
     proc_info * ti = dummy;
     int process_slot = ti->pid;
     int thread_slot = ti->tid;
-    apr_socket_t *csd = NULL;
-    apr_pool_t *ptrans;		/* Pool for per-transaction stuff */
+    apr_pool_t *tpool = apr_thread_pool_get(thd);
+    worker_elem_t my_state;
     apr_status_t rv;
 
+    ap_update_child_status(process_slot, thread_slot, SERVER_STARTING, NULL);
+
     free(ti);
 
-    ap_update_child_status(process_slot, thread_slot, SERVER_STARTING, NULL);
-    while (!workers_may_exit) {
+    apr_pool_create(&my_state.pool, tpool);
+    if ((rv = apr_thread_mutex_create(&my_state.mutex,
+                                      APR_THREAD_MUTEX_DEFAULT, tpool))
+        != APR_SUCCESS) {
+        ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
+                     "apr_thread_mutex_create");
+    }
+    if ((rv = apr_thread_cond_create(&my_state.cond, tpool))
+        != APR_SUCCESS) {
+        ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
+                     "apr_thread_cond_create");
+    }
+
+    if ((rv = apr_thread_mutex_lock(my_state.mutex)) != APR_SUCCESS) {
+        ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
+                     "apr_thread_mutex_lock");
+    }
+
+    while (1) {
+        my_state.sd = NULL;
+        my_state.state = WORKER_ELEM_IDLE;
+
         ap_update_child_status(process_slot, thread_slot, SERVER_READY, NULL);
-        rv = ap_queue_pop(worker_queue, &csd, &ptrans);
-        /* We get FD_QUEUE_EINTR whenever ap_queue_pop() has been interrupted
-         * from an explicit call to ap_queue_interrupt_all(). This allows
-         * us to unblock threads stuck in ap_queue_pop() when a shutdown
-         * is pending. */
-        if (rv == FD_QUEUE_EINTR || !csd) {
-            continue;
-        }
-        process_socket(ptrans, csd, process_slot, thread_slot);
-        requests_this_child--; /* FIXME: should be synchronized - aaron */
-        apr_pool_destroy(ptrans);
+
+        /* Make ourselves available as a connection-processing worker. */
+        if ((rv = ap_queue_push(worker_queue, &my_state)) != APR_SUCCESS) {
+            ap_log_error(APLOG_MARK, APLOG_EMERG, 0, ap_server_conf,
+                         "ap_stack_push failed with error code %d", rv);
+        }
+
+        /* Because of the way this is architected, we will always have
+         * a context switch here. It would be neat if we could come up
+         * with a good way to avoid the call to cond_wait. -aaron
+         */
+        while (my_state.state == WORKER_ELEM_IDLE) {
+            apr_thread_cond_wait(my_state.cond, my_state.mutex);
+        }
+        /* Did someone wake us up to notice that it is time to exit? */
+        if (my_state.state == WORKER_ELEM_QUIT) {
+            break;
+        }
+        else if (my_state.sd != NULL) {
+            process_socket(my_state.pool, my_state.sd,
+                           process_slot, thread_slot);
+            requests_this_child--; /* FIXME: should be synchronized -aaron */
+        }
+        apr_pool_clear(my_state.pool);
     }
 
     ap_update_child_status(process_slot, thread_slot,
-        (dying) ? SERVER_DEAD : SERVER_GRACEFUL, (request_rec *) NULL);
+        (dying) ? SERVER_DEAD : SERVER_GRACEFUL, (request_rec *)NULL);
+
+    if ((rv = apr_thread_cond_destroy(my_state.cond)) != APR_SUCCESS) {
+        ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
+                     "apr_thread_cond_destroy");
+    }
+    if ((rv = apr_thread_mutex_destroy(my_state.mutex)) != APR_SUCCESS) {
+        ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
+                     "apr_thread_mutex_destroy");
+    }
+    apr_pool_destroy(my_state.pool);
+
     apr_thread_mutex_lock(worker_thread_count_mutex);
     worker_thread_count--;
     apr_thread_mutex_unlock(worker_thread_count_mutex);
Index: server/mpm/worker/fdqueue.c
===================================================================
RCS file: /home/cvs/httpd-2.0/server/mpm/worker/fdqueue.c,v
retrieving revision 1.9
diff -u -r1.9 fdqueue.c
--- server/mpm/worker/fdqueue.c	2001/10/17 16:29:36	1.9
+++ server/mpm/worker/fdqueue.c	2001/11/23 22:32:47
@@ -82,7 +82,6 @@
      * XXX: We should at least try to signal an error here, it is
      * indicative of a programmer error. -aaron */
     apr_thread_cond_destroy(queue->not_empty);
-    apr_thread_cond_destroy(queue->not_full);
     apr_thread_mutex_destroy(queue->one_big_mutex);
 
     return FD_QUEUE_SUCCESS;
@@ -93,26 +92,21 @@
  */
 int ap_queue_init(fd_queue_t *queue, int queue_capacity, apr_pool_t *a) 
 {
-    int i;
-
     /* FIXME: APRize these return values. */
     if (apr_thread_mutex_create(&queue->one_big_mutex,
-                              APR_THREAD_MUTEX_DEFAULT, a) != APR_SUCCESS)
-        return FD_QUEUE_FAILURE;
-    if (apr_thread_cond_create(&queue->not_empty, a) != APR_SUCCESS)
+                                APR_THREAD_MUTEX_DEFAULT, a) != APR_SUCCESS) {
         return FD_QUEUE_FAILURE;
-    if (apr_thread_cond_create(&queue->not_full, a) != APR_SUCCESS)
+    }
+    if (apr_thread_cond_create(&queue->not_empty, a) != APR_SUCCESS) {
         return FD_QUEUE_FAILURE;
+    }
 
     queue->tail = 0;
-    queue->data = apr_palloc(a, queue_capacity * sizeof(fd_queue_elem_t));
+    queue->data = apr_palloc(a, queue_capacity * sizeof(void*));
     queue->bounds = queue_capacity;
-
-    /* Set all the sockets in the queue to NULL */
-    for (i = 0; i < queue_capacity; ++i)
-        queue->data[i].sd = NULL;
 
-    apr_pool_cleanup_register(a, queue, ap_queue_destroy, apr_pool_cleanup_null);
+    apr_pool_cleanup_register(a, queue, ap_queue_destroy,
+                              apr_pool_cleanup_null);
 
     return FD_QUEUE_SUCCESS;
 }
@@ -122,23 +116,29 @@
  * the push operation has completed, it signals other threads waiting
  * in apr_queue_pop() that they may continue consuming sockets.
  */
-int ap_queue_push(fd_queue_t *queue, apr_socket_t *sd, apr_pool_t *p) 
+int ap_queue_push(fd_queue_t *queue, void *e)
 {
-    fd_queue_elem_t *elem;
-
     if (apr_thread_mutex_lock(queue->one_big_mutex) != APR_SUCCESS) {
         return FD_QUEUE_FAILURE;
     }
 
-    while (ap_queue_full(queue)) {
-        apr_thread_cond_wait(queue->not_full, queue->one_big_mutex);
+    /* If they push too many, they didn't allocate enough slots
+     * in the stack, and we treat that as fatal. */
+    if (ap_queue_full(queue)) {
+        if (apr_thread_mutex_unlock(queue->one_big_mutex) != APR_SUCCESS) {
+            return FD_QUEUE_FAILURE;
+        }
+        return FD_QUEUE_OVERFLOW;
     }
 
-    elem = &queue->data[queue->tail++];
-    elem->sd = sd;
-    elem->p = p;
+    queue->data[queue->tail++] = e;
 
-    apr_thread_cond_signal(queue->not_empty);
+    /* Only perform the overhead of signaling if we were empty before
+     * inserting this element.
+     */
+    if (1 == queue->tail) {
+        apr_thread_cond_signal(queue->not_empty);
+    }
 
     if (apr_thread_mutex_unlock(queue->one_big_mutex) != APR_SUCCESS) {
         return FD_QUEUE_FAILURE;
@@ -153,16 +153,14 @@
  * Once retrieved, the socket is placed into the address specified by
  * 'sd'.
  */
-apr_status_t ap_queue_pop(fd_queue_t *queue, apr_socket_t **sd, apr_pool_t **p) 
+int ap_queue_pop(fd_queue_t *queue, void **e)
 {
-    fd_queue_elem_t *elem;
-
     if (apr_thread_mutex_lock(queue->one_big_mutex) != APR_SUCCESS) {
         return FD_QUEUE_FAILURE;
     }
 
     /* Keep waiting until we wake up and find that the queue is not empty. */
-    if (ap_queue_empty(queue)) {
+    while (ap_queue_empty(queue)) {
         apr_thread_cond_wait(queue->not_empty, queue->one_big_mutex);
         /* If we wake up and it's still empty, then we were interrupted */
         if (ap_queue_empty(queue)) {
@@ -173,16 +171,7 @@
         }
     } 
     
-    elem = &queue->data[--queue->tail];
-    *sd = elem->sd;
-    *p = elem->p;
-    elem->sd = NULL;
-    elem->p = NULL;
-
-    /* signal not_full if we were full before this pop */
-    if (queue->tail == queue->bounds - 1) {
-        apr_thread_cond_signal(queue->not_full);
-    }
+    *e = queue->data[--queue->tail];
 
     if (apr_thread_mutex_unlock(queue->one_big_mutex) != APR_SUCCESS) {
         return FD_QUEUE_FAILURE;
@@ -197,9 +186,6 @@
         return FD_QUEUE_FAILURE;
     }
     apr_thread_cond_broadcast(queue->not_empty);
-    /* We shouldn't have multiple threads sitting in not_full, but
-     * broadcast just in case. */
-    apr_thread_cond_broadcast(queue->not_full);
     if (apr_thread_mutex_unlock(queue->one_big_mutex) != APR_SUCCESS) {
         return FD_QUEUE_FAILURE;
     }
Index: server/mpm/worker/fdqueue.h
===================================================================
RCS file: /home/cvs/httpd-2.0/server/mpm/worker/fdqueue.h,v
retrieving revision 1.9
diff -u -r1.9 fdqueue.h
--- server/mpm/worker/fdqueue.h	2001/10/17 16:29:37	1.9
+++ server/mpm/worker/fdqueue.h	2001/11/23 22:32:47
@@ -73,29 +73,21 @@
 #define FD_QUEUE_FAILURE -1 /* Needs to be an invalid file descriptor because
                                of queue_pop semantics */
 #define FD_QUEUE_EINTR APR_EINTR
+#define FD_QUEUE_OVERFLOW -2
 
-struct fd_queue_elem_t {
-    apr_socket_t      *sd;
-    apr_pool_t        *p;
-};
-typedef struct fd_queue_elem_t fd_queue_elem_t;
-
 struct fd_queue_t {
     int                 tail;
-    fd_queue_elem_t    *data;
+    void              **data;
     int                 bounds;
-    int                 blanks;
     apr_thread_mutex_t *one_big_mutex;
     apr_thread_cond_t  *not_empty;
-    apr_thread_cond_t  *not_full;
-    int                 cancel_state;
 };
 typedef struct fd_queue_t fd_queue_t;
 
 /* FIXME: APRize these -- return values should be apr_status_t */
 int ap_queue_init(fd_queue_t *queue, int queue_capacity, apr_pool_t *a);
-int ap_queue_push(fd_queue_t *queue, apr_socket_t *sd, apr_pool_t *p);
-apr_status_t ap_queue_pop(fd_queue_t *queue, apr_socket_t **sd, apr_pool_t **p);
-apr_status_t ap_queue_interrupt_all(fd_queue_t *queue);
+int ap_queue_push(fd_queue_t *queue, void *e);
+int ap_queue_pop(fd_queue_t *queue, void **e);
+int ap_queue_interrupt_all(fd_queue_t *queue);
 
 #endif /* FDQUEUE_H */
