Return-Path: Delivered-To: apmail-apache-cvs-archive@apache.org Received: (qmail 98795 invoked by uid 500); 5 Aug 2001 18:42:50 -0000 Mailing-List: contact apache-cvs-help@apache.org; run by ezmlm Precedence: bulk Reply-To: new-httpd@apache.org list-help: list-unsubscribe: list-post: Delivered-To: mailing list apache-cvs@apache.org Received: (qmail 98784 invoked by uid 500); 5 Aug 2001 18:42:49 -0000 Delivered-To: apmail-httpd-2.0-cvs@apache.org Date: 5 Aug 2001 18:41:38 -0000 Message-ID: <20010805184138.20684.qmail@icarus.apache.org> From: rbb@apache.org To: httpd-2.0-cvs@apache.org Subject: cvs commit: httpd-2.0/server/mpm/worker fdqueue.c fdqueue.h worker.c X-Spam-Rating: h31.sny.collab.net 1.6.2 0/1000/N Status: O X-Status: X-Keywords: X-UID: 55 rbb 01/08/05 11:41:38 Modified: server/mpm/worker fdqueue.c fdqueue.h worker.c Log: Get the worker MPM working again. This should fix the serialization problems, and it makes us initialize the queue only once. Revision Changes Path 1.3 +63 -59 httpd-2.0/server/mpm/worker/fdqueue.c Index: fdqueue.c =================================================================== RCS file: /home/cvs/httpd-2.0/server/mpm/worker/fdqueue.c,v retrieving revision 1.2 retrieving revision 1.3 diff -u -r1.2 -r1.3 --- fdqueue.c 2001/07/31 15:35:28 1.2 +++ fdqueue.c 2001/08/05 18:41:38 1.3 @@ -57,112 +57,116 @@ */ #include "fdqueue.h" -#include "apr_pools.h" -/* Assumption: queue itself is allocated by the user */ /* Assumption: increment and decrement are atomic on int */ -int ap_queue_size(FDQueue *queue) { - return ((queue->tail - queue->head + queue->bounds) % queue->bounds); -} - -int ap_queue_full(FDQueue *queue) { - return(queue->blanks <= 0); -} - -int ap_block_on_queue(FDQueue *queue) { -#if 0 +int ap_increase_blanks(FDQueue *queue) +{ if (pthread_mutex_lock(&queue->one_big_mutex) != 0) { return FD_QUEUE_FAILURE; } -#endif - if (ap_queue_full(queue)) { - pthread_cond_wait(&queue->not_full, &queue->one_big_mutex); - } -#if 0 + 
queue->blanks++; if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) { return FD_QUEUE_FAILURE; } -#endif - return FD_QUEUE_SUCCESS; -} - -static int increase_blanks(FDQueue *queue) { - queue->blanks++; return FD_QUEUE_SUCCESS; } -static apr_status_t ap_queue_destroy(void *data) { +static apr_status_t ap_queue_destroy(void *data) +{ FDQueue *queue = data; /* Ignore errors here, we can't do anything about them anyway */ - pthread_cond_destroy(&queue->not_empty); - pthread_cond_destroy(&queue->not_full); + pthread_cond_destroy(&(queue->not_empty)); + pthread_cond_destroy(&(queue->not_full)); pthread_mutex_destroy(&queue->one_big_mutex); return FD_QUEUE_SUCCESS; } -int ap_queue_init(FDQueue *queue, int queue_capacity, apr_pool_t *a) { +int ap_queue_init(FDQueue *queue, int queue_capacity, apr_pool_t *a) +{ int i; int bounds = queue_capacity + 1; + pthread_cond_t not_empty = PTHREAD_COND_INITIALIZER; + pthread_cond_t not_full = PTHREAD_COND_INITIALIZER; + queue->not_empty = not_empty; + queue->not_full = not_full; pthread_mutex_init(&queue->one_big_mutex, NULL); - pthread_cond_init(&queue->not_empty, NULL); - pthread_cond_init(&queue->not_full, NULL); queue->head = queue->tail = 0; queue->data = apr_palloc(a, bounds * sizeof(FDQueueElement)); queue->bounds = bounds; - queue->blanks = queue_capacity; + queue->blanks = 0; apr_pool_cleanup_register(a, queue, ap_queue_destroy, apr_pool_cleanup_null); for (i=0; i < bounds; ++i) queue->data[i].sd = NULL; return FD_QUEUE_SUCCESS; } -int ap_queue_push(FDQueue *queue, apr_socket_t *sd, apr_pool_t *p) { +int ap_queue_push(FDQueue *queue, apr_socket_t *sd, apr_pool_t *p) +{ + if (pthread_mutex_lock(&queue->one_big_mutex) != 0) { + return FD_QUEUE_FAILURE; + } queue->data[queue->tail].sd = sd; - queue->data[queue->tail].p = p; + queue->data[queue->tail].p = p; queue->tail = (queue->tail + 1) % queue->bounds; queue->blanks--; - pthread_cond_signal(&queue->not_empty); -#if 0 - if (queue->head == (queue->tail + 1) % queue->bounds) 
{ -#endif - if (ap_queue_full(queue)) { - pthread_cond_wait(&queue->not_full, &queue->one_big_mutex); + if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) { + return FD_QUEUE_FAILURE; } + pthread_cond_signal(&(queue->not_empty)); return FD_QUEUE_SUCCESS; } -apr_status_t ap_queue_pop(FDQueue *queue, apr_socket_t **sd, apr_pool_t **p, int block_if_empty) { - increase_blanks(queue); - /* We have just removed one from the queue. By definition, it is - * no longer full. We can ALWAYS signal the listener thread at - * this point. However, the original code didn't do it this way, - * so I am leaving the original code in, just commented out. BTW, - * originally, the increase_blanks wasn't in this function either. - * - if (queue->blanks > 0) { - */ - pthread_cond_signal(&queue->not_full); - - /* } */ +apr_status_t ap_queue_pop(FDQueue *queue, apr_socket_t **sd, apr_pool_t **p) +{ + if (pthread_mutex_lock(&queue->one_big_mutex) != 0) { + return FD_QUEUE_FAILURE; + } if (queue->head == queue->tail) { - if (block_if_empty) { - pthread_cond_wait(&queue->not_empty, &queue->one_big_mutex); -fprintf(stderr, "Found a non-empty queue :-)\n"); - } + pthread_cond_wait(&(queue->not_empty), &queue->one_big_mutex); } *sd = queue->data[queue->head].sd; - *p = queue->data[queue->head].p; + *p = queue->data[queue->head].p; queue->data[queue->head].sd = NULL; - if (*sd != NULL) { + queue->data[queue->head].p = NULL; + if (sd != NULL) { queue->head = (queue->head + 1) % queue->bounds; } + if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) { + return FD_QUEUE_FAILURE; + } + if (queue->blanks > 0) { + pthread_cond_signal(&(queue->not_full)); + } return APR_SUCCESS; } +int ap_queue_size(FDQueue *queue) +{ + return ((queue->tail - queue->head + queue->bounds) % queue->bounds); +} + +int ap_queue_full(FDQueue *queue) +{ + return(queue->blanks <= 0); +} + +int ap_block_on_queue(FDQueue *queue) +{ + if (pthread_mutex_lock(&queue->one_big_mutex) != 0) { + return FD_QUEUE_FAILURE; + } + if 
(ap_queue_full(queue)) { + pthread_cond_wait(&(queue->not_full), &queue->one_big_mutex); + } + if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) { + return FD_QUEUE_FAILURE; + } + return FD_QUEUE_SUCCESS; +} + void ap_queue_signal_all_wakeup(FDQueue *queue) { -fprintf(stderr, "trying to broadcast to all workers\n"); - pthread_cond_broadcast(&queue->not_empty); + pthread_cond_broadcast(&(queue->not_empty)); } 1.3 +2 -1 httpd-2.0/server/mpm/worker/fdqueue.h Index: fdqueue.h =================================================================== RCS file: /home/cvs/httpd-2.0/server/mpm/worker/fdqueue.h,v retrieving revision 1.2 retrieving revision 1.3 diff -u -r1.2 -r1.3 --- fdqueue.h 2001/08/04 11:40:16 1.2 +++ fdqueue.h 2001/08/05 18:41:38 1.3 @@ -87,10 +87,11 @@ int ap_queue_init(FDQueue *queue, int queue_size, apr_pool_t *a); int ap_queue_push(FDQueue *queue, apr_socket_t *sd, apr_pool_t *p); -apr_status_t ap_queue_pop(FDQueue *queue, apr_socket_t **sd, apr_pool_t **p, int block_if_empty); +apr_status_t ap_queue_pop(FDQueue *queue, apr_socket_t **sd, apr_pool_t **p); int ap_queue_size(FDQueue *queue); int ap_queue_full(FDQueue *queue); int ap_block_on_queue(FDQueue *queue); void ap_queue_signal_all_wakeup(FDQueue *queue); +int ap_increase_blanks(FDQueue *queue); #endif /* FDQUEUE_H */ 1.8 +10 -10 httpd-2.0/server/mpm/worker/worker.c Index: worker.c =================================================================== RCS file: /home/cvs/httpd-2.0/server/mpm/worker/worker.c,v retrieving revision 1.7 retrieving revision 1.8 diff -u -r1.7 -r1.8 --- worker.c 2001/08/05 18:08:49 1.7 +++ worker.c 2001/08/05 18:41:38 1.8 @@ -510,7 +510,6 @@ /* Sets workers_may_exit if we received a character on the pipe_of_death */ static void check_pipe_of_death(void) { -fprintf(stderr, "looking at pipe of death\n"); apr_lock_acquire(pipe_of_death_mutex); if (!workers_may_exit) { apr_status_t ret; @@ -684,7 +683,8 @@ free(ti); while (!workers_may_exit) { - ap_queue_pop(worker_queue, &csd, 
&ptrans, 1); + ap_queue_pop(worker_queue, &csd, &ptrans); + ap_increase_blanks(worker_queue); process_socket(ptrans, csd, process_slot, thread_slot); requests_this_child--; apr_pool_clear(ptrans); @@ -729,21 +729,21 @@ apr_thread_t **threads = ts->threads; apr_threadattr_t *thread_attr = ts->threadattr; int child_num_arg = ts->child_num_arg; - int i; int my_child_num = child_num_arg; proc_info *my_info = NULL; apr_status_t rv; + int i = 0; int threads_created = 0; apr_thread_t *listener; + my_info = (proc_info *)malloc(sizeof(proc_info)); + my_info->pid = my_child_num; + my_info->tid = i; + my_info->sd = 0; + apr_pool_create(&my_info->tpool, pchild); + apr_thread_create(&listener, thread_attr, listener_thread, my_info, pchild); while (1) { - my_info = (proc_info *)malloc(sizeof(proc_info)); - my_info->pid = my_child_num; - my_info->tid = i; - my_info->sd = 0; - apr_pool_create(&my_info->tpool, pchild); - apr_thread_create(&listener, thread_attr, listener_thread, my_info, pchild); - for (i=0; i < ap_threads_per_child; i++) { + for (i=1; i < ap_threads_per_child; i++) { int status = ap_scoreboard_image->servers[child_num_arg][i].status; if (status != SERVER_GRACEFUL && status != SERVER_DEAD) {