httpd-cvs mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pque...@apache.org
Subject svn commit: r1202257 - in /httpd/httpd/trunk/server/mpm/event: config3.m4 equeue.c equeue.h event.c
Date Tue, 15 Nov 2011 15:51:04 GMT
Author: pquerna
Date: Tue Nov 15 15:51:03 2011
New Revision: 1202257

URL: http://svn.apache.org/viewvc?rev=1202257&view=rev
Log:
Create a new lock free circular queue, and use it in the EventMPM to remove the timeout mutex
that was wrapping both timeout queue operations and pollset operations.

Added:
    httpd/httpd/trunk/server/mpm/event/equeue.c   (with props)
    httpd/httpd/trunk/server/mpm/event/equeue.h   (with props)
Modified:
    httpd/httpd/trunk/server/mpm/event/config3.m4
    httpd/httpd/trunk/server/mpm/event/event.c

Modified: httpd/httpd/trunk/server/mpm/event/config3.m4
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/server/mpm/event/config3.m4?rev=1202257&r1=1202256&r2=1202257&view=diff
==============================================================================
--- httpd/httpd/trunk/server/mpm/event/config3.m4 (original)
+++ httpd/httpd/trunk/server/mpm/event/config3.m4 Tue Nov 15 15:51:03 2011
@@ -6,6 +6,6 @@ if test "$ac_cv_serf" = yes ; then
 fi
 APACHE_SUBST(MOD_MPM_EVENT_LDADD)
 
-APACHE_MPM_MODULE(event, $enable_mpm_event, event.lo fdqueue.lo pod.lo,[
+APACHE_MPM_MODULE(event, $enable_mpm_event, event.lo fdqueue.lo equeue.lo pod.lo,[
     AC_CHECK_FUNCS(pthread_kill)
 ], , [\$(MOD_MPM_EVENT_LDADD)])

Added: httpd/httpd/trunk/server/mpm/event/equeue.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/server/mpm/event/equeue.c?rev=1202257&view=auto
==============================================================================
--- httpd/httpd/trunk/server/mpm/event/equeue.c (added)
+++ httpd/httpd/trunk/server/mpm/event/equeue.c Tue Nov 15 15:51:03 2011
@@ -0,0 +1,125 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "equeue.h"
+
+#include <apr_atomic.h>
+#include <sched.h>
+
+struct ap_equeue_t {
+    apr_uint32_t nelem;
+    apr_size_t elem_size;
+    uint8_t *bytes;
+    volatile apr_uint32_t writeCount;
+    volatile apr_uint32_t readCount;
+};
+
+
+static APR_INLINE apr_uint32_t count_to_index(ap_equeue_t *eq, apr_uint32_t count)
+{
+    return (count & (eq->nelem - 1));
+}
+
+static APR_INLINE void* index_to_bytes(ap_equeue_t *eq, apr_uint32_t idx)
+{
+    apr_size_t offset = idx * eq->elem_size;
+    return (void*)&eq->bytes[offset];
+}
+
+static APR_INLINE apr_uint32_t nearest_power(apr_uint32_t num)
+{
+    apr_uint32_t n = 1;
+    while (n < num) {
+        n <<= 1;
+    }
+
+    return n;
+}
+
+#if 0
+static void dump_queue(ap_equeue_t *eq)
+{
+    apr_uint32_t i;
+    /* Debug aid: print the queue's counters and every slot address to stderr. */
+    fprintf(stderr, "dumping %p\n", (void *)eq);
+    fprintf(stderr, "  nelem:   %u\n", eq->nelem);
+    fprintf(stderr, "  esize:   %"APR_SIZE_T_FMT"\n", eq->elem_size);
+    fprintf(stderr, "  wcnt:    %u\n", eq->writeCount);
+    fprintf(stderr, "  rcnt:    %u\n", eq->readCount);
+    fprintf(stderr, "  bytes:   %p\n", (void *)eq->bytes);
+    for (i = 0; i < eq->nelem; i++) {
+        fprintf(stderr, "    [%u] = %p\n", i, index_to_bytes(eq, i));
+    }
+
+    fprintf(stderr, "\n");
+    fflush(stderr);
+}
+#endif
+
+apr_status_t
+ap_equeue_create(apr_pool_t *p, apr_uint32_t nelem, apr_size_t elem_size, ap_equeue_t **eqout)
+{
+    ap_equeue_t *eq;
+
+    *eqout = NULL;
+    /* Slots are addressed by masking a counter with (nelem - 1), so round
+     * nelem up to a power of two first and size the buffer from that count. */
+    eq = apr_palloc(p, sizeof(ap_equeue_t));
+    eq->nelem = nearest_power(nelem);
+    eq->elem_size = elem_size;
+    eq->bytes = apr_palloc(p, eq->nelem * eq->elem_size);
+    eq->writeCount = 0;
+    eq->readCount = 0;
+    *eqout = eq;
+    return APR_SUCCESS;
+}
+
+void *
+ap_equeue_reader_next(ap_equeue_t *eq)
+{
+    if (apr_atomic_read32(&eq->writeCount) == eq->readCount) {
+        return NULL;
+    }
+    else {
+        apr_uint32_t idx = count_to_index(eq, apr_atomic_inc32(&eq->readCount));
+        return index_to_bytes(eq, idx);
+    }
+}
+
+void *
+ap_equeue_writer_value(ap_equeue_t *eq)
+{
+    apr_uint32_t idx;
+
+    while (1) {
+        apr_uint32_t readCount = apr_atomic_read32(&eq->readCount);
+
+        if (count_to_index(eq, eq->writeCount + 1) != count_to_index(eq, readCount)) {
+            break;
+        }
+        /* TODO: research if sched_yield is even worth doing  */
+        sched_yield();
+    }
+
+    idx = count_to_index(eq, eq->writeCount);
+    return index_to_bytes(eq, idx);
+}
+
+
+void ap_equeue_writer_onward(ap_equeue_t *eq)
+{
+    apr_atomic_inc32(&eq->writeCount);
+}

Propchange: httpd/httpd/trunk/server/mpm/event/equeue.c
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpd/httpd/trunk/server/mpm/event/equeue.c
------------------------------------------------------------------------------
    svn:keywords = Date Revision Author HeadURL Id

Propchange: httpd/httpd/trunk/server/mpm/event/equeue.c
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: httpd/httpd/trunk/server/mpm/event/equeue.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/server/mpm/event/equeue.h?rev=1202257&view=auto
==============================================================================
--- httpd/httpd/trunk/server/mpm/event/equeue.h (added)
+++ httpd/httpd/trunk/server/mpm/event/equeue.h Tue Nov 15 15:51:03 2011
@@ -0,0 +1,50 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _event_mpm_equeue_h_
+#define _event_mpm_equeue_h_
+
+#include "httpd.h"
+
+typedef struct ap_equeue_t ap_equeue_t;
+
+/**
+ * Allocate a queue of fixed-size slots from pool p and store it in *eqout.
+ */
+apr_status_t
+ap_equeue_create(apr_pool_t *p, apr_uint32_t nelem,
+                 apr_size_t elem_size, ap_equeue_t **eqout);
+
+/**
+ * Current value of the reader, returns NULL if the reader is caught up
+ * with the writer
+ */
+void* ap_equeue_reader_next(ap_equeue_t *eq);
+
+/**
+ * Returns pointer to next available write slot.  May block
+ * in a spin lock if none are available.
+ */
+void* ap_equeue_writer_value(ap_equeue_t *eq);
+
+/**
+ * Move the write position up one, making the previously
+ * edited value available to the reader.
+ */
+void ap_equeue_writer_onward(ap_equeue_t *eq);
+
+
+#endif

Propchange: httpd/httpd/trunk/server/mpm/event/equeue.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: httpd/httpd/trunk/server/mpm/event/equeue.h
------------------------------------------------------------------------------
    svn:keywords = Date Revision Author HeadURL Id

Propchange: httpd/httpd/trunk/server/mpm/event/equeue.h
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: httpd/httpd/trunk/server/mpm/event/event.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/server/mpm/event/event.c?rev=1202257&r1=1202256&r2=1202257&view=diff
==============================================================================
--- httpd/httpd/trunk/server/mpm/event/event.c (original)
+++ httpd/httpd/trunk/server/mpm/event/event.c Tue Nov 15 15:51:03 2011
@@ -97,6 +97,8 @@
 #include <limits.h>             /* for INT_MAX */
 
 
+#include "equeue.h"
+
 #if HAVE_SERF
 #include "mod_serf.h"
 #include "serf.h"
@@ -183,7 +185,19 @@ static fd_queue_t *worker_queue;
 static fd_queue_info_t *worker_queue_info;
 static int mpm_state = AP_MPMQ_STARTING;
 
-static apr_thread_mutex_t *timeout_mutex;
+typedef enum {
+    TIMEOUT_WRITE_COMPLETION,
+    TIMEOUT_KEEPALIVE,
+    TIMEOUT_LINGER,
+    TIMEOUT_SHORT_LINGER
+} timeout_type_e;
+
+typedef struct pollset_op_t {
+    timeout_type_e timeout_type;
+    conn_state_t *cs;
+    const char *tag;
+} pollset_op_t;
+
 APR_RING_HEAD(timeout_head_t, conn_state_t);
 struct timeout_queue {
     struct timeout_head_t head;
@@ -356,6 +370,7 @@ static apr_os_thread_t *listener_os_thre
  * perform a non-graceful (forced) shutdown of the server.
  */
 static apr_socket_t **worker_sockets;
+static ap_equeue_t **worker_equeues;
 
 static void disable_listensocks(int process_slot)
 {
@@ -737,15 +752,45 @@ static void set_signals(void)
 #endif
 }
 
+static void process_pollop(pollset_op_t *op)
+{
+    apr_status_t rv;
+    conn_state_t *cs = op->cs;
+
+    switch (op->timeout_type) {
+    case TIMEOUT_WRITE_COMPLETION:
+        TO_QUEUE_APPEND(write_completion_q, cs);
+        break;
+    case TIMEOUT_KEEPALIVE:
+        TO_QUEUE_APPEND(keepalive_q, cs);
+        break;
+    case TIMEOUT_LINGER:
+        TO_QUEUE_APPEND(linger_q, cs);
+        break;
+    case TIMEOUT_SHORT_LINGER:
+        TO_QUEUE_APPEND(short_linger_q, cs);
+        break;
+    }
+
+    rv = apr_pollset_add(event_pollset, &op->cs->pfd);
+
+    if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) {
+        ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf,
+                     "%s: apr_pollset_add failure", op->tag);
+    }
+}
+
 /*
  * close our side of the connection
  * Pre-condition: cs is not in any timeout queue and not in the pollset,
  *                timeout_mutex is not locked
  * return: 0 if connection is fully closed,
  *         1 if connection is lingering
- * may be called by listener or by worker thread
+ * may be called by listener or by worker thread.
+ * the eq may be null if called from the listener thread,
+ * and the pollset operations are done directly by this function.
  */
-static int start_lingering_close(conn_state_t *cs)
+static int start_lingering_close(conn_state_t *cs, ap_equeue_t *eq)
 {
     apr_status_t rv;
     if (ap_start_lingering_close(cs->c)) {
@@ -755,7 +800,15 @@ static int start_lingering_close(conn_st
     }
     else {
         apr_socket_t *csd = ap_get_conn_socket(cs->c);
-        struct timeout_queue *q;
+        pollset_op_t localv;
+        pollset_op_t *v;
+
+        if (eq) {
+            v = ap_equeue_writer_value(eq);
+        }
+        else {
+            v = &localv;
+        }
 
         rv = apr_socket_timeout_set(csd, 0);
         AP_DEBUG_ASSERT(rv == APR_SUCCESS);
@@ -767,30 +820,25 @@ static int start_lingering_close(conn_st
         if (apr_table_get(cs->c->notes, "short-lingering-close")) {
             cs->expiration_time =
                 apr_time_now() + apr_time_from_sec(SECONDS_TO_LINGER);
-            q = &short_linger_q;
+            v->timeout_type = TIMEOUT_SHORT_LINGER;
+            v->tag = "start_lingering_close(short)";
             cs->state = CONN_STATE_LINGER_SHORT;
         }
         else {
             cs->expiration_time =
                 apr_time_now() + apr_time_from_sec(MAX_SECS_TO_LINGER);
-            q = &linger_q;
+            v->timeout_type = TIMEOUT_LINGER;
+            v->tag = "start_lingering_close(normal)";
             cs->state = CONN_STATE_LINGER_NORMAL;
         }
-        apr_thread_mutex_lock(timeout_mutex);
-        TO_QUEUE_APPEND(*q, cs);
+
         cs->pfd.reqevents = APR_POLLIN | APR_POLLHUP | APR_POLLERR;
-        rv = apr_pollset_add(event_pollset, &cs->pfd);
-        apr_thread_mutex_unlock(timeout_mutex);
-        if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) {
-            ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf,
-                         "start_lingering_close: apr_pollset_add failure");
-            apr_thread_mutex_lock(timeout_mutex);
-            TO_QUEUE_REMOVE(*q, cs);
-            apr_thread_mutex_unlock(timeout_mutex);
-            apr_socket_close(cs->pfd.desc.s);
-            apr_pool_clear(cs->p);
-            ap_push_pool(worker_queue_info, cs->p);
-            return 0;
+        v->cs = cs;
+        if (eq != NULL) {
+            ap_equeue_writer_onward(eq);
+        }
+        else {
+            process_pollop(v);
         }
     }
     return 1;
@@ -802,7 +850,7 @@ static int start_lingering_close(conn_st
  * Pre-condition: cs is not in any timeout queue and not in the pollset
  * return: irrelevant (need same prototype as start_lingering_close)
  */
-static int stop_lingering_close(conn_state_t *cs)
+static int stop_lingering_close(conn_state_t *cs, ap_equeue_t *eq)
 {
     apr_status_t rv;
     apr_socket_t *csd = ap_get_conn_socket(cs->c);
@@ -824,7 +872,9 @@ static int stop_lingering_close(conn_sta
  *         0 if it is still open and waiting for some event
  */
 static int process_socket(apr_thread_t *thd, apr_pool_t * p, apr_socket_t * sock,
-                          conn_state_t * cs, int my_child_num,
+                          conn_state_t * cs,
+                          ap_equeue_t *eq,
+                          int my_child_num,
                           int my_thread_num)
 {
     conn_rec *c;
@@ -858,7 +908,6 @@ static int process_socket(apr_thread_t *
         pt->type = PT_CSD;
         pt->baton = cs;
         cs->pfd.client_data = pt;
-        TO_QUEUE_ELEM_INIT(cs);
 
         ap_update_vhost_given_ip(c);
 
@@ -935,12 +984,16 @@ read_request:
              * Set a write timeout for this connection, and let the
              * event thread poll for writeability.
              */
+            pollset_op_t *v = ap_equeue_writer_value(eq);
+
             cs->expiration_time = ap_server_conf->timeout + apr_time_now();
-            apr_thread_mutex_lock(timeout_mutex);
-            TO_QUEUE_APPEND(write_completion_q, cs);
             cs->pfd.reqevents = APR_POLLOUT | APR_POLLHUP | APR_POLLERR;
-            rc = apr_pollset_add(event_pollset, &cs->pfd);
-            apr_thread_mutex_unlock(timeout_mutex);
+
+            v->cs = cs;
+            v->timeout_type = TIMEOUT_WRITE_COMPLETION;
+            v->tag = "process_socket(write_completion)";
+
+            ap_equeue_writer_onward(eq);
             return 1;
         }
         else if (c->keepalive != AP_CONN_KEEPALIVE || c->aborted ||
@@ -957,11 +1010,12 @@ read_request:
     }
 
     if (cs->state == CONN_STATE_LINGER) {
-        if (!start_lingering_close(cs))
+        if (!start_lingering_close(cs, eq)) {
             return 0;
+        }
     }
     else if (cs->state == CONN_STATE_CHECK_REQUEST_LINE_READABLE) {
-        apr_status_t rc;
+        pollset_op_t *v;
 
         /* It greatly simplifies the logic to use a single timeout value here
          * because the new element can just be added to the end of the list and
@@ -973,19 +1027,14 @@ read_request:
          */
         cs->expiration_time = ap_server_conf->keep_alive_timeout +
                               apr_time_now();
-        apr_thread_mutex_lock(timeout_mutex);
-        TO_QUEUE_APPEND(keepalive_q, cs);
 
         /* Add work to pollset. */
+        v = ap_equeue_writer_value(eq);
+        v->timeout_type = TIMEOUT_KEEPALIVE;
+        v->cs = cs;
         cs->pfd.reqevents = APR_POLLIN;
-        rc = apr_pollset_add(event_pollset, &cs->pfd);
-        apr_thread_mutex_unlock(timeout_mutex);
-
-        if (rc != APR_SUCCESS) {
-            ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf,
-                         "process_socket: apr_pollset_add failure");
-            AP_DEBUG_ASSERT(rc == APR_SUCCESS);
-        }
+        v->tag = "process_socket(keepalive)";
+        ap_equeue_writer_onward(eq);
     }
     return 1;
 }
@@ -1273,7 +1322,6 @@ static void process_lingering_close(conn
         return;
     }
 
-    apr_thread_mutex_lock(timeout_mutex);
     rv = apr_pollset_remove(event_pollset, pfd);
     AP_DEBUG_ASSERT(rv == APR_SUCCESS);
 
@@ -1281,7 +1329,6 @@ static void process_lingering_close(conn
     AP_DEBUG_ASSERT(rv == APR_SUCCESS);
 
     TO_QUEUE_REMOVE(*q, cs);
-    apr_thread_mutex_unlock(timeout_mutex);
     TO_QUEUE_ELEM_INIT(cs);
 
     apr_pool_clear(cs->p);
@@ -1294,7 +1341,7 @@ static void process_lingering_close(conn
  */
 static void process_timeout_queue(struct timeout_queue *q,
                                   apr_time_t timeout_time,
-                                  int (*func)(conn_state_t *))
+                                  int (*func)(conn_state_t *, ap_equeue_t *eq))
 {
     int count = 0;
     conn_state_t *first, *cs, *last;
@@ -1322,15 +1369,13 @@ static void process_timeout_queue(struct
     APR_RING_UNSPLICE(first, last, timeout_list);
     AP_DEBUG_ASSERT(q->count >= count);
     q->count -= count;
-    apr_thread_mutex_unlock(timeout_mutex);
     while (count) {
         cs = APR_RING_NEXT(first, timeout_list);
         TO_QUEUE_ELEM_INIT(first);
-        func(first);
+        func(first, NULL);
         first = cs;
         count--;
     }
-    apr_thread_mutex_lock(timeout_mutex);
 }
 
 static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
@@ -1397,14 +1442,12 @@ static void * APR_THREAD_FUNC listener_t
             /* trace log status every second */
             if (now - last_log > apr_time_from_msec(1000)) {
                 last_log = now;
-                apr_thread_mutex_lock(timeout_mutex);
                 ap_log_error(APLOG_MARK, APLOG_TRACE6, 0, ap_server_conf,
                              "connections: %d (write-completion: %d "
                              "keep-alive: %d lingering: %d)",
                              connection_count, write_completion_q.count,
                              keepalive_q.count,
                              linger_q.count + short_linger_q.count);
-                apr_thread_mutex_unlock(timeout_mutex);
             }
         }
 
@@ -1483,7 +1526,6 @@ static void * APR_THREAD_FUNC listener_t
                 case CONN_STATE_WRITE_COMPLETION:
                     get_worker(&have_idle_worker, blocking,
                                &workers_were_busy);
-                    apr_thread_mutex_lock(timeout_mutex);
                     TO_QUEUE_REMOVE(*remove_from_q, cs);
                     rc = apr_pollset_remove(event_pollset, &cs->pfd);
 
@@ -1496,19 +1538,17 @@ static void * APR_THREAD_FUNC listener_t
                     if (rc != APR_SUCCESS && !APR_STATUS_IS_NOTFOUND(rc)) {
                         ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf,
                                      "pollset remove failed");
-                        apr_thread_mutex_unlock(timeout_mutex);
-                        start_lingering_close(cs);
+                        start_lingering_close(cs, NULL);
                         break;
                     }
 
-                    apr_thread_mutex_unlock(timeout_mutex);
                     TO_QUEUE_ELEM_INIT(cs);
                     /* If we didn't get a worker immediately for a keep-alive
                      * request, we close the connection, so that the client can
                      * re-connect to a different process.
                      */
                     if (!have_idle_worker) {
-                        start_lingering_close(cs);
+                        start_lingering_close(cs, NULL);
                         break;
                     }
                     rc = push2worker(out_pfd, event_pollset);
@@ -1632,6 +1672,20 @@ static void * APR_THREAD_FUNC listener_t
             num--;
         }                   /* while for processing poll */
 
+        {
+            /* TODO: break out to separate function */
+            int i;
+
+            for (i = 0; i < threads_per_child; i++) {
+                ap_equeue_t *eq = worker_equeues[i];
+                pollset_op_t *op = NULL;
+
+                while ((op = ap_equeue_reader_next(eq)) != NULL) {
+                    process_pollop(op);
+                }
+            }
+        }
+
         /* XXX possible optimization: stash the current time for use as
          * r->request_time for new requests
          */
@@ -1642,7 +1696,6 @@ static void * APR_THREAD_FUNC listener_t
             timeout_time = now + TIMEOUT_FUDGE_FACTOR;
 
             /* handle timed out sockets */
-            apr_thread_mutex_lock(timeout_mutex);
 
             /* Step 1: keepalive timeouts */
             /* If all workers are busy, we kill older keep-alive connections so that they
@@ -1672,7 +1725,6 @@ static void * APR_THREAD_FUNC listener_t
             ps->write_completion = write_completion_q.count;
             ps->lingering_close = linger_q.count + short_linger_q.count;
             ps->keep_alive = keepalive_q.count;
-            apr_thread_mutex_unlock(timeout_mutex);
 
             ps->connections = apr_atomic_read32(&connection_count);
             /* XXX: should count CONN_STATE_SUSPENDED and set ps->suspended */
@@ -1716,6 +1768,7 @@ static void *APR_THREAD_FUNC worker_thre
     apr_status_t rv;
     int is_idle = 0;
     timer_event_t *te = NULL;
+    ap_equeue_t *eq = worker_equeues[thread_slot];
 
     free(ti);
 
@@ -1788,7 +1841,7 @@ static void *APR_THREAD_FUNC worker_thre
         else {
             is_idle = 0;
             worker_sockets[thread_slot] = csd;
-            rv = process_socket(thd, ptrans, csd, cs, process_slot, thread_slot);
+            rv = process_socket(thd, ptrans, csd, cs, eq, process_slot, thread_slot);
             if (!rv) {
                 requests_this_child--;
             }
@@ -1885,17 +1938,6 @@ static void *APR_THREAD_FUNC start_threa
         clean_child_exit(APEXIT_CHILDFATAL);
     }
 
-    /* Create the timeout mutex and main pollset before the listener
-     * thread starts.
-     */
-    rv = apr_thread_mutex_create(&timeout_mutex, APR_THREAD_MUTEX_DEFAULT,
-                                 pchild);
-    if (rv != APR_SUCCESS) {
-        ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf,
-                     "creation of the timeout mutex failed.");
-        clean_child_exit(APEXIT_CHILDFATAL);
-    }
-
     /* Create the main pollset */
     rv = apr_pollset_create(&event_pollset,
                             threads_per_child, /* XXX don't we need more, to handle
@@ -1912,6 +1954,16 @@ static void *APR_THREAD_FUNC start_threa
     worker_sockets = apr_pcalloc(pchild, threads_per_child
                                  * sizeof(apr_socket_t *));
 
+    worker_equeues = apr_palloc(pchild, threads_per_child * sizeof(ap_equeue_t*));
+
+    for (i = 0; i < threads_per_child; i++) {
+        ap_equeue_t* eq = NULL;
+        /* TODO: research/test optimal size of queue here */
+        ap_equeue_create(pchild, 16, sizeof(pollset_op_t), &eq);
+        /* same as thread ID */
+        worker_equeues[i] = eq;
+    }
+
     loops = prev_threads_created = 0;
     while (1) {
         /* threads_per_child does not include the listener thread */



Mime
View raw message