httpd-cvs mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bri...@apache.org
Subject svn commit: r280222 - in /httpd/httpd/branches/async-dev/server/mpm/experimental/leader: Makefile.in README io_multiplexer.c io_multiplexer.h leader.c
Date Mon, 12 Sep 2005 00:35:26 GMT
Author: brianp
Date: Sun Sep 11 17:35:23 2005
New Revision: 280222

URL: http://svn.apache.org/viewcvs?rev=280222&view=rev
Log:
Added asynchronous keep-alive support to the leader MPM...
the new io_multiplexer object is intended to provide a
foundation for async processing of the other connection
states.

Added:
    httpd/httpd/branches/async-dev/server/mpm/experimental/leader/io_multiplexer.c
    httpd/httpd/branches/async-dev/server/mpm/experimental/leader/io_multiplexer.h
Modified:
    httpd/httpd/branches/async-dev/server/mpm/experimental/leader/Makefile.in
    httpd/httpd/branches/async-dev/server/mpm/experimental/leader/README
    httpd/httpd/branches/async-dev/server/mpm/experimental/leader/leader.c

Modified: httpd/httpd/branches/async-dev/server/mpm/experimental/leader/Makefile.in
URL: http://svn.apache.org/viewcvs/httpd/httpd/branches/async-dev/server/mpm/experimental/leader/Makefile.in?rev=280222&r1=280221&r2=280222&view=diff
==============================================================================
--- httpd/httpd/branches/async-dev/server/mpm/experimental/leader/Makefile.in (original)
+++ httpd/httpd/branches/async-dev/server/mpm/experimental/leader/Makefile.in Sun Sep 11 17:35:23 2005
@@ -1,5 +1,5 @@
 
 LTLIBRARY_NAME    = libleader.la
-LTLIBRARY_SOURCES = leader.c
+LTLIBRARY_SOURCES = leader.c io_multiplexer.c
 
 include $(top_srcdir)/build/ltlib.mk

Modified: httpd/httpd/branches/async-dev/server/mpm/experimental/leader/README
URL: http://svn.apache.org/viewcvs/httpd/httpd/branches/async-dev/server/mpm/experimental/leader/README?rev=280222&r1=280221&r2=280222&view=diff
==============================================================================
--- httpd/httpd/branches/async-dev/server/mpm/experimental/leader/README (original)
+++ httpd/httpd/branches/async-dev/server/mpm/experimental/leader/README Sun Sep 11 17:35:23 2005
@@ -1,15 +1,24 @@
 Leader MPM:
-This is an experimental variant of the standard worker MPM.
-It uses a Leader/Followers design pattern to coordinate work among threads:
+
+This is an experimental MPM that uses the Leader/Followers design
+pattern to coordinate work among threads:
 http://deuce.doc.wustl.edu/doc/pspdfs/lf.pdf
 
+As of httpd-2.3, the Leader MPM also incorporates a variant of the
+Event MPM's asynchronous socket I/O for keepalive connections.  The
+management of pollsets and timeouts is encapsulated within the
+io_multiplexer functions, with the aim of eventually supporting
+asynchronous write completion.
+
 To use the leader MPM, add "--with-mpm=leader" to the configure
 script's arguments when building the httpd.
   
-This MPM depends on APR's atomic compare-and-swap operations for
-thread synchronization.  If you are compiling for an x86 target
-and you don't need to support 386s, or you're compiling for a
-SPARC and you don't need to run on pre-UltraSPARC chips, add
-"--enable-nonportable-atomics=yes" to the configure script's
-arguments.  This will cause APR to implement atomic operations
-using efficient opcodes not available in older CPUs.
+IMPORTANT NOTES:
+
+* At the moment, with the async code under active development, the
+  Leader MPM is suitable for R&D use, but not for production use.
+
+* Like the Event MPM, the Leader MPM now requires a threadsafe
+  apr_pollset implementation, such as epoll on Linux or kqueue
+  on BSD.
+

Added: httpd/httpd/branches/async-dev/server/mpm/experimental/leader/io_multiplexer.c
URL: http://svn.apache.org/viewcvs/httpd/httpd/branches/async-dev/server/mpm/experimental/leader/io_multiplexer.c?rev=280222&view=auto
==============================================================================
--- httpd/httpd/branches/async-dev/server/mpm/experimental/leader/io_multiplexer.c (added)
+++ httpd/httpd/branches/async-dev/server/mpm/experimental/leader/io_multiplexer.c Sun Sep 11 17:35:23 2005
@@ -0,0 +1,254 @@
+/* Copyright 2005 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "apr_poll.h"
+#include "apr_ring.h"
+#include "apr_thread_cond.h"
+#include "apr_thread_mutex.h"
+
+#include "io_multiplexer.h"
+
+extern server_rec *ap_server_conf;
+
+APR_RING_HEAD(timeout_ring_header_t, conn_state_t);
+
+struct io_multiplexer {
+    int stopped;
+    apr_thread_mutex_t *lock;
+    apr_thread_mutex_t *pollset_lock;
+    apr_pollset_t *pollset;
+    apr_int32_t num_pending_events;
+    const apr_pollfd_t *next_pending_event;
+    struct timeout_ring_header_t pending_timeouts;
+    struct timeout_ring_header_t expired_timeouts;
+    volatile int poll_sequence_num;
+};
+
+static apr_status_t io_multiplexer_remove_internal(io_multiplexer *iom,
+                                                   multiplexable *m);
+
+apr_status_t io_multiplexer_create(io_multiplexer **iom, apr_pool_t *p,
+                                   apr_uint32_t max_descriptors)
+{
+    apr_status_t rv;
+    *iom = (io_multiplexer *)apr_palloc(p, sizeof(**iom));
+    rv = apr_thread_mutex_create(&((*iom)->lock), APR_THREAD_MUTEX_DEFAULT, p);
+    if (rv != APR_SUCCESS) {
+        return rv;
+    }
+    rv = apr_thread_mutex_create(&((*iom)->pollset_lock),
+                                 APR_THREAD_MUTEX_DEFAULT, p);
+    if (rv != APR_SUCCESS) {
+        return rv;
+    }
+    rv = apr_pollset_create(&((*iom)->pollset), max_descriptors, p,
+                            APR_POLLSET_THREADSAFE);
+    if (rv != APR_SUCCESS) {
+        return rv;
+    }
+    (*iom)->stopped = 0;
+    (*iom)->num_pending_events = 0;
+    (*iom)->next_pending_event = NULL;
+    APR_RING_INIT(&((*iom)->pending_timeouts), conn_state_t, timeout_list);
+    APR_RING_INIT(&((*iom)->expired_timeouts), conn_state_t, timeout_list);
+    (*iom)->poll_sequence_num = 0;
+    
+    return APR_SUCCESS;
+}
+
+#define DEFAULT_POLL_TIMEOUT 1000000
+
+apr_status_t io_multiplexer_get_event(io_multiplexer *iom,
+                                      apr_pollfd_t *event)
+{
+    apr_status_t rv;
+    rv = apr_thread_mutex_lock(iom->pollset_lock);
+    if (rv != APR_SUCCESS) {
+        return rv;
+    }
+    rv = apr_thread_mutex_lock(iom->lock);
+    if (rv != APR_SUCCESS) {
+        apr_thread_mutex_unlock(iom->pollset_lock);
+        return rv;
+    }
+    if (iom->stopped) {
+        apr_thread_mutex_unlock(iom->lock);
+        apr_thread_mutex_unlock(iom->pollset_lock);
+        return APR_EINVAL;
+    }
+    for (;;) {
+        /* Invariant: at the start of each iteration of this loop, the
+         * active thread holds iom->lock.
+         */
+         if (!APR_RING_EMPTY(&(iom->expired_timeouts), conn_state_t, timeout_list)) {
+             /* There are some timeout notifications remaining
+              * from the last poll.  Return the next one.
+              */
+             conn_state_t *cs = APR_RING_FIRST(&(iom->expired_timeouts));
+             APR_RING_REMOVE(cs, timeout_list);
+             *event = cs->pfd;
+             event->rtnevents |= IOM_POLL_TIMEOUT;
+             apr_thread_mutex_unlock(iom->lock);
+             return apr_thread_mutex_unlock(iom->pollset_lock);
+         }
+         else if (iom->num_pending_events > 0) {
+            /* There are some events remaining from the last
+             * poll.  Return the next one.
+             */
+            *event = *(iom->next_pending_event++);
+            apr_pollset_remove(iom->pollset, event);
+            iom->num_pending_events--;
+            apr_thread_mutex_unlock(iom->lock);
+            return apr_thread_mutex_unlock(iom->pollset_lock);
+        }
+        else {
+            /* No unprocessed events remain from the previous poll,
+             * so initiate a new poll.
+             */
+            apr_int32_t num_pending_events = 0;
+            const apr_pollfd_t *next_pending_event;
+            apr_interval_time_t poll_timeout;
+            int i;
+            
+            if (APR_RING_EMPTY(&(iom->pending_timeouts), conn_state_t,
+                               timeout_list)) {
+                poll_timeout = DEFAULT_POLL_TIMEOUT;
+            }
+            else {
+                /* If there are pending timeouts, check whether
+                 * any of them have expired.  If none have expired,
+                 * use the expiration time on the first one to
+                 * determine how long the poll should block.
+                 */
+                apr_time_t now = apr_time_now();
+                conn_state_t *cs = APR_RING_FIRST(&(iom->pending_timeouts));
+                if (cs->expiration_time <= now) {
+                    do {
+                        APR_RING_REMOVE(cs, timeout_list);
+                        apr_pollset_remove(iom->pollset, &(cs->pfd));
+                        APR_RING_INSERT_TAIL(&(iom->expired_timeouts), cs,
+                                             conn_state_t, timeout_list);
+                        if (APR_RING_EMPTY(&(iom->pending_timeouts),
+                                           conn_state_t, timeout_list)) {
+                            break;
+                        }
+                        cs = APR_RING_FIRST(&(iom->pending_timeouts));
+                    } while (cs->expiration_time <= now);
+                    continue;
+                }
+                else {
+                    poll_timeout = cs->expiration_time - now;
+                }
+            }
+
+            apr_thread_mutex_unlock(iom->lock);
+
+            rv = apr_pollset_poll(iom->pollset, poll_timeout,
+                                  &num_pending_events, &next_pending_event);
+
+            if ((rv != APR_SUCCESS) && !APR_STATUS_IS_TIMEUP(rv) && !APR_STATUS_IS_EINTR(rv)) {
+                apr_thread_mutex_unlock(iom->pollset_lock);
+                return rv;
+            }
+            apr_thread_mutex_lock(iom->lock);
+            
+            if (num_pending_events > 0) {
+                iom->num_pending_events = num_pending_events;
+                iom->next_pending_event = next_pending_event;
+                for (i = 0; i < num_pending_events; i++) {
+                    multiplexable *m = (multiplexable *)next_pending_event[i].client_data;
+                    if (m != NULL) {
+                        io_multiplexer_remove_internal(iom, m);
+                    }
+                }
+            }
+        }
+    }
+}
+
+apr_status_t io_multiplexer_stop(io_multiplexer *iom, int graceful) {
+    iom->stopped = 1;
+    return APR_SUCCESS;
+}
+
+apr_status_t io_multiplexer_add(io_multiplexer *iom, multiplexable *m,
+                                long timeout_in_usec)
+{
+    apr_status_t rv;
+    apr_thread_mutex_lock(iom->lock);
+    if (iom->stopped) {
+        rv = APR_EINVAL;
+    }
+    else if (m->type == IOM_CONNECTION) {
+        APR_RING_REMOVE(m->c->cs, timeout_list);
+        m->c->cs->pfd.client_data = m;
+        rv = apr_pollset_add(iom->pollset, &(m->c->cs->pfd));
+        if (timeout_in_usec >= 0) {
+            /* XXX: Keep the pending_timeouts list sorted */
+            m->c->cs->expiration_time = apr_time_now() + timeout_in_usec;
+            APR_RING_INSERT_TAIL(&(iom->pending_timeouts), m->c->cs,
+                                 conn_state_t, timeout_list);
+        }
+    }
+    else if (m->type == IOM_LISTENER) {
+        apr_pollfd_t desc;
+        desc.desc_type = APR_POLL_SOCKET;
+        desc.desc.s = m->l->sd;
+        desc.reqevents = APR_POLLIN;
+        desc.client_data = m;
+        rv = apr_pollset_add(iom->pollset, &desc);
+    }
+    else {
+        rv = APR_EINVALSOCK;
+    }
+    apr_thread_mutex_unlock(iom->lock);
+    return rv;
+}
+
+apr_status_t io_multiplexer_remove(io_multiplexer *iom, multiplexable *m)
+{
+    apr_status_t rv;
+    apr_thread_mutex_lock(iom->lock);
+    rv = io_multiplexer_remove_internal(iom, m);
+    apr_thread_mutex_unlock(iom->lock);
+    return rv;
+}
+
+static apr_status_t io_multiplexer_remove_internal(io_multiplexer *iom,
+                                                   multiplexable *m)
+{
+    apr_status_t rv;
+    if (iom->stopped) {
+        rv = APR_EINVAL;
+    }
+    else if (m->type == IOM_CONNECTION) {
+        APR_RING_REMOVE(m->c->cs, timeout_list);
+        rv = apr_pollset_remove(iom->pollset, &(m->c->cs->pfd));
+    }
+    else if (m->type == IOM_LISTENER) {
+        apr_pollfd_t desc;
+        desc.desc_type = APR_POLL_SOCKET;
+        desc.desc.s = m->l->sd;
+        desc.reqevents = APR_POLLIN;
+        desc.client_data = NULL;
+        rv = apr_pollset_remove(iom->pollset, &desc);
+    }
+    else {
+        rv = APR_EINVALSOCK;
+    }
+    return rv;
+}
+

Added: httpd/httpd/branches/async-dev/server/mpm/experimental/leader/io_multiplexer.h
URL: http://svn.apache.org/viewcvs/httpd/httpd/branches/async-dev/server/mpm/experimental/leader/io_multiplexer.h?rev=280222&view=auto
==============================================================================
--- httpd/httpd/branches/async-dev/server/mpm/experimental/leader/io_multiplexer.h (added)
+++ httpd/httpd/branches/async-dev/server/mpm/experimental/leader/io_multiplexer.h Sun Sep 11 17:35:23 2005
@@ -0,0 +1,58 @@
+/* Copyright 2005 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef APACHE_MPM_EVENT_IOMUX_H
+#define APACHE_MPM_EVENT_IOMUX_H
+
+#include "apr_pools.h"
+#include "apr_network_io.h"
+#include "apr_poll.h"
+
+#include "ap_listen.h"
+#include "httpd.h"
+
+typedef struct io_multiplexer io_multiplexer;
+
+typedef struct {
+    enum { IOM_LISTENER, IOM_CONNECTION } type;
+    union {
+        ap_listen_rec *l;
+        conn_rec *c;
+    };
+} multiplexable;
+
+/* Flag to set in apr_pollfd_t.rtnevents upon timeout
+ * XXX: Find a way to make sure this never collides with any value
+ *      set by APR
+ */
+#define IOM_POLL_TIMEOUT 0x8000
+
+apr_status_t io_multiplexer_create(io_multiplexer **iom, apr_pool_t *p,
+                                   apr_uint32_t max_descriptors);
+
+apr_status_t io_multiplexer_get_event(io_multiplexer *iom, apr_pollfd_t *event);
+
+apr_status_t io_multiplexer_stop(io_multiplexer *iom, int graceful);
+
+#define IOM_TIMEOUT_INFINITE -1
+
+apr_status_t io_multiplexer_add(io_multiplexer *iom, multiplexable *m,
+                                long timeout_in_usec);
+
+apr_status_t io_multiplexer_remove(io_multiplexer *iom, multiplexable *m);
+
+#endif /* APACHE_MPM_EVENT_IOMUX_H */
+

Modified: httpd/httpd/branches/async-dev/server/mpm/experimental/leader/leader.c
URL: http://svn.apache.org/viewcvs/httpd/httpd/branches/async-dev/server/mpm/experimental/leader/leader.c?rev=280222&r1=280221&r2=280222&view=diff
==============================================================================
--- httpd/httpd/branches/async-dev/server/mpm/experimental/leader/leader.c (original)
+++ httpd/httpd/branches/async-dev/server/mpm/experimental/leader/leader.c Sun Sep 11 17:35:23 2005
@@ -21,8 +21,6 @@
 #include "apr_thread_proc.h"
 #include "apr_signal.h"
 #include "apr_thread_cond.h"
-#include "apr_thread_mutex.h"
-#include "apr_proc_mutex.h"
 #define APR_WANT_STRFUNC
 #include "apr_want.h"
 
@@ -52,6 +50,7 @@
 #include "http_config.h"        /* for read_config */ 
 #include "http_core.h"          /* for get_remote_host */ 
 #include "http_connection.h"
+#include "http_vhost.h"
 #include "ap_mpm.h"
 #include "mpm_common.h"
 #include "ap_listen.h"
@@ -64,6 +63,8 @@
 
 #include "apr_atomic.h"
 
+#include "io_multiplexer.h"
+
 /* Limit on the total --- clients will be locked out if more servers than
  * this are needed.  It is intended solely to keep the server from crashing
  * when things get out of hand.
@@ -187,165 +188,15 @@
                            thread. Use this instead */
 static pid_t parent_pid;
 
-/* Locks for accept serialization */
-static apr_proc_mutex_t *accept_mutex;
-
-#ifdef SINGLE_LISTEN_UNSERIALIZED_ACCEPT
-#define SAFE_ACCEPT(stmt) (ap_listeners->next ? (stmt) : APR_SUCCESS)
-#else
-#define SAFE_ACCEPT(stmt) (stmt)
-#endif
-
-
-/* Structure used to wake up an idle worker thread
+/* Max # of file descriptors that the io_multiplexer supports
+ * XXX should be configurable at runtime
  */
-struct worker_wakeup_info {
-    apr_uint32_t next; /* index into worker_wakeups array,
-                        * used to build a linked list
-                        */
-    apr_thread_cond_t *cond;
-    apr_thread_mutex_t *mutex;
-};
-
-static worker_wakeup_info *worker_wakeup_create(apr_pool_t *pool)
-{
-    apr_status_t rv;
-    worker_wakeup_info *wakeup;
-
-    wakeup = (worker_wakeup_info *)apr_palloc(pool, sizeof(*wakeup));
-    if ((rv = apr_thread_cond_create(&wakeup->cond, pool)) != APR_SUCCESS) {
-        return NULL;
-    }
-    if ((rv = apr_thread_mutex_create(&wakeup->mutex, APR_THREAD_MUTEX_DEFAULT,
-                                      pool)) != APR_SUCCESS) {
-        return NULL;
-    }
-    /* The wakeup's mutex will be unlocked automatically when
-     * the worker blocks on the condition variable
-     */
-    apr_thread_mutex_lock(wakeup->mutex);
-    return wakeup;
-}
-
+#define MAX_IOM_DESCRIPTORS 1024
 
-/* Structure used to hold a stack of idle worker threads 
+/* Socket multiplexer used to watch large numbers of connections
+ * for readability/writeability
  */
-typedef struct {
-    /* 'state' consists of several fields concatenated into a
-     * single 32-bit int for use with the apr_atomic_cas32() API:
-     *   state & STACK_FIRST  is the thread ID of the first thread
-     *                        in a linked list of idle threads
-     *   state & STACK_TERMINATED  indicates whether the proc is shutting down
-     *   state & STACK_NO_LISTENER indicates whether the process has
-     *                             no current listener thread
-     */
-    apr_uint32_t state;
-} worker_stack;
-
-#define STACK_FIRST  0xffff
-#define STACK_LIST_END  0xffff
-#define STACK_TERMINATED 0x10000
-#define STACK_NO_LISTENER 0x20000
-
-static worker_wakeup_info **worker_wakeups = NULL;
-
-static worker_stack* worker_stack_create(apr_pool_t *pool, apr_size_t max)
-{
-    worker_stack *stack = (worker_stack *)apr_palloc(pool, sizeof(*stack));
-    stack->state = STACK_NO_LISTENER | STACK_LIST_END;
-    return stack;
-}
-
-static apr_status_t worker_stack_wait(worker_stack *stack,
-                                      apr_uint32_t worker_id)
-{
-    worker_wakeup_info *wakeup = worker_wakeups[worker_id];
-
-    while (1) {
-        apr_uint32_t state = stack->state;
-        if (state & (STACK_TERMINATED | STACK_NO_LISTENER)) {
-            if (state & STACK_TERMINATED) {
-                return APR_EINVAL;
-            }
-            if (apr_atomic_cas32(&(stack->state), STACK_LIST_END, state) !=
-                state) {
-                continue;
-            }
-            else {
-                return APR_SUCCESS;
-            }
-        }
-        wakeup->next = state;
-        if (apr_atomic_cas32(&(stack->state), worker_id, state) != state) {
-            continue;
-        }
-        else {
-            return apr_thread_cond_wait(wakeup->cond, wakeup->mutex);
-        }
-    }    
-}
-
-static apr_status_t worker_stack_awaken_next(worker_stack *stack)
-{
-
-    while (1) {
-        apr_uint32_t state = stack->state;
-        apr_uint32_t first = state & STACK_FIRST;
-        if (first == STACK_LIST_END) {
-            if (apr_atomic_cas32(&(stack->state), state | STACK_NO_LISTENER,
-                                 state) != state) {
-                continue;
-            }
-            else {
-                return APR_SUCCESS;
-            }
-        }
-        else {
-            worker_wakeup_info *wakeup = worker_wakeups[first];
-            if (apr_atomic_cas32(&(stack->state), (state ^ first) | wakeup->next,
-                                 state) != state) {
-                continue;
-            }
-            else {
-                /* Acquire and release the idle worker's mutex to ensure
-                 * that it's actually waiting on its condition variable
-                 */
-                apr_status_t rv;
-                if ((rv = apr_thread_mutex_lock(wakeup->mutex)) !=
-                    APR_SUCCESS) {
-                    return rv;
-                }
-                if ((rv = apr_thread_mutex_unlock(wakeup->mutex)) !=
-                    APR_SUCCESS) {
-                    return rv;
-                }
-                return apr_thread_cond_signal(wakeup->cond);
-            }
-        }
-    }
-}
-
-static apr_status_t worker_stack_term(worker_stack *stack)
-{
-    int i;
-    apr_status_t rv;
-
-    while (1) {
-        apr_uint32_t state = stack->state;
-        if (apr_atomic_cas32(&(stack->state), state | STACK_TERMINATED,
-                             state) == state) {
-            break;
-        }
-    }
-    for (i = 0; i < ap_threads_per_child; i++) {
-        if ((rv = worker_stack_awaken_next(stack)) != APR_SUCCESS) {
-            return rv;
-        }
-    }
-    return APR_SUCCESS;
-}
-
-static worker_stack *idle_worker_stack;
+static io_multiplexer *iom;
 
 #define ST_INIT              0
 #define ST_GRACEFUL          1
@@ -362,7 +213,7 @@
     mpm_state = AP_MPMQ_STOPPING;
     workers_may_exit = 1;
 
-    worker_stack_term(idle_worker_stack);
+    io_multiplexer_stop(iom, 0);
 }
 
 AP_DECLARE(apr_status_t) ap_mpm_query(int query_code, int *result)
@@ -377,6 +228,9 @@
         case AP_MPMQ_IS_FORKED:
             *result = AP_MPMQ_DYNAMIC;
             return APR_SUCCESS;
+        case AP_MPMQ_IS_ASYNC:
+            *result = 1;
+            return APR_SUCCESS;
         case AP_MPMQ_HARD_LIMIT_DAEMONS:
             *result = server_limit;
             return APR_SUCCESS;
@@ -587,27 +441,51 @@
     return workers_may_exit;
 }
 
-/*****************************************************************
- * Child process main loop.
- */
-
-static void process_socket(apr_pool_t *p, apr_socket_t *sock, int my_child_num,
-                           int my_thread_num, apr_bucket_alloc_t *bucket_alloc)
-{
-    conn_rec *current_conn;
-    long conn_id = ID_FROM_CHILD_THREAD(my_child_num, my_thread_num);
-    int csd;
-    ap_sb_handle_t *sbh;
-
-    ap_create_sb_handle(&sbh, p, my_child_num, my_thread_num);
-    apr_os_sock_get(&csd, sock);
-
-    current_conn = ap_run_create_connection(p, ap_server_conf, sock,
-                                            conn_id, sbh, bucket_alloc);
-    if (current_conn) {
-        ap_process_connection(current_conn, sock);
-        ap_lingering_close(current_conn);
+static apr_status_t process_event_on_connection(const apr_pollfd_t *event,
+                                                io_multiplexer *iom,
+                                                int child_num, int thread_num)
+{
+    multiplexable *m = (multiplexable *)(event->client_data);
+    conn_rec *c = m->c;
+    conn_state_t *cs = c->cs;
+    
+    if (cs->state == CONN_STATE_CHECK_REQUEST_LINE_READABLE) {
+        if (event->rtnevents & IOM_POLL_TIMEOUT) {
+            cs->state = CONN_STATE_LINGER;
+            ap_log_error(APLOG_MARK, APLOG_CRIT, 0, ap_server_conf, "timeout on connection");
+        }
+        else if (event->rtnevents & (APR_POLLIN | APR_POLLPRI)) {
+            cs->state = CONN_STATE_READ_REQUEST_LINE;
+        }
+        else if (event->rtnevents & APR_POLLHUP) {
+            /* close... */
+        }
+        else if (event->rtnevents & (APR_POLLERR | APR_POLLNVAL)) {
+            /* error... */
+        }
+    }
+    
+    if (cs->state == CONN_STATE_READ_REQUEST_LINE) {
+        if (!c->aborted) {
+            ap_run_process_connection(c);
+        }
+        else {
+            cs->state = CONN_STATE_LINGER;
+        }
+    }
+    
+    if (cs->state == CONN_STATE_LINGER) {
+        ap_lingering_close(c);
+        apr_bucket_alloc_destroy(cs->bucket_alloc);
+        apr_pool_destroy(cs->p);
+        return APR_SUCCESS;
     }
+    
+    if (cs->state == CONN_STATE_CHECK_REQUEST_LINE_READABLE) {
+        io_multiplexer_add(iom, m, ap_server_conf->keep_alive_timeout);
+    }
+    
+    return APR_SUCCESS;
 }
 
 /* requests_this_child has gone to zero or below.  See if the admin coded
@@ -647,23 +525,100 @@
 #endif
 }
 
+static apr_status_t accept_new_connection(const apr_pollfd_t *event,
+                                          io_multiplexer *iom,
+                                          int child_num, int thread_num)
+{
+    apr_allocator_t *allocator = NULL;
+    apr_pool_t *p = NULL;
+    apr_bucket_alloc_t *bucket_alloc = NULL;
+    apr_socket_t *new_socket = NULL;
+    void *csd;
+    apr_status_t rv;
+    multiplexable *client_data = (multiplexable *)(event->client_data);
+    ap_listen_rec *listener = client_data->l;
+
+    apr_allocator_create(&allocator);
+    apr_allocator_max_free_set(allocator, ap_max_mem_free);
+    apr_pool_create_ex(&p, NULL, NULL, allocator);
+    apr_allocator_owner_set(allocator, p);
+    bucket_alloc = apr_bucket_alloc_create_ex(allocator);
+
+    rv = listener->accept_func(&csd, listener, p);
+
+   
+    if (rv == APR_SUCCESS) {
+        /* Re-register the listener with the multiplexer */
+        io_multiplexer_add(iom, client_data, -1);
+        
+        if (csd == NULL) {
+            rv = APR_EGENERAL;
+        }
+        else {
+            new_socket = (apr_socket_t *)csd;
+            conn_rec *connection;
+            long conn_id = ID_FROM_CHILD_THREAD(child_num, thread_num);
+            ap_sb_handle_t *sbh;
+            ap_create_sb_handle(&sbh, p, child_num, thread_num);
+            connection = ap_run_create_connection(p, ap_server_conf,
+                                                  new_socket, conn_id,
+                                                  sbh, bucket_alloc);
+            if (connection == NULL) {
+                rv = APR_EGENERAL;
+            }
+            else {
+                conn_state_t *cs = connection->cs;
+                multiplexable *m;
+                int rc;
+                
+                ap_update_vhost_given_ip(connection);
+                rc = ap_run_pre_connection(connection, csd);
+                if (rc != OK && rc != DONE) {
+                    ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf,
+                    "accept_new_connection: connection aborted");
+                    connection->aborted = 1;
+                }
+                m = (multiplexable *)apr_palloc(p, sizeof(*m));
+                m->type = IOM_CONNECTION;
+                m->c = connection;
+                cs->pfd.desc_type = APR_POLL_SOCKET;
+                cs->pfd.desc.s = new_socket;
+                cs->pfd.reqevents = APR_POLLIN;
+                cs->pfd.client_data = m;
+
+                rv = io_multiplexer_add(iom, m, -1);
+                if (rv != APR_SUCCESS) {
+                    ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ap_server_conf,
+                         "io_multiplexer_add() failed");
+                }
+             }
+        }
+    }
+    if (rv != APR_SUCCESS) {
+        if (new_socket != NULL) {
+            apr_socket_close(new_socket);
+        }
+        if (bucket_alloc != NULL) {
+            apr_bucket_alloc_destroy(bucket_alloc); /* XXX unneeded? */
+        }
+        if (p != NULL) {
+            apr_pool_destroy(p);
+        }
+        ap_log_error(APLOG_MARK, APLOG_DEBUG, rv, ap_server_conf, "not restoring listener");
+    }
+
+    return rv;
+}
+
 static void *worker_thread(apr_thread_t *thd, void * dummy)
 {
     proc_info * ti = dummy;
     int process_slot = ti->pid;
     int thread_slot = ti->tid;
-    apr_uint32_t my_worker_num = (apr_uint32_t)(ti->tid);
-    apr_pool_t *tpool = apr_thread_pool_get(thd);
-    void *csd = NULL;
     apr_allocator_t *allocator;
     apr_pool_t *ptrans;                /* Pool for per-transaction stuff */
     apr_bucket_alloc_t *bucket_alloc;
-    int numdesc;
-    apr_pollset_t *pollset;
     apr_status_t rv;
-    ap_listen_rec *lr;
-    int is_listener;
-    int last_poll_idx = 0;
 
     ap_update_child_status_from_indexes(process_slot, thread_slot, SERVER_STARTING, NULL);
 
@@ -676,172 +631,45 @@
     apr_allocator_owner_set(allocator, ptrans);
     bucket_alloc = apr_bucket_alloc_create_ex(allocator);
 
-    apr_pollset_create(&pollset, num_listensocks, tpool, 0);
-    for (lr = ap_listeners ; lr != NULL ; lr = lr->next) {
-        apr_pollfd_t pfd = { 0 };
-
-        pfd.desc_type = APR_POLL_SOCKET;
-        pfd.desc.s = lr->sd;
-        pfd.reqevents = APR_POLLIN;
-        pfd.client_data = lr;
-
-        /* ### check the status */
-        (void) apr_pollset_add(pollset, &pfd);
-    }
-    
-    /* TODO: Switch to a system where threads reuse the results from earlier
-       poll calls - manoj */
-    is_listener = 0;
     while (!workers_may_exit) {
-
+        apr_pollfd_t event;
         ap_update_child_status_from_indexes(process_slot, thread_slot,
                                             SERVER_READY, NULL);
-        if (!is_listener) {
-            /* Wait until it's our turn to become the listener */
-            if ((rv = worker_stack_wait(idle_worker_stack, my_worker_num)) !=
-                APR_SUCCESS) {
-                if (rv != APR_EINVAL) {
-                    ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
-                                 "worker_stack_wait failed. Shutting down");
-                }
-                break;
-            }
-            if (workers_may_exit) {
-                break;
-            }
-            is_listener = 1;
-        }
-
+        rv = io_multiplexer_get_event(iom, &event);
         /* TODO: requests_this_child should be synchronized - aaron */
         if (requests_this_child <= 0) {
             check_infinite_requests();
         }
-        if (workers_may_exit) break;
-
-        if ((rv = SAFE_ACCEPT(apr_proc_mutex_lock(accept_mutex)))
-            != APR_SUCCESS) {
-            int level = APLOG_EMERG;
-
-            if (workers_may_exit) {
-                break;
-            }
-            if (ap_scoreboard_image->parent[process_slot].generation != 
-                ap_scoreboard_image->global->running_generation) {
-                level = APLOG_DEBUG; /* common to get these at restart time */
-            }
-            ap_log_error(APLOG_MARK, level, rv, ap_server_conf,
-                         "apr_proc_mutex_lock failed. Attempting to shutdown "
-                         "process gracefully.");
-            signal_threads(ST_GRACEFUL);
-            break;                    /* skip the lock release */
-        }
-
-        if (!ap_listeners->next) {
-            /* Only one listener, so skip the poll */
-            lr = ap_listeners;
+        if (workers_may_exit) {
+            break;
         }
-        else {
-            while (!workers_may_exit) {
-                apr_status_t ret;
-                const apr_pollfd_t *pdesc;
-
-                ret = apr_pollset_poll(pollset, -1, &numdesc, &pdesc);
-                if (ret != APR_SUCCESS) {
-                    if (APR_STATUS_IS_EINTR(ret)) {
-                        continue;
-                    }
-
-                    /* apr_pollset_poll() will only return errors in catastrophic
-                     * circumstances. Let's try exiting gracefully, for now. */
-                    ap_log_error(APLOG_MARK, APLOG_ERR, ret, (const server_rec *)
-                                 ap_server_conf, "apr_pollset_poll: (listen)");
-                    signal_threads(ST_GRACEFUL);
+        if (rv == APR_SUCCESS) {
+            multiplexable *m = (multiplexable *)event.client_data;
+            if (m->type == IOM_CONNECTION) {
+                rv = process_event_on_connection(&event, iom, process_slot, thread_slot);
+            }
+            else if (m->type == IOM_LISTENER) {
+                rv = accept_new_connection(&event, iom, process_slot, thread_slot);
+                if (rv != APR_SUCCESS) {
+                    ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ap_server_conf,
+                                 "accept_new_connection() failed");
                 }
-
-                if (workers_may_exit) break;
-
-                /* We can always use pdesc[0], but sockets at position N
-                 * could end up completely starved of attention in a very
-                 * busy server. Therefore, we round-robin across the
-                 * returned set of descriptors. While it is possible that
-                 * the returned set of descriptors might flip around and
-                 * continue to starve some sockets, we happen to know the
-                 * internal pollset implementation retains ordering
-                 * stability of the sockets. Thus, the round-robin should
-                 * ensure that a socket will eventually be serviced.
-                 */
-                if (last_poll_idx >= numdesc)
-                    last_poll_idx = 0;
-
-                /* Grab a listener record from the client_data of the poll
-                 * descriptor, and advance our saved index to round-robin
-                 * the next fetch.
-                 *
-                 * ### hmm... this descriptor might have POLLERR rather
-                 * ### than POLLIN
-                 */
-                lr = pdesc[last_poll_idx++].client_data;
-                goto got_fd;
             }
-        }
-    got_fd:
-        if (!workers_may_exit) {
-            rv = lr->accept_func(&csd, lr, ptrans);
-            /* later we trash rv and rely on csd to indicate success/failure */
-            AP_DEBUG_ASSERT(rv == APR_SUCCESS || !csd);
-
-            if (rv == APR_EGENERAL) {
-                /* E[NM]FILE, ENOMEM, etc */
-                resource_shortage = 1;
-                signal_threads(ST_GRACEFUL);
-            }
-            if ((rv = SAFE_ACCEPT(apr_proc_mutex_unlock(accept_mutex)))
-                != APR_SUCCESS) {
-                int level = APLOG_EMERG;
-
-                if (workers_may_exit) {
-                    break;
-                }
-                if (ap_scoreboard_image->parent[process_slot].generation != 
-                    ap_scoreboard_image->global->running_generation) {
-                    level = APLOG_DEBUG; /* common to get these at restart time */
-                }
-                ap_log_error(APLOG_MARK, level, rv, ap_server_conf,
-                             "apr_proc_mutex_unlock failed. Attempting to "
-                             "shutdown process gracefully.");
-                signal_threads(ST_GRACEFUL);
-            }
-            if (csd != NULL) {
-                is_listener = 0;
-                worker_stack_awaken_next(idle_worker_stack);
-                process_socket(ptrans, csd, process_slot,
-                               thread_slot, bucket_alloc);
-                apr_pool_clear(ptrans);
-                requests_this_child--;
-            }
-            if ((ap_mpm_pod_check(pod) == APR_SUCCESS) ||
-                (ap_my_generation !=
-                 ap_scoreboard_image->global->running_generation)) {
-                signal_threads(ST_GRACEFUL);
-                break;
+            else {
+                ap_log_error(APLOG_MARK, APLOG_CRIT, 0, ap_server_conf,
+                             "got event on polled object with unknown type %d",
+                             m->type);
             }
         }
         else {
-            if ((rv = SAFE_ACCEPT(apr_proc_mutex_unlock(accept_mutex)))
-                != APR_SUCCESS) {
-                ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
-                             "apr_proc_mutex_unlock failed. Attempting to "
-                             "shutdown process gracefully.");
-                signal_threads(ST_GRACEFUL);
-            }
-            break;
+            ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ap_server_conf, "io_multiplexer_get_event failed");
         }
     }
 
     dying = 1;
     ap_scoreboard_image->parent[process_slot].quiescing = 1;
 
-    worker_stack_term(idle_worker_stack);
+    io_multiplexer_stop(iom, 0);
 
     ap_update_child_status_from_indexes(process_slot, thread_slot,
         (dying) ? SERVER_DEAD : SERVER_GRACEFUL, (request_rec *) NULL);
@@ -876,39 +704,43 @@
     int my_child_num = child_num_arg;
     proc_info *my_info;
     apr_status_t rv;
+    ap_listen_rec *listener;
     int i;
     int threads_created = 0;
     int loops;
     int prev_threads_created;
 
-    idle_worker_stack = worker_stack_create(pchild, ap_threads_per_child);
-    if (idle_worker_stack == NULL) {
-        ap_log_error(APLOG_MARK, APLOG_ALERT, 0, ap_server_conf,
-                     "worker_stack_create() failed");
+    rv = io_multiplexer_create(&iom, pchild, MAX_IOM_DESCRIPTORS);
+    if (rv != APR_SUCCESS) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ap_server_conf,
+                     "io_multiplexer_create() failed");
         clean_child_exit(APEXIT_CHILDFATAL);
     }
-
-    worker_wakeups = (worker_wakeup_info **)
-        apr_palloc(pchild, sizeof(worker_wakeup_info *) *
-                   ap_threads_per_child);
+    for (listener = ap_listeners; listener != NULL; listener = listener->next) {
+        apr_pollfd_t descriptor;
+        multiplexable *m = (multiplexable *)apr_palloc(pconf, sizeof(*m));
+        m->type = IOM_LISTENER;
+        m->l = listener;
+        descriptor.desc_type = APR_POLL_SOCKET;
+        descriptor.desc.s = listener->sd;
+        descriptor.reqevents = APR_POLLIN;
+        descriptor.client_data = NULL;
+        rv = io_multiplexer_add(iom, m, -1);
+        if (rv != APR_SUCCESS) {
+            ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ap_server_conf,
+                         "could not add listener to io_multiplexer");
+        }
+    }
 
     loops = prev_threads_created = 0;
     while (1) {
         for (i = 0; i < ap_threads_per_child; i++) {
             int status = ap_scoreboard_image->servers[child_num_arg][i].status;
-            worker_wakeup_info *wakeup;
 
             if (status != SERVER_GRACEFUL && status != SERVER_DEAD) {
                 continue;
             }
 
-            wakeup = worker_wakeup_create(pchild);
-            if (wakeup == NULL) {
-                ap_log_error(APLOG_MARK, APLOG_ALERT|APLOG_NOERRNO, 0,
-                             ap_server_conf, "worker_wakeup_create failed");
-                clean_child_exit(APEXIT_CHILDFATAL);
-            }
-            worker_wakeups[threads_created] = wakeup;
             my_info = (proc_info *)malloc(sizeof(proc_info));
             if (my_info == NULL) {
                 ap_log_error(APLOG_MARK, APLOG_ALERT, errno, ap_server_conf,
@@ -1022,14 +854,6 @@
     /*stuff to do before we switch id's, so we have permissions.*/
     ap_reopen_scoreboard(pchild, NULL, 0);
 
-    rv = SAFE_ACCEPT(apr_proc_mutex_child_init(&accept_mutex, ap_lock_fname,
-                                               pchild));
-    if (rv != APR_SUCCESS) {
-        ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
-                     "Couldn't initialize cross-process lock in child");
-        clean_child_exit(APEXIT_CHILDFATAL);
-    }
-
     if (unixd_setup_child()) {
         clean_child_exit(APEXIT_CHILDFATAL);
     }
@@ -1446,7 +1270,6 @@
 int ap_mpm_run(apr_pool_t *_pconf, apr_pool_t *plog, server_rec *s)
 {
     int remaining_children_to_start;
-    apr_status_t rv;
 
     ap_log_pid(pconf, ap_pid_fname);
 
@@ -1458,36 +1281,6 @@
                      "ignored during restart");
         changed_limit_at_restart = 0;
     }
-    
-    /* Initialize cross-process accept lock */
-    ap_lock_fname = apr_psprintf(_pconf, "%s.%" APR_PID_T_FMT,
-                                 ap_server_root_relative(_pconf, ap_lock_fname),
-                                 ap_my_pid);
-
-    rv = apr_proc_mutex_create(&accept_mutex, ap_lock_fname, 
-                               ap_accept_lock_mech, _pconf);
-    if (rv != APR_SUCCESS) {
-        ap_log_error(APLOG_MARK, APLOG_EMERG, rv, s,
-                     "Couldn't create accept lock");
-        mpm_state = AP_MPMQ_STOPPING;
-        return 1;
-    }
-
-#if APR_USE_SYSVSEM_SERIALIZE
-    if (ap_accept_lock_mech == APR_LOCK_DEFAULT || 
-        ap_accept_lock_mech == APR_LOCK_SYSVSEM) {
-#else
-    if (ap_accept_lock_mech == APR_LOCK_SYSVSEM) {
-#endif
-        rv = unixd_set_proc_mutex_perms(accept_mutex);
-        if (rv != APR_SUCCESS) {
-            ap_log_error(APLOG_MARK, APLOG_EMERG, rv, s,
-                         "Couldn't set permissions on cross-process lock; "
-                         "check User and Group directives");
-            mpm_state = AP_MPMQ_STOPPING;
-            return 1;
-        }
-    }
 
     if (!is_graceful) {
         if (ap_run_pre_mpm(s->process->pool, SB_SHARED) != OK) {
@@ -1532,12 +1325,6 @@
                 ap_get_server_version());
     ap_log_error(APLOG_MARK, APLOG_INFO, 0, ap_server_conf,
                 "Server built: %s", ap_get_server_built());
-#ifdef AP_MPM_WANT_SET_ACCEPT_LOCK_MECH
-    ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf,
-		"AcceptMutex: %s (default: %s)",
-		apr_proc_mutex_name(accept_mutex),
-		apr_proc_mutex_defname());
-#endif
     restart_pending = shutdown_pending = 0;
     mpm_state = AP_MPMQ_RUNNING;
 



Mime
View raw message