httpd-cvs mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yla...@apache.org
Subject svn commit: r1802875 - in /httpd/httpd/trunk: CHANGES server/mpm/event/event.c
Date Mon, 24 Jul 2017 23:19:06 GMT
Author: ylavic
Date: Mon Jul 24 23:19:06 2017
New Revision: 1802875

URL: http://svn.apache.org/viewvc?rev=1802875&view=rev
Log:
event: Avoid possible blocking in the listener thread when shutting down
connections. PR 60956.

start_lingering_close_nonblocking() now puts connections in defer_linger_chain
which is emptied by any worker thread (all atomically) after its usual work,
hence the possibly blocking flush and lingering close run outside the listener.

The listener may create a dedicated worker if it fills defer_linger_chain, or
while it's not empty, by calling push2worker with a NULL cs.

The state machine in process_socket() is slightly modified to be able to enter
with CONN_STATE_LINGER directly, w/o clogging_input_filters possibly interfering.

New abort_socket_nonblocking() allows resetting connections when nonblocking is
required and we can neither do much about the connection anymore nor want the
system to linger on its own after close().

Many thanks to Stefan Priebe for his heavy testing of many of event's changes!


Modified:
    httpd/httpd/trunk/CHANGES
    httpd/httpd/trunk/server/mpm/event/event.c

Modified: httpd/httpd/trunk/CHANGES
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/CHANGES?rev=1802875&r1=1802874&r2=1802875&view=diff
==============================================================================
--- httpd/httpd/trunk/CHANGES [utf-8] (original)
+++ httpd/httpd/trunk/CHANGES [utf-8] Mon Jul 24 23:19:06 2017
@@ -1,6 +1,9 @@
                                                          -*- coding: utf-8 -*-
 Changes with Apache 2.5.0
 
+  *) event: Avoid possible blocking in the listener thread when shutting down
+     connections. PR 60956.  [Yann Ylavic]
+
   *) mod_proxy_fcgi: Add the support for mod_proxy's flushpackets and flushwait
      parameters. [Luca Toscano, Ruediger Pluem, Yann Ylavic]
 

Modified: httpd/httpd/trunk/server/mpm/event/event.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/server/mpm/event/event.c?rev=1802875&r1=1802874&r2=1802875&view=diff
==============================================================================
--- httpd/httpd/trunk/server/mpm/event/event.c (original)
+++ httpd/httpd/trunk/server/mpm/event/event.c Mon Jul 24 23:19:06 2017
@@ -219,6 +219,12 @@ static apr_pollfd_t *listener_pollfd;
  */
 static apr_pollset_t *event_pollset;
 
+/*
+ * The chain of connections to be shutdown by a worker thread (deferred),
+ * linked list updated atomically.
+ */
+static event_conn_state_t *volatile defer_linger_chain;
+
 struct event_conn_state_t {
     /** APR_RING of expiration timeouts */
     APR_RING_ENTRY(event_conn_state_t) timeout_list;
@@ -245,7 +251,10 @@ struct event_conn_state_t {
     apr_pollfd_t pfd;
     /** public parts of the connection state */
     conn_state_t pub;
+    /** chaining in defer_linger_chain */
+    struct event_conn_state_t *chain;
 };
+
 APR_RING_HEAD(timeout_head_t, event_conn_state_t);
 
 struct timeout_queue {
@@ -505,14 +514,55 @@ static void enable_listensocks(int proce
     ap_scoreboard_image->parent[process_slot].not_accepting = 0;
 }
 
+static void abort_socket_nonblocking(apr_socket_t *csd)
+{
+    apr_status_t rv;
+    apr_socket_timeout_set(csd, 0);
+#if defined(SOL_SOCKET) && defined(SO_LINGER)
+    /* This socket is over now, and we don't want to block nor linger
+     * anymore, so reset it. A normal close could still linger in the
+     * system, while RST is fast, nonblocking, and what the peer will
+     * get if it sends us further data anyway.
+     */
+    {
+        apr_os_sock_t osd = -1;
+        struct linger opt;
+        opt.l_onoff = 1;
+        opt.l_linger = 0; /* zero timeout is RST */
+        apr_os_sock_get(&osd, csd);
+        setsockopt(osd, SOL_SOCKET, SO_LINGER, (void *)&opt, sizeof opt);
+    }
+#endif
+    rv = apr_socket_close(csd);
+    if (rv != APR_SUCCESS) {
+        ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(00468)
+                     "error closing socket");
+        AP_DEBUG_ASSERT(0);
+    }
+}
+
 static void close_worker_sockets(void)
 {
     int i;
     for (i = 0; i < threads_per_child; i++) {
-        if (worker_sockets[i]) {
-            apr_socket_close(worker_sockets[i]);
+        apr_socket_t *csd = worker_sockets[i];
+        if (csd) {
             worker_sockets[i] = NULL;
+            abort_socket_nonblocking(csd);
+        }
+    }
+    for (;;) {
+        event_conn_state_t *cs = defer_linger_chain;
+        if (!cs) {
+            break;
+        }
+        if (apr_atomic_casptr((void *)&defer_linger_chain, cs->chain,
+                              cs) != cs) {
+            /* Race lost, try again */
+            continue;
         }
+        cs->chain = NULL;
+        abort_socket_nonblocking(cs->pfd.desc.s);
     }
 }
 
@@ -733,11 +783,27 @@ static void notify_resume(event_conn_sta
     ap_run_resume_connection(cs->c, cs->r);
 }
 
-static int start_lingering_close_common(event_conn_state_t *cs, int in_worker)
+/*
+ * Close our side of the connection, flushing data to the client first.
+ * Pre-condition: cs is not in any timeout queue and not in the pollset,
+ *                timeout_mutex is not locked
+ * return: 0 if connection is fully closed,
+ *         1 if connection is lingering
+ * May only be called by worker thread.
+ */
+static int start_lingering_close_blocking(event_conn_state_t *cs)
 {
     apr_status_t rv;
     struct timeout_queue *q;
     apr_socket_t *csd = cs->pfd.desc.s;
+
+    if (ap_start_lingering_close(cs->c)) {
+        notify_suspend(cs);
+        apr_socket_close(csd);
+        ap_push_pool(worker_queue_info, cs->p);
+        return 0;
+    }
+
 #ifdef AP_DEBUG
     {
         rv = apr_socket_timeout_set(csd, 0);
@@ -746,6 +812,7 @@ static int start_lingering_close_common(
 #else
     apr_socket_timeout_set(csd, 0);
 #endif
+
     cs->queue_timestamp = apr_time_now();
     /*
      * If some module requested a shortened waiting period, only wait for
@@ -761,12 +828,8 @@ static int start_lingering_close_common(
         cs->pub.state = CONN_STATE_LINGER_NORMAL;
     }
     apr_atomic_inc32(&lingering_count);
-    if (in_worker) { 
-        notify_suspend(cs);
-    }
-    else {
-        cs->c->sbh = NULL;
-    }
+    notify_suspend(cs);
+
     cs->pfd.reqevents = (
             cs->pub.sense == CONN_SENSE_WANT_WRITE ? APR_POLLOUT :
                     APR_POLLIN) | APR_POLLHUP | APR_POLLERR;
@@ -789,49 +852,25 @@ static int start_lingering_close_common(
 }
 
 /*
- * Close our side of the connection, flushing data to the client first.
- * Pre-condition: cs is not in any timeout queue and not in the pollset,
- *                timeout_mutex is not locked
- * return: 0 if connection is fully closed,
- *         1 if connection is lingering
- * May only be called by worker thread.
- */
-static int start_lingering_close_blocking(event_conn_state_t *cs)
-{
-    if (ap_start_lingering_close(cs->c)) {
-        notify_suspend(cs);
-        ap_push_pool(worker_queue_info, cs->p);
-        return 0;
-    }
-    return start_lingering_close_common(cs, 1);
-}
-
-/*
- * Close our side of the connection, NOT flushing data to the client.
- * This should only be called if there has been an error or if we know
- * that our send buffers are empty.
+ * Defer flush and close of the connection by adding it to defer_linger_chain,
+ * for a worker to grab it and do the job (should that be blocking).
  * Pre-condition: cs is not in any timeout queue and not in the pollset,
  *                timeout_mutex is not locked
- * return: 0 if connection is fully closed,
- *         1 if connection is lingering
- * may be called by listener thread
+ * return: 1 connection is alive (but aside and about to linger)
+ * May be called by listener thread.
  */
 static int start_lingering_close_nonblocking(event_conn_state_t *cs)
 {
-    conn_rec *c = cs->c;
-    apr_socket_t *csd = cs->pfd.desc.s;
-
-    if (ap_prep_lingering_close(c)
-        || c->aborted
-        || ap_shutdown_conn(c, 0) != APR_SUCCESS || c->aborted
-        || apr_socket_shutdown(csd, APR_SHUTDOWN_WRITE) != APR_SUCCESS) {
-        apr_socket_close(csd);
-        ap_push_pool(worker_queue_info, cs->p);
-        if (dying)
-            ap_queue_interrupt_one(worker_queue);
-        return 0;
+    event_conn_state_t *chain;
+    for (;;) {
+        cs->chain = chain = defer_linger_chain;
+        if (apr_atomic_casptr((void *)&defer_linger_chain, cs,
+                              chain) != chain) {
+            /* Race lost, try again */
+            continue;
+        }
+        return 1;
     }
-    return start_lingering_close_common(cs, 0);
 }
 
 /*
@@ -842,15 +881,10 @@ static int start_lingering_close_nonbloc
  */
 static int stop_lingering_close(event_conn_state_t *cs)
 {
-    apr_status_t rv;
     apr_socket_t *csd = ap_get_conn_socket(cs->c);
     ap_log_error(APLOG_MARK, APLOG_TRACE4, 0, ap_server_conf,
                  "socket reached timeout in lingering-close state");
-    rv = apr_socket_close(csd);
-    if (rv != APR_SUCCESS) {
-        ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(00468) "error closing socket");
-        AP_DEBUG_ASSERT(0);
-    }
+    abort_socket_nonblocking(csd);
     ap_push_pool(worker_queue_info, cs->p);
     if (dying)
         ap_queue_interrupt_one(worker_queue);
@@ -997,9 +1031,16 @@ static void process_socket(apr_thread_t
         c->current_thread = thd;
         /* Subsequent request on a conn, and thread number is part of ID */
         c->id = conn_id;
+
+        if (c->aborted) {
+            cs->pub.state = CONN_STATE_LINGER;
+        }
     }
 
-    if (c->clogging_input_filters && !c->aborted) {
+    if (cs->pub.state == CONN_STATE_LINGER) {
+        /* do lingering close below */
+    }
+    else if (c->clogging_input_filters) {
         /* Since we have an input filter which 'clogs' the input stream,
          * like mod_ssl used to, lets just do the normal read from input
          * filters, like the Worker MPM does. Filters that need to write
@@ -1013,20 +1054,14 @@ static void process_socket(apr_thread_t
         }
         apr_atomic_dec32(&clogged_count);
     }
-
+    else if (cs->pub.state == CONN_STATE_READ_REQUEST_LINE) {
 read_request:
-    if (cs->pub.state == CONN_STATE_READ_REQUEST_LINE) {
-        if (!c->aborted) {
-            ap_run_process_connection(c);
-
-            /* state will be updated upon return
-             * fall thru to either wait for readability/timeout or
-             * do lingering close
-             */
-        }
-        else {
-            cs->pub.state = CONN_STATE_LINGER;
-        }
+        ap_run_process_connection(c);
+
+        /* state will be updated upon return
+         * fall thru to either wait for readability/timeout or
+         * do lingering close
+         */
     }
 
     if (cs->pub.state == CONN_STATE_WRITE_COMPLETION) {
@@ -1067,7 +1102,7 @@ read_request:
             return;
         }
         else if (c->keepalive != AP_CONN_KEEPALIVE || c->aborted ||
-            listener_may_exit) {
+                 listener_may_exit) {
             cs->pub.state = CONN_STATE_LINGER;
         }
         else if (c->data_in_input_filters || ap_run_input_pending(c) == OK) {
@@ -1287,26 +1322,32 @@ static apr_status_t push_timer2worker(ti
 }
 
 /*
- * Pre-condition: pfd->cs is neither in pollset nor timeout queue
+ * Pre-condition: cs is neither in event_pollset nor a timeout queue
  * this function may only be called by the listener
  */
-static apr_status_t push2worker(const apr_pollfd_t * pfd,
-                                apr_pollset_t * pollset)
+static apr_status_t push2worker(event_conn_state_t *cs, apr_socket_t *csd,
+                                apr_pool_t *ptrans)
 {
-    listener_poll_type *pt = (listener_poll_type *) pfd->client_data;
-    event_conn_state_t *cs = (event_conn_state_t *) pt->baton;
     apr_status_t rc;
 
-    rc = ap_queue_push(worker_queue, cs->pfd.desc.s, cs, cs->p);
+    if (cs) {
+        csd = cs->pfd.desc.s;
+        ptrans = cs->p;
+    }
+    rc = ap_queue_push(worker_queue, csd, cs, ptrans);
     if (rc != APR_SUCCESS) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rc, ap_server_conf, APLOGNO(00471)
+                     "push2worker: ap_queue_push failed");
         /* trash the connection; we couldn't queue the connected
          * socket to a worker
          */
-        apr_bucket_alloc_destroy(cs->bucket_alloc);
-        apr_socket_close(cs->pfd.desc.s);
-        ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
-                     ap_server_conf, APLOGNO(00471) "push2worker: ap_queue_push failed");
-        ap_push_pool(worker_queue_info, cs->p);
+        if (csd) {
+            abort_socket_nonblocking(csd);
+        }
+        if (ptrans) {
+            ap_push_pool(worker_queue_info, ptrans);
+        }
+        signal_threads(ST_GRACEFUL);
     }
 
     return rc;
@@ -1861,6 +1902,8 @@ static void * APR_THREAD_FUNC listener_t
                     TO_QUEUE_REMOVE(remove_from_q, cs);
                     rc = apr_pollset_remove(event_pollset, &cs->pfd);
                     apr_thread_mutex_unlock(timeout_mutex);
+                    TO_QUEUE_ELEM_INIT(cs);
+
                     /*
                      * Some of the pollset backends, like KQueue or Epoll
                      * automagically remove the FD if the socket is closed,
@@ -1874,29 +1917,23 @@ static void * APR_THREAD_FUNC listener_t
                         break;
                     }
 
-                    TO_QUEUE_ELEM_INIT(cs);
                     /* If we didn't get a worker immediately for a keep-alive
                      * request, we close the connection, so that the client can
                      * re-connect to a different process.
                      */
                     if (!have_idle_worker) {
                         start_lingering_close_nonblocking(cs);
-                        break;
                     }
-                    rc = push2worker(out_pfd, event_pollset);
-                    if (rc != APR_SUCCESS) {
-                        ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
-                                     ap_server_conf, APLOGNO(03095)
-                                     "push2worker failed");
-                    }
-                    else {
+                    else if (push2worker(cs, NULL, NULL) == APR_SUCCESS) {
                         have_idle_worker = 0;
                     }
                     break;
+
                 case CONN_STATE_LINGER_NORMAL:
                 case CONN_STATE_LINGER_SHORT:
                     process_lingering_close(cs, out_pfd);
                     break;
+
                 default:
                     ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
                                  ap_server_conf, APLOGNO(03096)
@@ -1980,18 +2017,7 @@ static void * APR_THREAD_FUNC listener_t
 
                     if (csd != NULL) {
                         conns_this_child--;
-                        rc = ap_queue_push(worker_queue, csd, NULL, ptrans);
-                        if (rc != APR_SUCCESS) {
-                            /* trash the connection; we couldn't queue the connected
-                             * socket to a worker
-                             */
-                            apr_socket_close(csd);
-                            ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
-                                         ap_server_conf, APLOGNO(03098)
-                                         "ap_queue_push failed");
-                            ap_push_pool(worker_queue_info, ptrans);
-                        }
-                        else {
+                        if (push2worker(NULL, csd, ptrans) == APR_SUCCESS) {
                             have_idle_worker = 0;
                         }
                     }
@@ -2090,6 +2116,24 @@ static void * APR_THREAD_FUNC listener_t
             ps->keep_alive = 0;
         }
 
+        /* If there are some lingering closes to defer (to a worker), schedule
+         * them now. We might wakeup a worker spuriously if another one empties
+         * defer_linger_chain in the meantime, but there also may be no active
+         * or all busy workers for an undefined time.  In any case a deferred
+         * lingering close can't starve if we do that here since the chain is
+         * filled only above in the listener and it's emptied only in the
+         * worker(s); thus a NULL here means it will stay so while the listener
+         * waits (possibly indefinitely) in poll().
+         */
+        if (defer_linger_chain) {
+            get_worker(&have_idle_worker, 0, &workers_were_busy);
+            if (have_idle_worker
+                    && defer_linger_chain /* re-test */
+                    && push2worker(NULL, NULL, NULL) == APR_SUCCESS) {
+                have_idle_worker = 0;
+            }
+        }
+
         if (listeners_disabled && !workers_were_busy
             && ((c_count = apr_atomic_read32(&connection_count))
                     >= (l_count = apr_atomic_read32(&lingering_count))
@@ -2235,8 +2279,35 @@ static void *APR_THREAD_FUNC worker_thre
         }
         else {
             is_idle = 0;
-            worker_sockets[thread_slot] = csd;
-            process_socket(thd, ptrans, csd, cs, process_slot, thread_slot);
+            if (csd != NULL) {
+                worker_sockets[thread_slot] = csd;
+                process_socket(thd, ptrans, csd, cs, process_slot, thread_slot);
+                worker_sockets[thread_slot] = NULL;
+            }
+        }
+
+        /* If there are deferred lingering closes, handle them now. */
+        while (!workers_may_exit) {
+            cs = defer_linger_chain;
+            if (!cs) {
+                break;
+            }
+            if (apr_atomic_casptr((void *)&defer_linger_chain, cs->chain,
+                                  cs) != cs) {
+                /* Race lost, try again */
+                continue;
+            }
+            cs->chain = NULL;
+
+            worker_sockets[thread_slot] = csd = cs->pfd.desc.s;
+#ifdef AP_DEBUG
+            rv = apr_socket_timeout_set(csd, SECONDS_TO_LINGER);
+            AP_DEBUG_ASSERT(rv == APR_SUCCESS);
+#else
+            apr_socket_timeout_set(csd, SECONDS_TO_LINGER);
+#endif
+            cs->pub.state = CONN_STATE_LINGER;
+            process_socket(thd, cs->p, csd, cs, process_slot, thread_slot);
             worker_sockets[thread_slot] = NULL;
         }
     }
@@ -3497,6 +3568,7 @@ static int event_pre_config(apr_pool_t *
     active_daemons_limit = server_limit;
     threads_per_child = DEFAULT_THREADS_PER_CHILD;
     max_workers = active_daemons_limit * threads_per_child;
+    defer_linger_chain = NULL;
     had_healthy_child = 0;
     ap_extended_status = 0;
 



Mime
View raw message