Author: sf
Date: Sun Jun 19 12:23:42 2011
New Revision: 1137358
URL: http://svn.apache.org/viewvc?rev=1137358&view=rev
Log:
Some improvements for handling of many connections for MPM event:
- Process lingering close asynchronously instead of tying up worker threads
(based on patch by Jeff Trawick).
- If the number of connections of a process is above
threads_per_child + WORKER_OVERCOMMIT * (idle_workers - 1)
(WORKER_OVERCOMMIT is currently fixed at 2), or if all workers are busy,
don't accept new connections in that process. Such a dynamic connection limit
is necessary because we may have both async and non-async (ssl) connections.
WORKER_OVERCOMMIT should be a config option.
- Don't count idle workers of processes that are not accepting connections
towards MinSpareThreads, so that the parent will spawn new processes when
necessary.
- If we receive a keep-alive request while all workers are busy, don't block
but close the connection immediately so that the client will re-connect to a
different process.
Related changes:
- Log what is going on at trace loglevels.
- Remove the bypass_push poll type flag; this code cannot be hit anymore
(if it ever could?).
- Add some macro helpers for dealing with timeout queues.
Modified:
httpd/httpd/trunk/CHANGES
httpd/httpd/trunk/include/ap_mmn.h
httpd/httpd/trunk/include/http_connection.h
httpd/httpd/trunk/include/httpd.h
httpd/httpd/trunk/include/scoreboard.h
httpd/httpd/trunk/server/connection.c
httpd/httpd/trunk/server/mpm/event/event.c
httpd/httpd/trunk/server/mpm/event/fdqueue.c
httpd/httpd/trunk/server/mpm/event/fdqueue.h
Modified: httpd/httpd/trunk/CHANGES
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/CHANGES?rev=1137358&r1=1137357&r2=1137358&view=diff
==============================================================================
--- httpd/httpd/trunk/CHANGES [utf-8] (original)
+++ httpd/httpd/trunk/CHANGES [utf-8] Sun Jun 19 12:23:42 2011
@@ -2,6 +2,13 @@
Changes with Apache 2.3.13
+ *) mpm_event: If the number of connections of a process is very high, or if
+ all workers are busy, don't accept new connections in that process.
+ [Stefan Fritsch]
+
+ *) mpm_event: Process lingering close asynchronously instead of tying up
+ worker threads. [Jeff Trawick, Stefan Fritsch]
+
*) mpm_event: If MaxMemFree is set, limit the number of pools that is kept
around. [Stefan Fritsch]
Modified: httpd/httpd/trunk/include/ap_mmn.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/include/ap_mmn.h?rev=1137358&r1=1137357&r2=1137358&view=diff
==============================================================================
--- httpd/httpd/trunk/include/ap_mmn.h (original)
+++ httpd/httpd/trunk/include/ap_mmn.h Sun Jun 19 12:23:42 2011
@@ -333,14 +333,17 @@
* Add ap_context_*(), ap_set_context_info(), ap_set_document_root()
* 20110605.1 (2.3.13-dev) add ap_(get|set)_core_module_config()
* 20110605.2 (2.3.13-dev) add ap_get_conn_socket()
+ * 20110619.0 (2.3.13-dev) add async connection infos to process_score in scoreboard,
+ * add ap_start_lingering_close(),
+ * add conn_state_e:CONN_STATE_LINGER_NORMAL and CONN_STATE_LINGER_SHORT
*/
#define MODULE_MAGIC_COOKIE 0x41503234UL /* "AP24" */
#ifndef MODULE_MAGIC_NUMBER_MAJOR
-#define MODULE_MAGIC_NUMBER_MAJOR 20110605
+#define MODULE_MAGIC_NUMBER_MAJOR 20110619
#endif
-#define MODULE_MAGIC_NUMBER_MINOR 2 /* 0...n */
+#define MODULE_MAGIC_NUMBER_MINOR 0 /* 0...n */
/**
* Determine if the server's current MODULE_MAGIC_NUMBER is at least a
Modified: httpd/httpd/trunk/include/http_connection.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/include/http_connection.h?rev=1137358&r1=1137357&r2=1137358&view=diff
==============================================================================
--- httpd/httpd/trunk/include/http_connection.h (original)
+++ httpd/httpd/trunk/include/http_connection.h Sun Jun 19 12:23:42 2011
@@ -70,7 +70,9 @@ AP_CORE_DECLARE(void) ap_flush_conn(conn
*/
AP_DECLARE(void) ap_lingering_close(conn_rec *c);
- /* Hooks */
+AP_DECLARE(int) ap_start_lingering_close(conn_rec *c);
+
+/* Hooks */
/**
* create_connection is a RUN_FIRST hook which allows modules to create
* connections. In general, you should not install filters with the
Modified: httpd/httpd/trunk/include/httpd.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/include/httpd.h?rev=1137358&r1=1137357&r2=1137358&view=diff
==============================================================================
--- httpd/httpd/trunk/include/httpd.h (original)
+++ httpd/httpd/trunk/include/httpd.h Sun Jun 19 12:23:42 2011
@@ -1133,7 +1133,9 @@ typedef enum {
CONN_STATE_HANDLER,
CONN_STATE_WRITE_COMPLETION,
CONN_STATE_SUSPENDED,
- CONN_STATE_LINGER
+ CONN_STATE_LINGER,
+ CONN_STATE_LINGER_NORMAL,
+ CONN_STATE_LINGER_SHORT
} conn_state_e;
/**
Modified: httpd/httpd/trunk/include/scoreboard.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/include/scoreboard.h?rev=1137358&r1=1137357&r2=1137358&view=diff
==============================================================================
--- httpd/httpd/trunk/include/scoreboard.h (original)
+++ httpd/httpd/trunk/include/scoreboard.h Sun Jun 19 12:23:42 2011
@@ -132,9 +132,17 @@ typedef struct process_score process_sco
struct process_score {
pid_t pid;
ap_generation_t generation; /* generation of this child */
- int quiescing; /* the process whose pid is stored above is
+ char quiescing; /* the process whose pid is stored above is
* going down gracefully
*/
+ char not_accepting; /* the process is busy and is not accepting more
+ * connections (for async MPMs)
+ */
+ apr_uint32_t connections; /* total connections (for async MPMs) */
+ apr_uint32_t write_completion; /* async connections doing write completion */
+ apr_uint32_t lingering_close; /* async connections in lingering close */
+ apr_uint32_t keep_alive; /* async connections in keep alive */
+ apr_uint32_t suspended; /* connections suspended by some module */
};
/* Scoreboard is now in 'local' memory, since it isn't updated once created,
Modified: httpd/httpd/trunk/server/connection.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/server/connection.c?rev=1137358&r1=1137357&r2=1137358&view=diff
==============================================================================
--- httpd/httpd/trunk/server/connection.c (original)
+++ httpd/httpd/trunk/server/connection.c Sun Jun 19 12:23:42 2011
@@ -93,15 +93,13 @@ AP_CORE_DECLARE(void) ap_flush_conn(conn
* all the response data has been sent to the client.
*/
#define SECONDS_TO_LINGER 2
-AP_DECLARE(void) ap_lingering_close(conn_rec *c)
+
+AP_DECLARE(int) ap_start_lingering_close(conn_rec *c)
{
- char dummybuf[512];
- apr_size_t nbytes;
- apr_time_t timeup = 0;
apr_socket_t *csd = ap_get_conn_socket(c);
if (!csd) {
- return;
+ return 1;
}
ap_update_child_status(c->sbh, SERVER_CLOSING, NULL);
@@ -109,7 +107,7 @@ AP_DECLARE(void) ap_lingering_close(conn
#ifdef NO_LINGCLOSE
ap_flush_conn(c); /* just close it */
apr_socket_close(csd);
- return;
+ return 1;
#endif
/* Close the connection, being careful to send out whatever is still
@@ -122,7 +120,7 @@ AP_DECLARE(void) ap_lingering_close(conn
if (c->aborted) {
apr_socket_close(csd);
- return;
+ return 1;
}
/* Shut down the socket for write, which will send a FIN
@@ -131,6 +129,20 @@ AP_DECLARE(void) ap_lingering_close(conn
if (apr_socket_shutdown(csd, APR_SHUTDOWN_WRITE) != APR_SUCCESS
|| c->aborted) {
apr_socket_close(csd);
+ return 1;
+ }
+
+ return 0;
+}
+
+AP_DECLARE(void) ap_lingering_close(conn_rec *c)
+{
+ char dummybuf[512];
+ apr_size_t nbytes;
+ apr_time_t timeup = 0;
+ apr_socket_t *csd = ap_get_conn_socket(c);
+
+ if (ap_start_lingering_close(c)) {
return;
}
Modified: httpd/httpd/trunk/server/mpm/event/event.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/server/mpm/event/event.c?rev=1137358&r1=1137357&r2=1137358&view=diff
==============================================================================
--- httpd/httpd/trunk/server/mpm/event/event.c (original)
+++ httpd/httpd/trunk/server/mpm/event/event.c Sun Jun 19 12:23:42 2011
@@ -148,6 +148,11 @@
#define apr_time_from_msec(x) (x * 1000)
#endif
+#ifndef MAX_SECS_TO_LINGER
+#define MAX_SECS_TO_LINGER 30
+#endif
+#define SECONDS_TO_LINGER 2
+
/*
* Actual definitions of config globals
*/
@@ -174,7 +179,38 @@ static int mpm_state = AP_MPMQ_STARTING;
static apr_thread_mutex_t *timeout_mutex;
APR_RING_HEAD(timeout_head_t, conn_state_t);
-static struct timeout_head_t timeout_head, keepalive_timeout_head;
+struct timeout_queue {
+ struct timeout_head_t head;
+ int count;
+ const char *tag;
+};
+static struct timeout_queue write_completion_q, keepalive_q, linger_q,
+ short_linger_q;
+static apr_pollfd_t *listener_pollfd;
+
+/*
+ * Macros for accessing struct timeout_queue.
+ * For TO_QUEUE_APPEND and TO_QUEUE_REMOVE, timeout_mutex must be held.
+ */
+#define TO_QUEUE_APPEND(q, el) \
+ do { \
+ APR_RING_INSERT_TAIL(&(q).head, el, conn_state_t, timeout_list); \
+ (q).count++; \
+ } while (0)
+
+#define TO_QUEUE_REMOVE(q, el) \
+ do { \
+ APR_RING_REMOVE(el, timeout_list); \
+ (q).count--; \
+ } while (0)
+
+#define TO_QUEUE_INIT(q) \
+ do { \
+ APR_RING_INIT(&(q).head, conn_state_t, timeout_list); \
+ (q).tag = #q; \
+ } while (0)
+
+#define TO_QUEUE_ELEM_INIT(el) APR_RING_ELEM_INIT(el, timeout_list)
static apr_pollset_t *event_pollset;
@@ -218,7 +254,6 @@ typedef enum
typedef struct
{
poll_type_e type;
- int bypass_push;
void *baton;
} listener_poll_type;
@@ -299,6 +334,32 @@ static apr_os_thread_t *listener_os_thre
*/
static apr_socket_t **worker_sockets;
+static void disable_listensocks(int process_slot)
+{
+ int i;
+ for (i = 0; i < num_listensocks; i++) {
+ apr_pollset_remove(event_pollset, &listener_pollfd[i]);
+ }
+ ap_scoreboard_image->parent[process_slot].not_accepting = 1;
+}
+
+static void enable_listensocks(int process_slot)
+{
+ int i;
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf,
+ "Accepting new connections again: "
+ "%u active conns, %u idle workers",
+ apr_atomic_read32(&connection_count),
+ ap_queue_info_get_idlers(worker_queue_info));
+ for (i = 0; i < num_listensocks; i++)
+ apr_pollset_add(event_pollset, &listener_pollfd[i]);
+ /*
+ * XXX: This is not yet optimal. If many workers suddenly become available,
+ * XXX: the parent may kill some processes off too soon.
+ */
+ ap_scoreboard_image->parent[process_slot].not_accepting = 0;
+}
+
static void close_worker_sockets(void)
{
int i;
@@ -654,6 +715,69 @@ static void set_signals(void)
#endif
}
+static int start_lingering_close(conn_state_t *cs)
+{
+ apr_status_t rv;
+ if (ap_start_lingering_close(cs->c)) {
+ apr_pool_clear(cs->p);
+ ap_push_pool(worker_queue_info, cs->p);
+ return 0;
+ }
+ else {
+ apr_socket_t *csd = ap_get_conn_socket(cs->c);
+ struct timeout_queue *q;
+
+ rv = apr_socket_timeout_set(csd, 0);
+ AP_DEBUG_ASSERT(rv == APR_SUCCESS);
+ /*
+ * If some module requested a shortened waiting period, only wait for
+ * 2s (SECONDS_TO_LINGER). This is useful for mitigating certain
+ * DoS attacks.
+ */
+ if (apr_table_get(cs->c->notes, "short-lingering-close")) {
+ cs->expiration_time =
+ apr_time_now() + apr_time_from_sec(SECONDS_TO_LINGER);
+ q = &short_linger_q;
+ cs->state = CONN_STATE_LINGER_SHORT;
+ }
+ else {
+ cs->expiration_time =
+ apr_time_now() + apr_time_from_sec(MAX_SECS_TO_LINGER);
+ q = &linger_q;
+ cs->state = CONN_STATE_LINGER_NORMAL;
+ }
+ apr_thread_mutex_lock(timeout_mutex);
+ TO_QUEUE_APPEND(*q, cs);
+ apr_thread_mutex_unlock(timeout_mutex);
+ cs->pfd.reqevents = APR_POLLIN | APR_POLLHUP | APR_POLLERR;
+ rv = apr_pollset_add(event_pollset, &cs->pfd);
+ if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) {
+ ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf,
+ "start_lingering_close: apr_pollset_add failure");
+ AP_DEBUG_ASSERT(0);
+ }
+ }
+ return 1;
+}
+
+static int stop_lingering_close(conn_state_t *cs)
+{
+ apr_status_t rv;
+ apr_socket_t *csd = ap_get_conn_socket(cs->c);
+ ap_log_error(APLOG_MARK, APLOG_TRACE4, 0, ap_server_conf,
+ "socket reached timeout in lingering-close state");
+ rv = apr_socket_close(csd);
+ if (rv != APR_SUCCESS) {
+ ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, "error closing socket");
+ AP_DEBUG_ASSERT(0);
+ }
+ apr_pool_clear(cs->p);
+ ap_push_pool(worker_queue_info, cs->p);
+ return 0;
+}
+
+
+
/*****************************************************************
* Child process main loop.
*/
@@ -689,10 +813,9 @@ static int process_socket(apr_thread_t *
cs->pfd.reqevents = APR_POLLIN;
cs->pfd.desc.s = sock;
pt->type = PT_CSD;
- pt->bypass_push = 1;
pt->baton = cs;
cs->pfd.client_data = pt;
- APR_RING_ELEM_INIT(cs, timeout_list);
+ TO_QUEUE_ELEM_INIT(cs);
ap_update_vhost_given_ip(c);
@@ -755,6 +878,7 @@ read_request:
if (cs->state == CONN_STATE_WRITE_COMPLETION) {
ap_filter_t *output_filter = c->output_filters;
apr_status_t rv;
+ ap_update_child_status_from_conn(sbh, SERVER_BUSY_WRITE, c);
while (output_filter->next != NULL) {
output_filter = output_filter->next;
}
@@ -771,9 +895,8 @@ read_request:
*/
cs->expiration_time = ap_server_conf->timeout + apr_time_now();
apr_thread_mutex_lock(timeout_mutex);
- APR_RING_INSERT_TAIL(&timeout_head, cs, conn_state_t, timeout_list);
+ TO_QUEUE_APPEND(write_completion_q, cs);
apr_thread_mutex_unlock(timeout_mutex);
- pt->bypass_push = 0;
cs->pfd.reqevents = APR_POLLOUT | APR_POLLHUP | APR_POLLERR;
rc = apr_pollset_add(event_pollset, &cs->pfd);
return 1;
@@ -792,14 +915,11 @@ read_request:
}
if (cs->state == CONN_STATE_LINGER) {
- ap_lingering_close(c);
- apr_pool_clear(p);
- ap_push_pool(worker_queue_info, p);
- return 0;
+ if (!start_lingering_close(cs))
+ return 0;
}
else if (cs->state == CONN_STATE_CHECK_REQUEST_LINE_READABLE) {
apr_status_t rc;
- listener_poll_type *pt = (listener_poll_type *) cs->pfd.client_data;
/* It greatly simplifies the logic to use a single timeout value here
* because the new element can just be added to the end of the list and
@@ -812,10 +932,9 @@ read_request:
cs->expiration_time = ap_server_conf->keep_alive_timeout +
apr_time_now();
apr_thread_mutex_lock(timeout_mutex);
- APR_RING_INSERT_TAIL(&keepalive_timeout_head, cs, conn_state_t, timeout_list);
+ TO_QUEUE_APPEND(keepalive_q, cs);
apr_thread_mutex_unlock(timeout_mutex);
- pt->bypass_push = 0;
/* Add work to pollset. */
cs->pfd.reqevents = APR_POLLIN;
rc = apr_pollset_add(event_pollset, &cs->pfd);
@@ -844,21 +963,15 @@ static void check_infinite_requests(void
static void close_listeners(int process_slot, int *closed) {
if (!*closed) {
- ap_listen_rec *lr;
int i;
- for (lr = ap_listeners; lr != NULL; lr = lr->next) {
- apr_pollfd_t *pfd = apr_pcalloc(pchild, sizeof(*pfd));
- pfd->desc_type = APR_POLL_SOCKET;
- pfd->desc.s = lr->sd;
- apr_pollset_remove(event_pollset, pfd);
- }
+ disable_listensocks(process_slot);
ap_close_listeners();
*closed = 1;
dying = 1;
ap_scoreboard_image->parent[process_slot].quiescing = 1;
for (i = 0; i < threads_per_child; ++i) {
ap_update_child_status_from_indexes(process_slot, i,
- SERVER_DEAD, NULL);
+ SERVER_GRACEFUL, NULL);
}
/* wake up the main thread */
kill(ap_my_pid, SIGTERM);
@@ -918,12 +1031,18 @@ static apr_status_t init_pollset(apr_poo
#endif
ap_listen_rec *lr;
listener_poll_type *pt;
+ int i = 0;
- APR_RING_INIT(&timeout_head, conn_state_t, timeout_list);
- APR_RING_INIT(&keepalive_timeout_head, conn_state_t, timeout_list);
-
- for (lr = ap_listeners; lr != NULL; lr = lr->next) {
- apr_pollfd_t *pfd = apr_palloc(p, sizeof(*pfd));
+ TO_QUEUE_INIT(write_completion_q);
+ TO_QUEUE_INIT(keepalive_q);
+ TO_QUEUE_INIT(linger_q);
+ TO_QUEUE_INIT(short_linger_q);
+
+ listener_pollfd = apr_palloc(p, sizeof(apr_pollfd_t) * num_listensocks);
+ for (lr = ap_listeners; lr != NULL; lr = lr->next, i++) {
+ apr_pollfd_t *pfd;
+ AP_DEBUG_ASSERT(i < num_listensocks);
+ pfd = &listener_pollfd[i];
pt = apr_pcalloc(p, sizeof(*pt));
pfd->desc_type = APR_POLL_SOCKET;
pfd->desc.s = lr->sd;
@@ -970,12 +1089,6 @@ static apr_status_t push2worker(const ap
conn_state_t *cs = (conn_state_t *) pt->baton;
apr_status_t rc;
- if (pt->bypass_push) {
- return APR_SUCCESS;
- }
-
- pt->bypass_push = 1;
-
rc = apr_pollset_remove(pollset, pfd);
/*
@@ -987,7 +1100,8 @@ static apr_status_t push2worker(const ap
if (rc != APR_SUCCESS && !APR_STATUS_IS_NOTFOUND(rc)) {
ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf,
"pollset remove failed");
- cs->state = CONN_STATE_LINGER;
+ start_lingering_close(cs);
+ return rc;
}
rc = ap_queue_push(worker_queue, cs->pfd.desc.s, cs, cs->p);
@@ -1007,36 +1121,41 @@ static apr_status_t push2worker(const ap
}
/* get_worker:
- * reserve a worker thread, block if all are currently busy.
- * this prevents the worker queue from overflowing and lets
- * other processes accept new connections in the mean time.
+ * If *have_idle_worker_p == 0, reserve a worker thread, and set
+ * *have_idle_worker_p = 1.
+ * If *have_idle_worker_p is already 1, will do nothing.
+ * If blocking == 1, block if all workers are currently busy.
+ * If no worker was available immediately, will set *all_busy to 1.
+ * XXX: If there are no workers, we should not block immediately but
+ * XXX: close all keep-alive connections first.
*/
-static int get_worker(int *have_idle_worker_p)
+static void get_worker(int *have_idle_worker_p, int blocking, int *all_busy)
{
apr_status_t rc;
- if (!*have_idle_worker_p) {
- rc = ap_queue_info_wait_for_idler(worker_queue_info);
-
- if (rc == APR_SUCCESS) {
- *have_idle_worker_p = 1;
- return 1;
- }
- else {
- if (!APR_STATUS_IS_EOF(rc)) {
- ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf,
- "ap_queue_info_wait_for_idler failed. "
- "Attempting to shutdown process gracefully");
- signal_threads(ST_GRACEFUL);
- }
- return 0;
- }
- }
- else {
+ if (*have_idle_worker_p) {
/* already reserved a worker thread - must have hit a
* transient error on a previous pass
*/
- return 1;
+ return;
+ }
+
+ if (blocking)
+ rc = ap_queue_info_wait_for_idler(worker_queue_info, all_busy);
+ else
+ rc = ap_queue_info_try_get_idler(worker_queue_info);
+
+ if (rc == APR_SUCCESS) {
+ *have_idle_worker_p = 1;
+ }
+ else if (!blocking && rc == APR_EAGAIN) {
+ *all_busy = 1;
+ }
+ else if (!APR_STATUS_IS_EOF(rc)) {
+ ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf,
+ "ap_queue_info_wait_for_idler failed. "
+ "Attempting to shutdown process gracefully");
+ signal_threads(ST_GRACEFUL);
}
}
@@ -1098,6 +1217,79 @@ static apr_status_t event_register_timed
return APR_SUCCESS;
}
+static void process_lingering_close(conn_state_t *cs, const apr_pollfd_t *pfd)
+{
+ apr_socket_t *csd = ap_get_conn_socket(cs->c);
+ char dummybuf[2048];
+ apr_size_t nbytes;
+ apr_status_t rv;
+ struct timeout_queue *q;
+ q = (cs->state == CONN_STATE_LINGER_SHORT) ? &short_linger_q : &linger_q;
+
+ /* socket is already in non-blocking state */
+ do {
+ nbytes = sizeof(dummybuf);
+ rv = apr_socket_recv(csd, dummybuf, &nbytes);
+ } while (rv == APR_SUCCESS);
+
+ if (!APR_STATUS_IS_EOF(rv)) {
+ return;
+ }
+
+ rv = apr_pollset_remove(event_pollset, pfd);
+ AP_DEBUG_ASSERT(rv == APR_SUCCESS);
+
+ rv = apr_socket_close(csd);
+ AP_DEBUG_ASSERT(rv == APR_SUCCESS);
+
+ apr_thread_mutex_lock(timeout_mutex);
+ TO_QUEUE_REMOVE(*q, cs);
+ apr_thread_mutex_unlock(timeout_mutex);
+ TO_QUEUE_ELEM_INIT(cs);
+
+ apr_pool_clear(cs->p);
+ ap_push_pool(worker_queue_info, cs->p);
+}
+
+/* call 'func' for all elements of 'q' with timeout less than 'timeout_time'.
+ * Pre-condition: timeout_mutex must already be locked
+ * Post-condition: timeout_mutex will be locked again
+ */
+static void process_timeout_queue(struct timeout_queue *q,
+ apr_time_t timeout_time,
+ int (*func)(conn_state_t *))
+{
+ int count = 0;
+ conn_state_t *first, *cs, *last;
+ if (!q->count) {
+ return;
+ }
+ AP_DEBUG_ASSERT(!APR_RING_EMPTY(&q->head, conn_state_t, timeout_list));
+
+ cs = first = APR_RING_FIRST(&q->head);
+ while (cs != APR_RING_SENTINEL(&q->head, conn_state_t, timeout_list)
+ && cs->expiration_time < timeout_time) {
+ last = cs;
+ cs = APR_RING_NEXT(cs, timeout_list);
+ count++;
+ }
+ if (!count)
+ return;
+
+ APR_RING_UNSPLICE(first, last, timeout_list);
+ AP_DEBUG_ASSERT(q->count >= count);
+ q->count -= count;
+ apr_thread_mutex_unlock(timeout_mutex);
+ while (count) {
+ cs = APR_RING_NEXT(first, timeout_list);
+ TO_QUEUE_ELEM_INIT(first);
+ func(first);
+ first = cs;
+ count--;
+ }
+ apr_thread_mutex_lock(timeout_mutex);
+}
+
static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
{
timer_event_t *ep;
@@ -1114,10 +1306,11 @@ static void * APR_THREAD_FUNC listener_t
const apr_pollfd_t *out_pfd;
apr_int32_t num = 0;
apr_interval_time_t timeout_interval;
- apr_time_t timeout_time, now;
+ apr_time_t timeout_time = 0, now, last_log;
listener_poll_type *pt;
- int closed = 0;
+ int closed = 0, listeners_disabled = 0;
+ last_log = apr_time_now();
free(ti);
/* the following times out events that are really close in the future
@@ -1128,6 +1321,9 @@ static void * APR_THREAD_FUNC listener_t
#define TIMEOUT_FUDGE_FACTOR 100000
#define EVENT_FUDGE_FACTOR 10000
+/* XXX: this should be a config options */
+#define WORKER_OVERCOMMIT 2
+
rc = init_pollset(tpool);
if (rc != APR_SUCCESS) {
ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf,
@@ -1144,6 +1340,7 @@ static void * APR_THREAD_FUNC listener_t
apr_signal(LISTENER_SIGNAL, dummy_signal_handler);
for (;;) {
+ int workers_were_busy = 0;
if (listener_may_exit) {
close_listeners(process_slot, &closed);
if (terminate_mode == ST_UNGRACEFUL
@@ -1155,8 +1352,22 @@ static void * APR_THREAD_FUNC listener_t
check_infinite_requests();
}
-
now = apr_time_now();
+ if (APLOGtrace6(ap_server_conf)) {
+ /* trace log status every second */
+ if (now - last_log > apr_time_from_msec(1000)) {
+ last_log = now;
+ apr_thread_mutex_lock(timeout_mutex);
+ ap_log_error(APLOG_MARK, APLOG_TRACE6, 0, ap_server_conf,
+ "connections: %d (write-completion: %d "
+ "keep-alive: %d lingering: %d)",
+ connection_count, write_completion_q.count,
+ keepalive_q.count,
+ linger_q.count + short_linger_q.count);
+ apr_thread_mutex_unlock(timeout_mutex);
+ }
+ }
+
apr_thread_mutex_lock(g_timer_ring_mtx);
if (!APR_RING_EMPTY(&timer_ring, timer_event_t, link)) {
te = APR_RING_FIRST(&timer_ring);
@@ -1179,7 +1390,6 @@ static void * APR_THREAD_FUNC listener_t
}
#endif
rc = apr_pollset_poll(event_pollset, timeout_interval, &num, &out_pfd);
-
if (rc != APR_SUCCESS) {
if (APR_STATUS_IS_EINTR(rc)) {
continue;
@@ -1216,16 +1426,47 @@ static void * APR_THREAD_FUNC listener_t
}
apr_thread_mutex_unlock(g_timer_ring_mtx);
- while (num && get_worker(&have_idle_worker)) {
+ while (num) {
pt = (listener_poll_type *) out_pfd->client_data;
if (pt->type == PT_CSD) {
/* one of the sockets is readable */
+ struct timeout_queue *remove_from_q = &write_completion_q;
+ int blocking = 1;
cs = (conn_state_t *) pt->baton;
switch (cs->state) {
case CONN_STATE_CHECK_REQUEST_LINE_READABLE:
cs->state = CONN_STATE_READ_REQUEST_LINE;
- break;
+ remove_from_q = &keepalive_q;
+ /* don't wait for a worker for a keepalive request */
+ blocking = 0;
+ /* FALL THROUGH */
case CONN_STATE_WRITE_COMPLETION:
+ get_worker(&have_idle_worker, blocking,
+ &workers_were_busy);
+ apr_thread_mutex_lock(timeout_mutex);
+ TO_QUEUE_REMOVE(*remove_from_q, cs);
+ apr_thread_mutex_unlock(timeout_mutex);
+ TO_QUEUE_ELEM_INIT(cs);
+ /* If we didn't get a worker immediately for a keep-alive
+ * request, we close the connection, so that the client can
+ * re-connect to a different process.
+ */
+ if (!have_idle_worker) {
+ start_lingering_close(cs);
+ break;
+ }
+ rc = push2worker(out_pfd, event_pollset);
+ if (rc != APR_SUCCESS) {
+ ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
+ ap_server_conf, "push2worker failed");
+ }
+ else {
+ have_idle_worker = 0;
+ }
+ break;
+ case CONN_STATE_LINGER_NORMAL:
+ case CONN_STATE_LINGER_SHORT:
+ process_lingering_close(cs, out_pfd);
break;
default:
ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
@@ -1234,86 +1475,99 @@ static void * APR_THREAD_FUNC listener_t
cs->state);
AP_DEBUG_ASSERT(0);
}
-
- apr_thread_mutex_lock(timeout_mutex);
- APR_RING_REMOVE(cs, timeout_list);
- apr_thread_mutex_unlock(timeout_mutex);
- APR_RING_ELEM_INIT(cs, timeout_list);
-
- rc = push2worker(out_pfd, event_pollset);
- if (rc != APR_SUCCESS) {
- ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
- ap_server_conf, "push2worker failed");
- }
- else {
- have_idle_worker = 0;
- }
}
else if (pt->type == PT_ACCEPT) {
/* A Listener Socket is ready for an accept() */
+ if (workers_were_busy) {
+ if (!listeners_disabled)
+ disable_listensocks(process_slot);
+ listeners_disabled = 1;
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf,
+ "All workers busy, not accepting new conns"
+ "in this process");
+ }
+ else if (apr_atomic_read32(&connection_count) > threads_per_child
+ + ap_queue_info_get_idlers(worker_queue_info) * WORKER_OVERCOMMIT)
+ {
+ if (!listeners_disabled)
+ disable_listensocks(process_slot);
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf,
+ "Too many open connections (%u), "
+ "not accepting new conns in this process",
+ apr_atomic_read32(&connection_count));
+ ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
+ "Idle workers: %u",
+ ap_queue_info_get_idlers(worker_queue_info));
+ listeners_disabled = 1;
+ }
+ else if (listeners_disabled) {
+ listeners_disabled = 0;
+ enable_listensocks(process_slot);
+ }
+ if (!listeners_disabled) {
+ lr = (ap_listen_rec *) pt->baton;
+ ap_pop_pool(&ptrans, worker_queue_info);
- lr = (ap_listen_rec *) pt->baton;
-
- ap_pop_pool(&ptrans, worker_queue_info);
-
- if (ptrans == NULL) {
- /* create a new transaction pool for each accepted socket */
- apr_allocator_t *allocator;
-
- apr_allocator_create(&allocator);
- apr_allocator_max_free_set(allocator,
- ap_max_mem_free);
- apr_pool_create_ex(&ptrans, pconf, NULL, allocator);
- apr_allocator_owner_set(allocator, ptrans);
if (ptrans == NULL) {
- ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
- ap_server_conf,
- "Failed to create transaction pool");
- signal_threads(ST_GRACEFUL);
- return NULL;
+ /* create a new transaction pool for each accepted socket */
+ apr_allocator_t *allocator;
+
+ apr_allocator_create(&allocator);
+ apr_allocator_max_free_set(allocator,
+ ap_max_mem_free);
+ apr_pool_create_ex(&ptrans, pconf, NULL, allocator);
+ apr_allocator_owner_set(allocator, ptrans);
+ if (ptrans == NULL) {
+ ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
+ ap_server_conf,
+ "Failed to create transaction pool");
+ signal_threads(ST_GRACEFUL);
+ return NULL;
+ }
}
- }
- apr_pool_tag(ptrans, "transaction");
+ apr_pool_tag(ptrans, "transaction");
- rc = lr->accept_func(&csd, lr, ptrans);
+ get_worker(&have_idle_worker, 1, &workers_were_busy);
+ rc = lr->accept_func(&csd, lr, ptrans);
- /* later we trash rv and rely on csd to indicate
- * success/failure
- */
- AP_DEBUG_ASSERT(rc == APR_SUCCESS || !csd);
+ /* later we trash rv and rely on csd to indicate
+ * success/failure
+ */
+ AP_DEBUG_ASSERT(rc == APR_SUCCESS || !csd);
- if (rc == APR_EGENERAL) {
- /* E[NM]FILE, ENOMEM, etc */
- resource_shortage = 1;
- signal_threads(ST_GRACEFUL);
- }
+ if (rc == APR_EGENERAL) {
+ /* E[NM]FILE, ENOMEM, etc */
+ resource_shortage = 1;
+ signal_threads(ST_GRACEFUL);
+ }
- if (csd != NULL) {
- rc = ap_queue_push(worker_queue, csd, NULL, ptrans);
- if (rc != APR_SUCCESS) {
- /* trash the connection; we couldn't queue the connected
- * socket to a worker
- */
- apr_socket_close(csd);
- ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
- ap_server_conf,
- "ap_queue_push failed");
- apr_pool_clear(ptrans);
- ap_push_pool(worker_queue_info, ptrans);
+ if (csd != NULL) {
+ rc = ap_queue_push(worker_queue, csd, NULL, ptrans);
+ if (rc != APR_SUCCESS) {
+ /* trash the connection; we couldn't queue the connected
+ * socket to a worker
+ */
+ apr_socket_close(csd);
+ ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
+ ap_server_conf,
+ "ap_queue_push failed");
+ apr_pool_clear(ptrans);
+ ap_push_pool(worker_queue_info, ptrans);
+ }
+ else {
+ have_idle_worker = 0;
+ }
}
else {
- have_idle_worker = 0;
+ apr_pool_clear(ptrans);
+ ap_push_pool(worker_queue_info, ptrans);
}
}
- else {
- apr_pool_clear(ptrans);
- ap_push_pool(worker_queue_info, ptrans);
- }
} /* if:else on pt->type */
#if HAVE_SERF
else if (pt->type == PT_SERF) {
/* send socket to serf. */
- /* XXXX: this doesn't require get_worker(&have_idle_worker) */
+ /* XXXX: this doesn't require get_worker() */
serf_event_trigger(g_serf, pt->baton, out_pfd);
}
#endif
@@ -1325,70 +1579,59 @@ static void * APR_THREAD_FUNC listener_t
* r->request_time for new requests
*/
now = apr_time_now();
+ /* we only do this once per 0.1s (TIMEOUT_FUDGE_FACTOR) */
+ if (now > timeout_time) {
+ struct process_score *ps;
+ timeout_time = now + TIMEOUT_FUDGE_FACTOR;
- /* handle timed out sockets */
- apr_thread_mutex_lock(timeout_mutex);
-
- /* Step 1: keepalive timeouts */
- cs = APR_RING_FIRST(&keepalive_timeout_head);
- timeout_time = now + TIMEOUT_FUDGE_FACTOR;
- while (!APR_RING_EMPTY(&keepalive_timeout_head, conn_state_t, timeout_list)
- && cs->expiration_time < timeout_time) {
-
- cs->state = CONN_STATE_LINGER;
-
- APR_RING_REMOVE(cs, timeout_list);
- apr_thread_mutex_unlock(timeout_mutex);
+ /* handle timed out sockets */
+ apr_thread_mutex_lock(timeout_mutex);
- if (!get_worker(&have_idle_worker)) {
- apr_thread_mutex_lock(timeout_mutex);
- APR_RING_INSERT_HEAD(&keepalive_timeout_head, cs,
- conn_state_t, timeout_list);
- break;
+ /* Step 1: keepalive timeouts */
+ /* If all workers are busy, we kill older keep-alive connections so that they
+ * may connect to another process.
+ */
+ if (workers_were_busy && keepalive_q.count) {
+ ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
+ "All workers are busy, will close %d keep-alive "
+ "connections",
+ keepalive_q.count);
+ process_timeout_queue(&keepalive_q,
+ timeout_time + ap_server_conf->keep_alive_timeout,
+ start_lingering_close);
}
-
- rc = push2worker(&cs->pfd, event_pollset);
-
- if (rc != APR_SUCCESS) {
- return NULL;
- /* XXX return NULL looks wrong - not an init failure
- * that bypasses all the cleanup outside the main loop
- * break seems more like it
- * need to evaluate seriousness of push2worker failures
- */
+ else {
+ process_timeout_queue(&keepalive_q, timeout_time,
+ start_lingering_close);
}
- have_idle_worker = 0;
- apr_thread_mutex_lock(timeout_mutex);
- cs = APR_RING_FIRST(&keepalive_timeout_head);
- }
-
- /* Step 2: write completion timeouts */
- cs = APR_RING_FIRST(&timeout_head);
- while (!APR_RING_EMPTY(&timeout_head, conn_state_t, timeout_list)
- && cs->expiration_time < timeout_time) {
-
- cs->state = CONN_STATE_LINGER;
- APR_RING_REMOVE(cs, timeout_list);
+ /* Step 2: write completion timeouts */
+ process_timeout_queue(&write_completion_q, timeout_time, start_lingering_close);
+ /* Step 3: (normal) lingering close completion timeouts */
+ process_timeout_queue(&linger_q, timeout_time, stop_lingering_close);
+ /* Step 4: (short) lingering close completion timeouts */
+ process_timeout_queue(&short_linger_q, timeout_time, stop_lingering_close);
+
+ ps = ap_get_scoreboard_process(process_slot);
+ ps->write_completion = write_completion_q.count;
+ ps->lingering_close = linger_q.count + short_linger_q.count;
+ ps->keep_alive = keepalive_q.count;
apr_thread_mutex_unlock(timeout_mutex);
- if (!get_worker(&have_idle_worker)) {
- apr_thread_mutex_lock(timeout_mutex);
- APR_RING_INSERT_HEAD(&timeout_head, cs,
- conn_state_t, timeout_list);
- break;
- }
-
- rc = push2worker(&cs->pfd, event_pollset);
- if (rc != APR_SUCCESS) {
- return NULL;
- }
- have_idle_worker = 0;
- apr_thread_mutex_lock(timeout_mutex);
- cs = APR_RING_FIRST(&timeout_head);
+ ps->connections = apr_atomic_read32(&connection_count);
+ /* XXX: should count CONN_STATE_SUSPENDED and set ps->suspended */
}
-
- apr_thread_mutex_unlock(timeout_mutex);
-
+ if (listeners_disabled && !workers_were_busy &&
+ (int)apr_atomic_read32(&connection_count) <
+ ((int)ap_queue_info_get_idlers(worker_queue_info) - 1) * WORKER_OVERCOMMIT +
+ threads_per_child)
+ {
+ listeners_disabled = 0;
+ enable_listensocks(process_slot);
+ }
+ /*
+ * XXX: do we need to set some timeout that re-enables the listensocks
+ * XXX: in case no other event occurs?
+ */
} /* listener main loop */
close_listeners(process_slot, &closed);
@@ -1439,7 +1682,7 @@ static void *APR_THREAD_FUNC worker_thre
}
ap_update_child_status_from_indexes(process_slot, thread_slot,
- dying ? SERVER_DEAD : SERVER_READY, NULL);
+ dying ? SERVER_GRACEFUL : SERVER_READY, NULL);
worker_pop:
if (workers_may_exit) {
break;
@@ -1598,7 +1841,10 @@ static void *APR_THREAD_FUNC start_threa
/* Create the main pollset */
rv = apr_pollset_create(&event_pollset,
- threads_per_child,
+ threads_per_child, /* XXX don't we need more, to handle
+ * connections in K-A or lingering
+ * close?
+ */
pchild, APR_POLLSET_THREADSAFE | APR_POLLSET_NOCOPY);
if (rv != APR_SUCCESS) {
ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf,
@@ -1973,6 +2219,7 @@ static int make_child(server_rec * s, in
event_note_child_lost_slot(slot, pid);
}
ap_scoreboard_image->parent[slot].quiescing = 0;
+ ap_scoreboard_image->parent[slot].not_accepting = 0;
event_note_child_started(slot, pid);
return 0;
}
@@ -2048,8 +2295,9 @@ static void perform_idle_server_maintena
*/
if (ps->pid != 0) { /* XXX just set all_dead_threads in outer
for loop if no pid? not much else matters */
- if (status <= SERVER_READY &&
- !ps->quiescing && ps->generation == retained->my_generation) {
+ if (status <= SERVER_READY && !ps->quiescing && !ps->not_accepting
+ && ps->generation == retained->my_generation)
+ {
++idle_thread_count;
}
if (status >= SERVER_READY && status < SERVER_GRACEFUL) {
Modified: httpd/httpd/trunk/server/mpm/event/fdqueue.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/server/mpm/event/fdqueue.c?rev=1137358&r1=1137357&r2=1137358&view=diff
==============================================================================
--- httpd/httpd/trunk/server/mpm/event/fdqueue.c (original)
+++ httpd/httpd/trunk/server/mpm/event/fdqueue.c Sun Jun 19 12:23:42 2011
@@ -127,7 +127,19 @@ apr_status_t ap_queue_info_set_idle(fd_q
return APR_SUCCESS;
}
-apr_status_t ap_queue_info_wait_for_idler(fd_queue_info_t * queue_info)
+apr_status_t ap_queue_info_try_get_idler(fd_queue_info_t * queue_info)
+{
+ int prev_idlers;
+ prev_idlers = apr_atomic_dec32((apr_uint32_t *)&(queue_info->idlers));
+ if (prev_idlers <= 0) {
+ apr_atomic_inc32((apr_uint32_t *)&(queue_info->idlers)); /* back out dec */
+ return APR_EAGAIN;
+ }
+ return APR_SUCCESS;
+}
+
+apr_status_t ap_queue_info_wait_for_idler(fd_queue_info_t * queue_info,
+ int *had_to_block)
{
apr_status_t rv;
int prev_idlers;
@@ -165,6 +177,7 @@ apr_status_t ap_queue_info_wait_for_idle
* threads are waiting on an idle worker.
*/
if (queue_info->idlers < 0) {
+ *had_to_block = 1;
rv = apr_thread_cond_wait(queue_info->wait_for_idler,
queue_info->idlers_mutex);
if (rv != APR_SUCCESS) {
@@ -191,6 +204,14 @@ apr_status_t ap_queue_info_wait_for_idle
}
}
+apr_uint32_t ap_queue_info_get_idlers(fd_queue_info_t * queue_info)
+{
+ apr_int32_t val;
+ val = (apr_int32_t)apr_atomic_read32((apr_uint32_t *)&queue_info->idlers);
+ if (val < 0)
+ return 0;
+ return val;
+}
void ap_push_pool(fd_queue_info_t * queue_info,
apr_pool_t * pool_to_recycle)
Modified: httpd/httpd/trunk/server/mpm/event/fdqueue.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/server/mpm/event/fdqueue.h?rev=1137358&r1=1137357&r2=1137358&view=diff
==============================================================================
--- httpd/httpd/trunk/server/mpm/event/fdqueue.h (original)
+++ httpd/httpd/trunk/server/mpm/event/fdqueue.h Sun Jun 19 12:23:42 2011
@@ -46,8 +46,11 @@ apr_status_t ap_queue_info_create(fd_que
int max_recycled_pools);
apr_status_t ap_queue_info_set_idle(fd_queue_info_t * queue_info,
apr_pool_t * pool_to_recycle);
-apr_status_t ap_queue_info_wait_for_idler(fd_queue_info_t * queue_info);
+apr_status_t ap_queue_info_try_get_idler(fd_queue_info_t * queue_info);
+apr_status_t ap_queue_info_wait_for_idler(fd_queue_info_t * queue_info,
+ int *had_to_block);
apr_status_t ap_queue_info_term(fd_queue_info_t * queue_info);
+apr_uint32_t ap_queue_info_get_idlers(fd_queue_info_t * queue_info);
struct fd_queue_elem_t
{
|