From: Graham Leggett
To: dev@httpd.apache.org
Subject: Re: [Patch] Async write completion for the full connection filter stack
Date: Sun, 27 Sep 2015 20:41:24 +0200

Hi all,

I think I have cracked the async problem for both request and connection
output filters with this patch.

It provides three new functions:

- ap_filter_reinstate_brigade() - Used at the start of a filter, brigades
that were set aside earlier are reinstated for sending.

- ap_filter_should_yield() - Returns true if downstream filters have set
aside data. A filter would typically respond by setting aside the data
it is working with and returning in the expectation of being called
again.

- ap_filter_setaside_brigade() - Used at the end of a filter, any
brigades that were not processed can be set aside to continue the job
when called later.

The magic happens in the MPM itself. The first time
ap_filter_setaside_brigade() is called the filter is added to a
hashtable and a cleanup is registered to have the filter removed when
the request and/or connection is cleared.
The MPM iterates through this hashtable, and sends empty brigades to
any filter with setaside data.

Key to this technique is that existing filters remain unaffected - in
the absence of any changes to a filter the whole brigade will be
processed and sent downstream, and existing behaviour is maintained.
Same with FLUSH buckets - as expected, flush behaviour remains
unchanged.

If however the filters in the chain are able to set aside buckets, they
can defer themselves to the write completion phase which in turn can
take full advantage of the event MPM. These filters will be expected to
handle an empty brigade to “kick” them back into life and continue
the writing of their setaside data. As soon as their setaside brigade
becomes empty the kicks then stop. All filters with setaside data get
kicked exactly once on each pass through the hashtable, so none of the
filters should get starved. Filters that are removed from the stack get
their setaside emptied, and so become ineligible for kicks. When the
pool cleanup gets triggered, the filter is permanently removed from the
connection and disappears.

Over and above the network filter the first filter to be modified is
mod_ssl, and this will allow files served over SSL to take advantage of
write completion. Another filter that will benefit from using the above
calls is mod_deflate.

I have an additional goal of adding an ap_filter_suspend() method that
will allow us to suspend a filter for a given period of time or until
some callback is triggered, but that will be a separate patch.

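To make the kick mechanism described above a little more concrete, here
is a small standalone model in plain C. It is only a sketch and does not
use APR or httpd types: toy_filter, toy_setaside(), toy_should_yield()
and toy_kick_all() are made-up names for illustration, a byte counter
stands in for the setaside brigade, and drain_per_kick is an assumed
limit on how much a filter manages to write each time it is kicked.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for ap_filter_t. 'pending' models the number of
 * bytes held in the filter's setaside brigade (f->bb in the patch). */
typedef struct toy_filter {
    const char *name;
    size_t pending;          /* bytes currently set aside */
    size_t drain_per_kick;   /* assumed bytes written per kick */
    struct toy_filter *next; /* downstream filter, NULL at the end */
} toy_filter;

/* Models ap_filter_setaside_brigade(): keep unsent bytes for later. */
static void toy_setaside(toy_filter *f, size_t bytes)
{
    assert(f != NULL);
    f->pending += bytes;
}

/* Models ap_filter_should_yield(): walk from f down the chain and report
 * whether any filter still holds setaside data. */
static int toy_should_yield(const toy_filter *f)
{
    for (; f != NULL; f = f->next) {
        if (f->pending > 0) {
            return 1;
        }
    }
    return 0;
}

/* Models one write completion pass in the MPM: every filter that holds
 * setaside data is kicked exactly once. Returns how many filters still
 * hold data afterwards, so the caller knows whether another pass is
 * needed. */
static size_t toy_kick_all(toy_filter **filters, size_t count)
{
    size_t i, still_pending = 0;

    for (i = 0; i < count; i++) {
        toy_filter *f = filters[i];

        if (f->pending == 0) {
            continue;            /* nothing set aside, no kick */
        }
        if (f->pending > f->drain_per_kick) {
            f->pending -= f->drain_per_kick;
            still_pending++;
        }
        else {
            f->pending = 0;      /* fully drained, the kicks stop */
        }
    }
    return still_pending;
}
```

In this model a "core" filter that drains 4096 bytes per kick and has
10000 bytes set aside needs three passes of toy_kick_all() before
toy_should_yield() reports the chain as clear again. An upstream filter
such as the toy "ssl" one would keep yielding until then.
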
Regards,
Graham

[Attachment: httpd-async-fullstack-ssl6.patch]

Index: include/httpd.h
===================================================================
--- include/httpd.h	(revision 1705548)
+++ include/httpd.h	(working copy)
@@ -55,6 +55,7 @@
 #include "apr_buckets.h"
 #include "apr_poll.h"
 #include "apr_thread_proc.h"
+#include "apr_hash.h"

 #include "os.h"

@@ -1191,6 +1192,12 @@

     /** Array of requests being handled under this connection. */
     apr_array_header_t *requests;
+
+    /** Empty bucket brigade */
+    apr_bucket_brigade *empty;
+
+    /** Hashtable of filters with setaside buckets for write completion */
+    apr_hash_t *filters;
 };

 struct conn_slave_rec {
Index: include/util_filter.h
===================================================================
--- include/util_filter.h	(revision 1705548)
+++ include/util_filter.h	(working copy)
@@ -278,6 +278,13 @@
      *  to the request_rec, except that it is used for connection filters.
      */
     conn_rec *c;
+
+    /** Buffered data associated with the current filter. */
+    apr_bucket_brigade *bb;
+
+    /** Dedicated pool to use for deferred writes. */
+    apr_pool_t *deferred_pool;
+
 };

 /**
@@ -519,8 +526,11 @@
  */

 /**
- * prepare a bucket brigade to be setaside.  If a different brigade was
+ * Prepare a bucket brigade to be setaside.  If a different brigade was
  * set-aside earlier, then the two brigades are concatenated together.
+ *
+ * If *save_to is NULL, the brigade will be created, and a cleanup registered
+ * to clear the brigade address when the pool is destroyed.
  * @param f The current filter
  * @param save_to The brigade that was previously set-aside.  Regardless, the
  *             new bucket brigade is returned in this location.
@@ -533,6 +543,56 @@
                                          apr_bucket_brigade **b, apr_pool_t *p);

 /**
+ * Prepare a bucket brigade to be setaside, creating a dedicated pool within
+ * the filter to handle the lifetime of the setaside brigade. If a non-empty
+ * amount of data was set aside, the counter indicating whether data exists
+ * in the output chain is incremented.
+ * @param f The current filter
+ * @param bb The bucket brigade to set aside. This brigade is always empty
+ *           on return
+ */
+AP_DECLARE(apr_status_t) ap_filter_setaside_brigade(ap_filter_t *f,
+                                                    apr_bucket_brigade *bb);
+
+/**
+ * Reinstate a brigade setaside earlier, and calculate the amount of data we
+ * should write based on the presence of flush buckets, size limits on in
+ * memory buckets, and the number of outstanding requests in the pipeline. If
+ * data was previously set aside, the counter indicating whether data exists
+ * in the output chain is decremented.
+ *
+ * If the brigade passed in is empty, we reinstate the brigade and return
+ * immediately on the assumption that any buckets needing to be flushed were
+ * flushed before being passed to ap_filter_setaside_brigade().
+ *
+ * @param f The current filter
+ * @param bb The bucket brigade to restore to.
+ * @param flush_upto Work out the bucket we need to flush up to, based on the
+ *                   presence of a flush bucket, size limits on in-memory
+ *                   buckets, size limits on the number of requests outstanding
+ *                   in the pipeline.
+ * @return If positive, there are at least THRESHOLD_MIN_WRITE bytes to write
+ *         past the bucket marked by flush_upto, or if negative, the original
+ *         brigade to write was empty. If zero, there is less than
+ *         THRESHOLD_MIN_WRITE bytes to write, and the caller might choose to
+ *         wait for more data to arrive before writing.
+ */
+AP_DECLARE(int) ap_filter_reinstate_brigade(ap_filter_t *f,
+                                            apr_bucket_brigade *bb,
+                                            apr_bucket **flush_upto);
+
+/**
+ * This function calculates whether there are any as yet unsent
+ * buffered brigades in downstream filters, and returns non zero
+ * if so.
+ *
+ * A filter can use this to determine whether the passing of data
+ * downstream might block, and so defer the passing of brigades
+ * downstream.
+ */
+AP_DECLARE(int) ap_filter_should_yield(ap_filter_t *f);
+
+/**
  * Flush function for apr_brigade_* calls.  This calls ap_pass_brigade
  * to flush the brigade if the brigade buffer overflows.
  * @param bb The brigade to flush
Index: modules/ssl/ssl_engine_io.c
===================================================================
--- modules/ssl/ssl_engine_io.c	(revision 1705548)
+++ modules/ssl/ssl_engine_io.c	(working copy)
@@ -1679,20 +1679,24 @@
     bio_filter_in_ctx_t *inctx;
     bio_filter_out_ctx_t *outctx;
     apr_read_type_e rblock = APR_NONBLOCK_READ;
+    apr_bucket *flush_upto = NULL;

     if (f->c->aborted) {
         apr_brigade_cleanup(bb);
         return APR_ECONNABORTED;
     }

+    inctx = (bio_filter_in_ctx_t *)filter_ctx->pbioRead->ptr;
+    outctx = (bio_filter_out_ctx_t *)filter_ctx->pbioWrite->ptr;
+
+    /* Reinstate any buffered content */
+    ap_filter_reinstate_brigade(f, bb, &flush_upto);
+
     if (!filter_ctx->pssl) {
         /* ssl_filter_io_shutdown was called */
         return ap_pass_brigade(f->next, bb);
     }

-    inctx = (bio_filter_in_ctx_t *)filter_ctx->pbioRead->ptr;
-    outctx = (bio_filter_out_ctx_t *)filter_ctx->pbioWrite->ptr;
-
     /* When we are the writer, we must initialize the inctx
      * mode so that we block for any required ssl input, because
      * output filtering is always nonblocking.
@@ -1707,6 +1711,16 @@
     while (!APR_BRIGADE_EMPTY(bb)) {
         apr_bucket *bucket = APR_BRIGADE_FIRST(bb);

+        /* if the core has set aside data, back off and try later */
+        if (!flush_upto) {
+            if (ap_filter_should_yield(f)) {
+                break;
+            }
+        }
+        else if (flush_upto == bucket) {
+            flush_upto = NULL;
+        }
+
         /* If it is a flush or EOS/EOR, we need to pass this down.
          * These types do not require translation by OpenSSL.
          */
@@ -1774,8 +1788,13 @@
                 break;
             }
         }
+
     }

+    if (APR_STATUS_IS_EOF(status) || (status == APR_SUCCESS)) {
+        return ap_filter_setaside_brigade(f, bb);
+    }
+
     return status;
 }

Index: server/core.c
===================================================================
--- server/core.c	(revision 1705548)
+++ server/core.c	(working copy)
@@ -5007,6 +5007,8 @@

     c->id = id;
     c->bucket_alloc = alloc;
+    c->empty = apr_brigade_create(c->pool, c->bucket_alloc);
+    c->filters = apr_hash_make(c->pool);

     c->clogging_input_filters = 0;

Index: server/core_filters.c
===================================================================
--- server/core_filters.c	(revision 1705548)
+++ server/core_filters.c	(working copy)
@@ -78,9 +78,7 @@
 #define APLOG_MODULE_INDEX AP_CORE_MODULE_INDEX

 struct core_output_filter_ctx {
-    apr_bucket_brigade *buffered_bb;
     apr_bucket_brigade *tmp_flush_bb;
-    apr_pool_t *deferred_write_pool;
     apr_size_t bytes_written;
 };

@@ -328,11 +326,6 @@
     return APR_SUCCESS;
 }

-static void setaside_remaining_output(ap_filter_t *f,
-                                      core_output_filter_ctx_t *ctx,
-                                      apr_bucket_brigade *bb,
-                                      conn_rec *c);
-
 static apr_status_t send_brigade_nonblocking(apr_socket_t *s,
                                              apr_bucket_brigade *bb,
                                              apr_size_t *bytes_written,
@@ -358,33 +351,24 @@
                                      conn_rec *c);
 #endif

-/* XXX: Should these be configurable parameters? */
-#define THRESHOLD_MIN_WRITE 4096
-#define THRESHOLD_MAX_BUFFER 65536
-#define MAX_REQUESTS_IN_PIPELINE 5
-
 /* Optional function coming from mod_logio, used for logging of output
  * traffic
  */
 extern APR_OPTIONAL_FN_TYPE(ap_logio_add_bytes_out) *ap__logio_add_bytes_out;

-apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *new_bb)
+apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *bb)
 {
     conn_rec *c = f->c;
     core_net_rec *net = f->ctx;
     core_output_filter_ctx_t *ctx = net->out_ctx;
-    apr_bucket_brigade *bb = NULL;
-    apr_bucket *bucket, *next, *flush_upto = NULL;
-    apr_size_t bytes_in_brigade, non_file_bytes_in_brigade;
-    int eor_buckets_in_brigade, morphing_bucket_in_brigade;
+    apr_bucket *flush_upto = NULL;
+    int should_write;
     apr_status_t rv;
     int loglevel = ap_get_conn_module_loglevel(c, APLOG_MODULE_INDEX);

     /* Fail quickly if the connection has already been aborted. */
     if (c->aborted) {
-        if (new_bb != NULL) {
-            apr_brigade_cleanup(new_bb);
-        }
+        apr_brigade_cleanup(bb);
         return APR_ECONNABORTED;
     }

@@ -397,33 +381,14 @@
          * allocated from bb->pool which might be wrong.
          */
         ctx->tmp_flush_bb = apr_brigade_create(c->pool, c->bucket_alloc);
-        /* same for buffered_bb and ap_save_brigade */
-        ctx->buffered_bb = apr_brigade_create(c->pool, c->bucket_alloc);
     }

-    if (new_bb != NULL)
-        bb = new_bb;
-
-    if ((ctx->buffered_bb != NULL) &&
-        !APR_BRIGADE_EMPTY(ctx->buffered_bb)) {
-        if (new_bb != NULL) {
-            APR_BRIGADE_PREPEND(bb, ctx->buffered_bb);
-        }
-        else {
-            bb = ctx->buffered_bb;
-        }
-        c->data_in_output_filters = 0;
-    }
-    else if (new_bb == NULL) {
-        return APR_SUCCESS;
-    }
-
     /* Scan through the brigade and decide whether to attempt a write,
      * and how much to write, based on the following rules:
      *
-     *  1) The new_bb is null: Do a nonblocking write of as much as
+     *  1) The bb is empty: Do a nonblocking write of as much as
      *     possible: do a nonblocking write of as much data as possible,
-     *     then save the rest in ctx->buffered_bb.  (If new_bb == NULL,
+     *     then save the rest in ctx->buffered_bb.  (If bb is empty,
      *     it probably means that the MPM is doing asynchronous write
      *     completion and has just determined that this connection
      *     is writable.)
@@ -465,87 +430,12 @@
      *     then save the rest in ctx->buffered_bb.
      */

-    if (new_bb == NULL) {
-        rv = send_brigade_nonblocking(net->client_socket, bb,
-                                      &(ctx->bytes_written), c);
-        if (rv != APR_SUCCESS && !APR_STATUS_IS_EAGAIN(rv)) {
-            /* The client has aborted the connection */
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c,
-                          "core_output_filter: writing data to the network");
-            apr_brigade_cleanup(bb);
-            c->aborted = 1;
-            return rv;
-        }
-        setaside_remaining_output(f, ctx, bb, c);
+    should_write = ap_filter_reinstate_brigade(f, bb, &flush_upto);
+
+    if (APR_BRIGADE_EMPTY(bb)) {
         return APR_SUCCESS;
     }

-    bytes_in_brigade = 0;
-    non_file_bytes_in_brigade = 0;
-    eor_buckets_in_brigade = 0;
-    morphing_bucket_in_brigade = 0;
-
-    for (bucket = APR_BRIGADE_FIRST(bb); bucket != APR_BRIGADE_SENTINEL(bb);
-         bucket = next) {
-        next = APR_BUCKET_NEXT(bucket);
-
-        if (!APR_BUCKET_IS_METADATA(bucket)) {
-            if (bucket->length == (apr_size_t)-1) {
-                /*
-                 * A setaside of morphing buckets would read everything into
-                 * memory. Instead, we will flush everything up to and
-                 * including this bucket.
-                 */
-                morphing_bucket_in_brigade = 1;
-            }
-            else {
-                bytes_in_brigade += bucket->length;
-                if (!APR_BUCKET_IS_FILE(bucket))
-                    non_file_bytes_in_brigade += bucket->length;
-            }
-        }
-        else if (AP_BUCKET_IS_EOR(bucket)) {
-            eor_buckets_in_brigade++;
-        }
-
-        if (APR_BUCKET_IS_FLUSH(bucket)
-            || non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER
-            || morphing_bucket_in_brigade
-            || eor_buckets_in_brigade > MAX_REQUESTS_IN_PIPELINE) {
-            /* this segment of the brigade MUST be sent before returning. */
-
-            if (loglevel >= APLOG_TRACE6) {
-                char *reason = APR_BUCKET_IS_FLUSH(bucket) ?
-                               "FLUSH bucket" :
-                               (non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER) ?
-                               "THRESHOLD_MAX_BUFFER" :
-                               morphing_bucket_in_brigade ? "morphing bucket" :
-                               "MAX_REQUESTS_IN_PIPELINE";
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, c,
-                              "will flush because of %s", reason);
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, c,
-                              "seen in brigade%s: bytes: %" APR_SIZE_T_FMT
-                              ", non-file bytes: %" APR_SIZE_T_FMT ", eor "
-                              "buckets: %d, morphing buckets: %d",
-                              flush_upto == NULL ? " so far"
-                                                 : " since last flush point",
-                              bytes_in_brigade,
-                              non_file_bytes_in_brigade,
-                              eor_buckets_in_brigade,
-                              morphing_bucket_in_brigade);
-            }
-            /*
-             * Defer the actual blocking write to avoid doing many writes.
-             */
-            flush_upto = next;
-
-            bytes_in_brigade = 0;
-            non_file_bytes_in_brigade = 0;
-            eor_buckets_in_brigade = 0;
-            morphing_bucket_in_brigade = 0;
-        }
-    }
-
     if (flush_upto != NULL) {
         ctx->tmp_flush_bb = apr_brigade_split_ex(bb, flush_upto,
                                                  ctx->tmp_flush_bb);
@@ -571,16 +461,7 @@
         APR_BRIGADE_CONCAT(bb, ctx->tmp_flush_bb);
     }

-    if (loglevel >= APLOG_TRACE8) {
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, c,
-                      "brigade contains: bytes: %" APR_SIZE_T_FMT
-                      ", non-file bytes: %" APR_SIZE_T_FMT
-                      ", eor buckets: %d, morphing buckets: %d",
-                      bytes_in_brigade, non_file_bytes_in_brigade,
-                      eor_buckets_in_brigade, morphing_bucket_in_brigade);
-    }
-
-    if (bytes_in_brigade >= THRESHOLD_MIN_WRITE) {
+    if (should_write) {
         rv = send_brigade_nonblocking(net->client_socket, bb,
                                       &(ctx->bytes_written), c);
         if ((rv != APR_SUCCESS) && (!APR_STATUS_IS_EAGAIN(rv))) {
@@ -599,43 +480,12 @@
         }
     }

-    setaside_remaining_output(f, ctx, bb, c);
+    remove_empty_buckets(bb);
+    ap_filter_setaside_brigade(f, bb);
+
     return APR_SUCCESS;
 }

-/*
- * This function assumes that either ctx->buffered_bb == NULL, or
- * ctx->buffered_bb is empty, or ctx->buffered_bb == bb
- */
-static void setaside_remaining_output(ap_filter_t *f,
-                                      core_output_filter_ctx_t *ctx,
-                                      apr_bucket_brigade *bb,
-                                      conn_rec *c)
-{
-    if (bb == NULL) {
-        return;
-    }
-    remove_empty_buckets(bb);
-    if (!APR_BRIGADE_EMPTY(bb)) {
-        c->data_in_output_filters = 1;
-        if (bb != ctx->buffered_bb) {
-            if (!ctx->deferred_write_pool) {
-                apr_pool_create(&ctx->deferred_write_pool, c->pool);
-                apr_pool_tag(ctx->deferred_write_pool, "deferred_write");
-            }
-            ap_save_brigade(f, &(ctx->buffered_bb), &bb,
-                            ctx->deferred_write_pool);
-        }
-    }
-    else if (ctx->deferred_write_pool) {
-        /*
-         * There are no more requests in the pipeline. We can just clear the
-         * pool.
-         */
-        apr_pool_clear(ctx->deferred_write_pool);
-    }
-}
-
 #ifndef APR_MAX_IOVEC_SIZE
 #define MAX_IOVEC_TO_WRITE 16
 #else
Index: server/mpm/event/event.c
===================================================================
--- server/mpm/event/event.c	(revision 1705548)
+++ server/mpm/event/event.c	(working copy)
@@ -1147,16 +1147,29 @@
     }

     if (cs->pub.state == CONN_STATE_WRITE_COMPLETION) {
-        ap_filter_t *output_filter = c->output_filters;
-        apr_status_t rv;
+        apr_hash_index_t *rindex;
+        apr_status_t rv = APR_SUCCESS;
         ap_update_child_status_from_conn(sbh, SERVER_BUSY_WRITE, c);
-        while (output_filter->next != NULL) {
-            output_filter = output_filter->next;
+
+        rindex = apr_hash_first(NULL, c->filters);
+        while (rindex) {
+            ap_filter_t *f = apr_hash_this_val(rindex);
+
+            if (f->bb && !APR_BRIGADE_EMPTY(f->bb)) {
+                rv = ap_pass_brigade(f, c->empty);
+                apr_brigade_cleanup(c->empty);
+                if (APR_SUCCESS != rv) {
+                    ap_log_cerror(
+                            APLOG_MARK, APLOG_DEBUG, rv, c, APLOGNO(00470)
+                            "write failure in '%s' output filter", f->frec->name);
+                    break;
+                }
+            }
+
+            rindex = apr_hash_next(rindex);
         }
-        rv = output_filter->frec->filter_func.out_func(output_filter, NULL);
+
         if (rv != APR_SUCCESS) {
-            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, rv, c, APLOGNO(00470)
-                          "network write failure in core output filter");
             cs->pub.state = CONN_STATE_LINGER;
         }
         else if (c->data_in_output_filters) {
Index: server/mpm/motorz/motorz.c
===================================================================
--- server/mpm/motorz/motorz.c	(revision 1705548)
+++ server/mpm/motorz/motorz.c	(working copy)
@@ -364,18 +364,29 @@
     }

     if (scon->cs.state == CONN_STATE_WRITE_COMPLETION) {
-        ap_filter_t *output_filter = c->output_filters;
+        apr_hash_index_t *rindex;
         ap_update_child_status_from_conn(scon->sbh, SERVER_BUSY_WRITE, c);
-        while (output_filter->next != NULL) {
-            output_filter = output_filter->next;
+
+        rv = APR_SUCCESS;
+        rindex = apr_hash_first(NULL, c->filters);
+        while (rindex) {
+            ap_filter_t *f = apr_hash_this_val(rindex);
+
+            if (f->bb && !APR_BRIGADE_EMPTY(f->bb)) {
+                rv = ap_pass_brigade(f, c->empty);
+                apr_brigade_cleanup(c->empty);
+                if (APR_SUCCESS != rv) {
+                    ap_log_cerror(
+                            APLOG_MARK, APLOG_DEBUG, rv, c, APLOGNO(02848)
+                            "write failure in '%s' output filter", f->frec->name);
+                    break;
+                }
+            }
+
+            rindex = apr_hash_next(rindex);
         }

-        rv = output_filter->frec->filter_func.out_func(output_filter,
-                                                       NULL);
-
         if (rv != APR_SUCCESS) {
-            ap_log_error(APLOG_MARK, APLOG_WARNING, rv, ap_server_conf, APLOGNO(02848)
-                         "network write failure in core output filter");
             scon->cs.state = CONN_STATE_LINGER;
         }
         else if (c->data_in_output_filters) {
Index: server/mpm/simple/simple_io.c
===================================================================
--- server/mpm/simple/simple_io.c	(revision 1705548)
+++ server/mpm/simple/simple_io.c	(working copy)
@@ -92,14 +92,27 @@
         }

         if (scon->cs.state == CONN_STATE_WRITE_COMPLETION) {
-            ap_filter_t *output_filter = c->output_filters;
-            while (output_filter->next != NULL) {
-                output_filter = output_filter->next;
+            apr_hash_index_t *rindex;
+
+            rv = APR_SUCCESS;
+            rindex = apr_hash_first(NULL, c->filters);
+            while (rindex) {
+                ap_filter_t *f = apr_hash_this_val(rindex);
+
+                if (f->bb && !APR_BRIGADE_EMPTY(f->bb)) {
+                    rv = ap_pass_brigade(f, c->empty);
+                    apr_brigade_cleanup(c->empty);
+                    if (APR_SUCCESS != rv) {
+                        ap_log_cerror(
+                                APLOG_MARK, APLOG_DEBUG, rv, c, APLOGNO(02848)
+                                "write failure in '%s' output filter", f->frec->name);
+                        break;
+                    }
+                }
+
+                rindex = apr_hash_next(rindex);
             }

-            rv = output_filter->frec->filter_func.out_func(output_filter,
-                                                           NULL);
-
             if (rv != APR_SUCCESS) {
                 ap_log_error(APLOG_MARK, APLOG_WARNING, rv, ap_server_conf, APLOGNO(00249)
                              "network write failure in core output filter");
Index: server/util_filter.c
===================================================================
--- server/util_filter.c	(revision 1705548)
+++ server/util_filter.c	(working copy)
@@ -24,6 +24,7 @@
 #include "http_config.h"
 #include "http_core.h"
 #include "http_log.h"
+#include "http_request.h"
 #include "util_filter.h"

 /* NOTE: Apache's current design doesn't allow a pool to be passed thru,
@@ -32,6 +33,11 @@
 #define FILTER_POOL     apr_hook_global_pool
 #include "ap_hooks.h"   /* for apr_hook_global_pool */

+/* XXX: Should these be configurable parameters? */
+#define THRESHOLD_MIN_WRITE 4096
+#define THRESHOLD_MAX_BUFFER 65536
+#define MAX_REQUESTS_IN_PIPELINE 5
+
 /*
 ** This macro returns true/false if a given filter should be inserted BEFORE
 ** another filter. This will happen when one of: 1) there isn't another
@@ -319,6 +325,8 @@
     f->r = frec->ftype < AP_FTYPE_CONNECTION ? r : NULL;
     f->c = c;
     f->next = NULL;
+    f->bb = NULL;
+    f->deferred_pool = NULL;

     if (INSERT_BEFORE(f, *outf)) {
         f->next = *outf;
@@ -474,6 +482,17 @@

 AP_DECLARE(void) ap_remove_output_filter(ap_filter_t *f)
 {
+
+    if ((f->bb) && !APR_BRIGADE_EMPTY(f->bb)) {
+        apr_brigade_cleanup(f->bb);
+        f->c->data_in_output_filters--;
+    }
+
+    if (f->deferred_pool) {
+        apr_pool_destroy(f->deferred_pool);
+        f->deferred_pool = NULL;
+    }
+
     remove_any_filter(f, f->r ? &f->r->output_filters : NULL,
                       f->r ? &f->r->proto_output_filters : NULL,
                       &f->c->output_filters);
@@ -566,6 +585,7 @@
 {
     if (next) {
         apr_bucket *e;
+
         if ((e = APR_BRIGADE_LAST(bb)) && APR_BUCKET_IS_EOS(e) && next->r) {
             /* This is only safe because HTTP_HEADER filter is always in
              * the filter stack.   This ensures that there is ALWAYS a
@@ -635,7 +655,8 @@
     apr_status_t rv, srv = APR_SUCCESS;

     /* If have never stored any data in the filter, then we had better
-     * create an empty bucket brigade so that we can concat.
+     * create an empty bucket brigade so that we can concat. Register
+     * a cleanup to zero out the pointer if the pool is cleared.
      */
     if (!(*saveto)) {
         *saveto = apr_brigade_create(p, f->c->bucket_alloc);
@@ -673,6 +694,190 @@
     return srv;
 }

+static apr_status_t filters_cleanup(void *data)
+{
+    ap_filter_t **key = data;
+
+    apr_hash_set((*key)->c->filters, key, sizeof(ap_filter_t *), NULL);
+
+    return APR_SUCCESS;
+}
+
+AP_DECLARE(apr_status_t) ap_filter_setaside_brigade(ap_filter_t *f,
+                                                    apr_bucket_brigade *bb)
+{
+    if (!APR_BRIGADE_EMPTY(bb)) {
+        if (!f->bb) {
+            ap_filter_t **key = apr_palloc(f->r ? f->r->pool : f->c->pool,
+                                           sizeof(ap_filter_t *));
+            *key = f;
+            apr_hash_set(f->c->filters, key, sizeof(ap_filter_t *), f);
+            apr_pool_cleanup_register(f->r ? f->r->pool : f->c->pool, key,
+                                      filters_cleanup, apr_pool_cleanup_null);
+            f->bb = apr_brigade_create(f->r ? f->r->pool : f->c->pool,
+                                       f->c->bucket_alloc);
+        }
+        if (APR_BRIGADE_EMPTY((f->bb))) {
+            f->c->data_in_output_filters++;
+        }
+        if (bb != f->bb) {
+            if (!(f->r) && !(f->deferred_pool)) {
+                apr_pool_create(&f->deferred_pool, f->c->pool);
+                apr_pool_tag(f->deferred_pool, "deferred_pool");
+            }
+            return ap_save_brigade(f, &f->bb, &bb,
+                                   f->r ? f->r->pool : f->deferred_pool);
+        }
+    }
+    else if (f->deferred_pool) {
+        /*
+         * There are no more requests in the pipeline. We can just clear the
+         * pool.
+         */
+        apr_pool_clear(f->deferred_pool);
+    }
+    return APR_SUCCESS;
+}
+
+AP_DECLARE(int) ap_filter_reinstate_brigade(ap_filter_t *f,
+                                            apr_bucket_brigade *bb,
+                                            apr_bucket **flush_upto)
+{
+    apr_bucket *bucket, *next;
+    apr_size_t bytes_in_brigade, non_file_bytes_in_brigade;
+    int eor_buckets_in_brigade, morphing_bucket_in_brigade;
+    int loglevel = ap_get_conn_module_loglevel(f->c, APLOG_MODULE_INDEX);
+    int empty = APR_BRIGADE_EMPTY(bb);
+
+    if (f->bb && !APR_BRIGADE_EMPTY(f->bb)) {
+        APR_BRIGADE_PREPEND(bb, f->bb);
+        f->c->data_in_output_filters--;
+    }
+
+    /*
+     * Determine if and up to which bucket we need to do a blocking write:
+     *
+     *  a) The brigade contains a flush bucket: Do a blocking write
+     *     of everything up that point.
+     *
+     *  b) The request is in CONN_STATE_HANDLER state, and the brigade
+     *     contains at least THRESHOLD_MAX_BUFFER bytes in non-file
+     *     buckets: Do blocking writes until the amount of data in the
+     *     buffer is less than THRESHOLD_MAX_BUFFER.  (The point of this
+     *     rule is to provide flow control, in case a handler is
+     *     streaming out lots of data faster than the data can be
+     *     sent to the client.)
+     *
+     *  c) The request is in CONN_STATE_HANDLER state, and the brigade
+     *     contains at least MAX_REQUESTS_IN_PIPELINE EOR buckets:
+     *     Do blocking writes until less than MAX_REQUESTS_IN_PIPELINE EOR
+     *     buckets are left. (The point of this rule is to prevent too many
+     *     FDs being kept open by pipelined requests, possibly allowing a
+     *     DoS).
+     *
+     *  d) The brigade contains a morphing bucket: If there was no other
+     *     reason to do a blocking write yet, try reading the bucket. If its
+     *     contents fit into memory before THRESHOLD_MAX_BUFFER is reached,
+     *     everything is fine. Otherwise we need to do a blocking write the
+     *     up to and including the morphing bucket, because ap_save_brigade()
+     *     would read the whole bucket into memory later on.
+     */
+
+    *flush_upto = NULL;
+
+    if (empty) {
+        return -1;
+    }
+
+    bytes_in_brigade = 0;
+    non_file_bytes_in_brigade = 0;
+    eor_buckets_in_brigade = 0;
+    morphing_bucket_in_brigade = 0;
+
+    for (bucket = APR_BRIGADE_FIRST(bb); bucket != APR_BRIGADE_SENTINEL(bb);
+         bucket = next) {
+        next = APR_BUCKET_NEXT(bucket);
+
+        if (!APR_BUCKET_IS_METADATA(bucket)) {
+            if (bucket->length == (apr_size_t)-1) {
+                /*
+                 * A setaside of morphing buckets would read everything into
+                 * memory. Instead, we will flush everything up to and
+                 * including this bucket.
+                 */
+                morphing_bucket_in_brigade = 1;
+            }
+            else {
+                bytes_in_brigade += bucket->length;
+                if (!APR_BUCKET_IS_FILE(bucket))
+                    non_file_bytes_in_brigade += bucket->length;
+            }
+        }
+        else if (AP_BUCKET_IS_EOR(bucket)) {
+            eor_buckets_in_brigade++;
+        }
+
+        if (APR_BUCKET_IS_FLUSH(bucket)
+            || non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER
+            || morphing_bucket_in_brigade
+            || eor_buckets_in_brigade > MAX_REQUESTS_IN_PIPELINE) {
+            /* this segment of the brigade MUST be sent before returning. */
+
+            if (loglevel >= APLOG_TRACE6) {
+                char *reason = APR_BUCKET_IS_FLUSH(bucket) ?
+                               "FLUSH bucket" :
+                               (non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER) ?
+                               "THRESHOLD_MAX_BUFFER" :
+                               morphing_bucket_in_brigade ? "morphing bucket" :
+                               "MAX_REQUESTS_IN_PIPELINE";
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, f->c,
+                              "will flush because of %s", reason);
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, f->c,
+                              "seen in brigade%s: bytes: %" APR_SIZE_T_FMT
+                              ", non-file bytes: %" APR_SIZE_T_FMT ", eor "
+                              "buckets: %d, morphing buckets: %d",
+                              flush_upto == NULL ? " so far"
+                                                 : " since last flush point",
+                              bytes_in_brigade,
+                              non_file_bytes_in_brigade,
+                              eor_buckets_in_brigade,
+                              morphing_bucket_in_brigade);
+            }
+            /*
+             * Defer the actual blocking write to avoid doing many writes.
+             */
+            *flush_upto = next;
+
+            bytes_in_brigade = 0;
+            non_file_bytes_in_brigade = 0;
+            eor_buckets_in_brigade = 0;
+            morphing_bucket_in_brigade = 0;
+        }
+    }
+
+    if (loglevel >= APLOG_TRACE8) {
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, f->c,
+                      "brigade contains: bytes: %" APR_SIZE_T_FMT
+                      ", non-file bytes: %" APR_SIZE_T_FMT
+                      ", eor buckets: %d, morphing buckets: %d",
+                      bytes_in_brigade, non_file_bytes_in_brigade,
+                      eor_buckets_in_brigade, morphing_bucket_in_brigade);
+    }
+
+    return (bytes_in_brigade >= THRESHOLD_MIN_WRITE);
+}
+
+AP_DECLARE(int) ap_filter_should_yield(ap_filter_t *f)
+{
+    while (f) {
+        if ((f->bb) && !APR_BRIGADE_EMPTY(f->bb)) {
+            return 1;
+        }
+        f = f->next;
+    }
+    return 0;
+}
+
 AP_DECLARE_NONSTD(apr_status_t) ap_filter_flush(apr_bucket_brigade *bb,
                                                 void *ctx)
 {
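
A note on the return convention of ap_filter_reinstate_brigade() in the
patch above, since callers need to tell three cases apart. The
standalone sketch below models only the scanning part, under loud
simplifications: it uses a plain array instead of a bucket brigade,
ignores morphing buckets and the prepending of previously setaside
data, and reports the flush point as an array index rather than a
bucket pointer. toy_bucket, toy_kind and toy_reinstate() are invented
names for illustration; the three threshold values are the ones the
patch moves into util_filter.c.

```c
#include <assert.h>
#include <stddef.h>

#define THRESHOLD_MIN_WRITE 4096
#define THRESHOLD_MAX_BUFFER 65536
#define MAX_REQUESTS_IN_PIPELINE 5

/* Hypothetical, simplified bucket: a kind and a length in bytes. */
typedef enum { TOY_DATA, TOY_FILE, TOY_FLUSH, TOY_EOR } toy_kind;

typedef struct {
    toy_kind kind;
    size_t length;
} toy_bucket;

/*
 * Scan 'count' buckets and work out how much must be written now.
 *
 * *flush_upto receives the index of the first bucket after the last
 * mandatory flush point, or -1 if nothing has to be flushed.
 *
 * Returns -1 if the input is empty, 1 if at least THRESHOLD_MIN_WRITE
 * bytes remain after the flush point, and 0 otherwise.
 */
static int toy_reinstate(const toy_bucket *buckets, size_t count,
                         long *flush_upto)
{
    size_t i, bytes = 0, non_file_bytes = 0;
    int eor_buckets = 0;

    assert(flush_upto != NULL);
    *flush_upto = -1;

    if (count == 0) {
        return -1;
    }

    for (i = 0; i < count; i++) {
        const toy_bucket *b = &buckets[i];

        if (b->kind == TOY_DATA || b->kind == TOY_FILE) {
            bytes += b->length;
            if (b->kind == TOY_DATA) {
                non_file_bytes += b->length;  /* counts towards memory use */
            }
        }
        else if (b->kind == TOY_EOR) {
            eor_buckets++;
        }

        if (b->kind == TOY_FLUSH
            || non_file_bytes >= THRESHOLD_MAX_BUFFER
            || eor_buckets > MAX_REQUESTS_IN_PIPELINE) {
            /* Everything up to and including this bucket must be sent;
             * start counting afresh for what follows. */
            *flush_upto = (long)(i + 1);
            bytes = 0;
            non_file_bytes = 0;
            eor_buckets = 0;
        }
    }

    return bytes >= THRESHOLD_MIN_WRITE;
}
```

A caller in this model would write everything before index *flush_upto
unconditionally, then attempt a further nonblocking write only when the
return value is positive, and set aside whatever is left.
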