httpd-cvs mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bri...@apache.org
Subject svn commit: r327945 - in /httpd/httpd/trunk: CHANGES modules/http/http_core.c modules/http/http_request.c server/mpm/experimental/event/event.c
Date Mon, 24 Oct 2005 03:33:17 GMT
Author: brianp
Date: Sun Oct 23 20:33:14 2005
New Revision: 327945

URL: http://svn.apache.org/viewcvs?rev=327945&view=rev
Log:
Async write completion for Event MPM
(backported from async-dev branch to 2.3 trunk)

Modified:
    httpd/httpd/trunk/CHANGES
    httpd/httpd/trunk/modules/http/http_core.c
    httpd/httpd/trunk/modules/http/http_request.c
    httpd/httpd/trunk/server/mpm/experimental/event/event.c

Modified: httpd/httpd/trunk/CHANGES
URL: http://svn.apache.org/viewcvs/httpd/httpd/trunk/CHANGES?rev=327945&r1=327944&r2=327945&view=diff
==============================================================================
--- httpd/httpd/trunk/CHANGES [utf-8] (original)
+++ httpd/httpd/trunk/CHANGES [utf-8] Sun Oct 23 20:33:14 2005
@@ -2,6 +2,8 @@
 Changes with Apache 2.3.0
   [Remove entries to the current 2.0 and 2.2 section below, when backported]
 
+  *) Asynchronous write completion for the Event MPM.  [Brian Pane]
+
   *) Added an End-Of-Request bucket type.  The logging of a request and
      the freeing of its pool are now done when the EOR bucket is destroyed.
      This has the effect of delaying the logging until right after the last

Modified: httpd/httpd/trunk/modules/http/http_core.c
URL: http://svn.apache.org/viewcvs/httpd/httpd/trunk/modules/http/http_core.c?rev=327945&r1=327944&r2=327945&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http/http_core.c (original)
+++ httpd/httpd/trunk/modules/http/http_core.c Sun Oct 23 20:33:14 2005
@@ -122,26 +122,18 @@
             /* process the request if it was read without error */
                                                        
             ap_update_child_status(c->sbh, SERVER_BUSY_WRITE, r);
-            if (r->status == HTTP_OK)
-                ap_process_request(r);
+            if (r->status == HTTP_OK) {
+                cs->state = CONN_STATE_HANDLER;
+                ap_process_async_request(r);
+            }
 
             if (ap_extended_status)
                 ap_increment_counts(c->sbh, r);
 
-            if (c->keepalive != AP_CONN_KEEPALIVE || c->aborted 
-                    || ap_graceful_stop_signalled()) {
+            if (cs->state != CONN_STATE_WRITE_COMPLETION) {
+                /* Something went wrong; close the connection */
                 cs->state = CONN_STATE_LINGER;
             }
-            else if (!c->data_in_input_filters) {
-                cs->state = CONN_STATE_CHECK_REQUEST_LINE_READABLE;
-            }
-            else {
-                /* else we are pipelining.  Stay in READ_REQUEST_LINE state
-                 *  and stay in the loop
-                 */
-                cs->state = CONN_STATE_READ_REQUEST_LINE;
-            }
-
         }
         else {   /* ap_read_request failed - client may have closed */
             cs->state = CONN_STATE_LINGER;

Modified: httpd/httpd/trunk/modules/http/http_request.c
URL: http://svn.apache.org/viewcvs/httpd/httpd/trunk/modules/http/http_request.c?rev=327945&r1=327944&r2=327945&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http/http_request.c (original)
+++ httpd/httpd/trunk/modules/http/http_request.c Sun Oct 23 20:33:14 2005
@@ -191,47 +191,23 @@
     ap_send_error_response(r_1st_err, recursive_error);
 }
 
-static void check_pipeline_flush(conn_rec *c)
+static void check_pipeline(conn_rec *c)
 {
-    apr_bucket *e;
-    apr_bucket_brigade *bb;
-
-    /* ### if would be nice if we could PEEK without a brigade. that would
-       ### allow us to defer creation of the brigade to when we actually
-       ### need to send a FLUSH. */
-    bb = apr_brigade_create(c->pool, c->bucket_alloc);
-
-    /* Flush the filter contents if:
-     *
-     *   1) the connection will be closed
-     *   2) there isn't a request ready to be read
-     */
     /* ### is zero correct? that means "read one line" */
     if (c->keepalive != AP_CONN_CLOSE) {
+        apr_bucket_brigade *bb = apr_brigade_create(c->pool, c->bucket_alloc);
         if (ap_get_brigade(c->input_filters, bb, AP_MODE_EATCRLF, 
                        APR_NONBLOCK_READ, 0) != APR_SUCCESS) {
             c->data_in_input_filters = 0;  /* we got APR_EOF or an error */
         }
         else {
             c->data_in_input_filters = 1;
-            return;    /* don't flush */
         }
     }
-
-        e = apr_bucket_flush_create(c->bucket_alloc);
-
-        /* We just send directly to the connection based filters.  At
-         * this point, we know that we have seen all of the data
-         * (request finalization sent an EOS bucket, which empties all
-         * of the request filters). We just want to flush the buckets
-         * if something hasn't been sent to the network yet.
-         */
-        APR_BRIGADE_INSERT_HEAD(bb, e);
-        ap_pass_brigade(c->output_filters, bb);
 }
 
 
-void ap_process_request(request_rec *r)
+void ap_process_async_request(request_rec *r)
 {
     int access_status;
     apr_bucket_brigade *bb;
@@ -289,9 +265,28 @@
      */
 
     c->cs->state = CONN_STATE_WRITE_COMPLETION;
-    check_pipeline_flush(c);
+    check_pipeline(c);
     if (ap_extended_status)
         ap_time_process_request(c->sbh, STOP_PREQUEST);
+}
+
+void ap_process_request(request_rec *r)
+{
+    apr_bucket_brigade *bb;
+    apr_bucket *b;
+    conn_rec *c = r->connection;
+
+    ap_process_async_request(r);
+
+    if (!c->data_in_input_filters) {
+        bb = apr_brigade_create(c->pool, c->bucket_alloc);
+        b = apr_bucket_flush_create(c->bucket_alloc);
+        APR_BRIGADE_INSERT_HEAD(bb, b);
+        ap_pass_brigade(c->output_filters, bb);
+    }
+    if (ap_extended_status) {
+        ap_time_process_request(c->sbh, STOP_PREQUEST);
+    }
 }
 
 static apr_table_t *rename_original_env(apr_pool_t *p, apr_table_t *t)

Modified: httpd/httpd/trunk/server/mpm/experimental/event/event.c
URL: http://svn.apache.org/viewcvs/httpd/httpd/trunk/server/mpm/experimental/event/event.c?rev=327945&r1=327944&r2=327945&view=diff
==============================================================================
--- httpd/httpd/trunk/server/mpm/experimental/event/event.c (original)
+++ httpd/httpd/trunk/server/mpm/experimental/event/event.c Sun Oct 23 20:33:14 2005
@@ -164,7 +164,7 @@
 
 apr_thread_mutex_t *timeout_mutex;
 APR_RING_HEAD(timeout_head_t, conn_state_t);
-static struct timeout_head_t timeout_head;
+static struct timeout_head_t timeout_head, keepalive_timeout_head;
 
 static apr_pollset_t *event_pollset;
 
@@ -592,6 +592,7 @@
         pt->status = 1;
         pt->baton = cs;
         cs->pfd.client_data = pt;
+        APR_RING_ELEM_INIT(cs, timeout_list);
 
         ap_update_vhost_given_ip(c);
 
@@ -621,8 +622,10 @@
     else {
         c = cs->c;
         c->sbh = sbh;
+        pt = cs->pfd.client_data;
     }
 
+read_request:
     if (cs->state == CONN_STATE_READ_REQUEST_LINE) {
         if (!c->aborted) {
             ap_run_process_connection(c);
@@ -636,6 +639,49 @@
             cs->state = CONN_STATE_LINGER;
         }
     }
+    
+    if (cs->state == CONN_STATE_WRITE_COMPLETION) {
+        /* For now, do blocking writes in this thread to transfer the
+         * rest of the response.  TODO: Hand off this connection to a
+         * pollset for asynchronous write completion.
+         */
+        ap_filter_t *output_filter = c->output_filters;
+        apr_status_t rv;
+        while (output_filter->next != NULL) {
+            output_filter = output_filter->next;
+        }
+        rv = output_filter->frec->filter_func.out_func(output_filter, NULL);
+        if (rv != APR_SUCCESS) {
+            ap_log_error(APLOG_MARK, APLOG_WARNING, rv, ap_server_conf,
+                     "network write failure in core output filter");
+            cs->state = CONN_STATE_LINGER;
+        }
+        else if (c->data_in_output_filters) {
+            /* Still in CONN_STATE_WRITE_COMPLETION:
+             * Set a write timeout for this connection, and let the
+             * event thread poll for writeability.
+             */
+            cs->expiration_time = ap_server_conf->timeout + time_now;
+            apr_thread_mutex_lock(timeout_mutex);
+            APR_RING_INSERT_TAIL(&timeout_head, cs, conn_state_t, timeout_list);
+            apr_thread_mutex_unlock(timeout_mutex);
+            pt->status = 0;
+            cs->pfd.reqevents = APR_POLLOUT | APR_POLLHUP | APR_POLLERR;
+            rc = apr_pollset_add(event_pollset, &cs->pfd);
+            return 1;
+        }
+        else if (c->keepalive != AP_CONN_KEEPALIVE || c->aborted || 
+            ap_graceful_stop_signalled()) {
+            c->cs->state = CONN_STATE_LINGER;
+        }
+        else if (c->data_in_input_filters) {
+            cs->state = CONN_STATE_READ_REQUEST_LINE;
+            goto read_request;
+        }
+        else {
+            cs->state = CONN_STATE_CHECK_REQUEST_LINE_READABLE;
+        }
+    }
 
     if (cs->state == CONN_STATE_LINGER) {
         ap_lingering_close(c);
@@ -658,11 +704,12 @@
          */
         cs->expiration_time = ap_server_conf->keep_alive_timeout + time_now;
         apr_thread_mutex_lock(timeout_mutex);
-        APR_RING_INSERT_TAIL(&timeout_head, cs, conn_state_t, timeout_list);
+        APR_RING_INSERT_TAIL(&keepalive_timeout_head, cs, conn_state_t, timeout_list);
         apr_thread_mutex_unlock(timeout_mutex);
 
         pt->status = 0;
-        /* Add work to pollset. These are always read events */
+        /* Add work to pollset. */
+        cs->pfd.reqevents = APR_POLLIN;
         rc = apr_pollset_add(event_pollset, &cs->pfd);
 
         if (rc != APR_SUCCESS) {
@@ -839,11 +886,12 @@
     }
 
     APR_RING_INIT(&timeout_head, conn_state_t, timeout_list);
+    APR_RING_INIT(&keepalive_timeout_head, conn_state_t, timeout_list);
 
     /* Create the main pollset */
     rc = apr_pollset_create(&event_pollset,
                             ap_threads_per_child,
-                            tpool, APR_POLLSET_THREADSAFE);
+                            tpool, APR_POLLSET_THREADSAFE | APR_POLLSET_NOCOPY);
     if (rc != APR_SUCCESS) {
         ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf,
                      "apr_pollset_create with Thread Safety failed. "
@@ -853,19 +901,19 @@
     }
 
     for (lr = ap_listeners; lr != NULL; lr = lr->next) {
-        apr_pollfd_t pfd = { 0 };
+        apr_pollfd_t *pfd = apr_palloc(tpool, sizeof(*pfd));
         pt = apr_pcalloc(tpool, sizeof(*pt));
-        pfd.desc_type = APR_POLL_SOCKET;
-        pfd.desc.s = lr->sd;
-        pfd.reqevents = APR_POLLIN;
+        pfd->desc_type = APR_POLL_SOCKET;
+        pfd->desc.s = lr->sd;
+        pfd->reqevents = APR_POLLIN;
 
         pt->type = PT_ACCEPT;
         pt->baton = lr;
 
-        pfd.client_data = pt;
+        pfd->client_data = pt;
 
-        apr_socket_opt_set(pfd.desc.s, APR_SO_NONBLOCK, 1);
-        apr_pollset_add(event_pollset, &pfd);
+        apr_socket_opt_set(pfd->desc.s, APR_SO_NONBLOCK, 1);
+        apr_pollset_add(event_pollset, pfd);
     }
 
     /* Unblock the signal used to wake this thread up, and set a handler for
@@ -907,6 +955,8 @@
                 case CONN_STATE_CHECK_REQUEST_LINE_READABLE:
                     cs->state = CONN_STATE_READ_REQUEST_LINE;
                     break;
+                case CONN_STATE_WRITE_COMPLETION:
+                    break;
                 default:
                     ap_log_error(APLOG_MARK, APLOG_ERR, rc,
                                  ap_server_conf,
@@ -918,6 +968,7 @@
                 apr_thread_mutex_lock(timeout_mutex);
                 APR_RING_REMOVE(cs, timeout_list);
                 apr_thread_mutex_unlock(timeout_mutex);
+                APR_RING_ELEM_INIT(cs, timeout_list);
 
                 rc = push2worker(out_pfd, event_pollset);
                 if (rc != APR_SUCCESS) {
@@ -1002,9 +1053,10 @@
         /* handle timed out sockets */
         apr_thread_mutex_lock(timeout_mutex);
 
-        cs = APR_RING_FIRST(&timeout_head);
+        /* Step 1: keepalive timeouts */
+        cs = APR_RING_FIRST(&keepalive_timeout_head);
         timeout_time = time_now + TIMEOUT_FUDGE_FACTOR;
-        while (!APR_RING_EMPTY(&timeout_head, conn_state_t, timeout_list)
+        while (!APR_RING_EMPTY(&keepalive_timeout_head, conn_state_t, timeout_list)
                && cs->expiration_time < timeout_time
                && get_worker(&have_idle_worker)) {
 
@@ -1023,8 +1075,25 @@
                  */
             }
             have_idle_worker = 0;
+            cs = APR_RING_FIRST(&keepalive_timeout_head);
+        }
+
+        /* Step 2: write completion timeouts */
+        cs = APR_RING_FIRST(&timeout_head);
+        while (!APR_RING_EMPTY(&timeout_head, conn_state_t, timeout_list)
+               && cs->expiration_time < timeout_time
+               && get_worker(&have_idle_worker)) {
+
+            cs->state = CONN_STATE_LINGER;
+            APR_RING_REMOVE(cs, timeout_list);
+            rc = push2worker(&cs->pfd, event_pollset);
+            if (rc != APR_SUCCESS) {
+                return NULL;
+            }
+            have_idle_worker = 0;
             cs = APR_RING_FIRST(&timeout_head);
         }
+
         apr_thread_mutex_unlock(timeout_mutex);
 
     }     /* listener main loop */
@@ -2132,7 +2201,7 @@
     if (restart_num++ == 1) {
         is_graceful = 0;
         rv = apr_pollset_create(&event_pollset, 1, plog,
-                                APR_POLLSET_THREADSAFE);
+                                APR_POLLSET_THREADSAFE | APR_POLLSET_NOCOPY);
         if (rv != APR_SUCCESS) {
             ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
                          "Couldn't create a Thread Safe Pollset. "



Mime
View raw message