Reply-To: dev@httpd.apache.org
From: Graham Leggett
Content-Type: multipart/mixed; boundary="Apple-Mail=_94E5CD65-6FA8-4766-A245-812FB3839F34"
Message-Id: <04776F50-0A53-4C1D-AA1C-9F57B60BC0AC@sharp.fm>
Mime-Version: 1.0 (Mac OS X Mail 7.3 \(1878.6\))
Subject: Re: [Patch] Async write completion for the full connection filter stack
Date: Mon, 15 Sep 2014 14:29:35 +0200
References: <715A3AE8-8015-4537-943F-A34E0E2A76F7@sharp.fm> <540DFB30.6050700@apache.org>
	<2F3B060C-E383-407F-A24C-7F026AD3CA81@sharp.fm> <7A74676C-AA9D-40E1-A434-9280C297217D@sharp.fm> <54128B11.9080803@apache.org> <496E8EFA-7A10-4D91-8C7E-434A9A3B09C5@sharp.fm> <541499A9.5050408@apache.org> <4D9E72F8-4918-49AD-8DC4-E3594C9ED43D@sharp.fm>
To: dev@httpd.apache.org
In-Reply-To: <4D9E72F8-4918-49AD-8DC4-E3594C9ED43D@sharp.fm>
X-Mailer: Apple Mail (2.1878.6)

--Apple-Mail=_94E5CD65-6FA8-4766-A245-812FB3839F34
Content-Type: text/plain; charset=windows-1252

On 15 Sep 2014, at 2:13 PM, Graham Leggett wrote:

> There is an issue I hadn't considered: the effect of calling ap_remove_output_filter().
>
> If we remove the filter while it still has buckets set aside, c->data_in_output_filters will remain unchanged and off by one, leading to a spin (it will never count back down to zero). This is a nasty bug to have to chase down.
>
> In the attached patch I have investigated adding both buffered_bb and the deferred_write_pool to ap_filter_t, and having ap_remove_output_filter() "do the right thing" when called. Both of these can optionally be set by the caller, but are otherwise not managed by the caller: the filter API takes care of cleaning up, and of ensuring that c->data_in_output_filters is always accurate.
>
> I have also introduced an optimisation: if this is a request filter, we use r->pool to set aside buckets, while if we're a connection filter we use the deferred write pool.
>
> Looks like a similar thing needs to be done with input filters.

Another refinement: deferred_write_pool is now just deferred_pool, and we only manipulate data_in_output_filters when removing an output filter.
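To make the counter bookkeeping above concrete, here is a minimal model of it in plain C. It is a sketch, not httpd code: model_conn, model_filter, model_setaside(), model_reinstate() and model_remove() are hypothetical stand-ins for conn_rec, ap_filter_t, ap_filter_setaside_brigade(), ap_filter_reinstate_brigade() and ap_remove_output_filter(), and a plain byte count stands in for each filter's buffered_bb brigade.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for conn_rec: only the counter is modelled. */
typedef struct model_conn {
    int data_in_output_filters;   /* filters currently holding set-aside data */
} model_conn;

/* Hypothetical stand-in for ap_filter_t with the new buffered_bb field,
 * reduced to a byte count. */
typedef struct model_filter {
    model_conn *c;
    size_t buffered;
} model_filter;

/* Like ap_filter_setaside_brigade(): the counter is bumped only when this
 * filter's buffer goes from empty to non-empty. */
static void model_setaside(model_filter *f, size_t len)
{
    if (len > 0) {
        if (f->buffered == 0) {
            f->c->data_in_output_filters++;
        }
        f->buffered += len;
    }
}

/* Like ap_filter_reinstate_brigade(): hand the buffered data back to the
 * caller and drop the counter if this filter was holding data. */
static size_t model_reinstate(model_filter *f)
{
    size_t len = f->buffered;

    if (len > 0) {
        assert(f->c->data_in_output_filters > 0);
        f->c->data_in_output_filters--;
        f->buffered = 0;
    }
    return len;
}

/* Like ap_remove_output_filter(): discard any buffered data. With
 * fix_counter == 0 this models the unpatched behaviour, where the
 * counter is left off by one; with fix_counter != 0 it models the patch. */
static void model_remove(model_filter *f, int fix_counter)
{
    if (f->buffered > 0) {
        f->buffered = 0;
        if (fix_counter) {
            assert(f->c->data_in_output_filters > 0);
            f->c->data_in_output_filters--;
        }
    }
}
```

Setting aside twice in the same filter raises the counter once, and reinstating lowers it again. Removing a filter that still holds data with model_remove(f, 0) leaves the counter stuck at 1 (the spin described above), while model_remove(f, 1) brings it back to 0.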
Regards,
Graham

--Apple-Mail=_94E5CD65-6FA8-4766-A245-812FB3839F34
Content-Disposition: attachment; filename=httpd-async-fullstack-ssl5.patch
Content-Type: application/octet-stream; x-unix-mode=0644; name="httpd-async-fullstack-ssl5.patch"
Content-Transfer-Encoding: 7bit

Index: include/httpd.h
===================================================================
--- include/httpd.h	(revision 1622873)
+++ include/httpd.h	(working copy)
@@ -1181,6 +1181,12 @@
 
     /** Context under which this connection was suspended */
     void *suspended_baton;
+
+    /** Activity marker for this connection */
+    unsigned int activity;
+
+    /** Empty bucket brigade */
+    apr_bucket_brigade *empty;
 };
 
 struct conn_slave_rec {
Index: include/util_filter.h
===================================================================
--- include/util_filter.h	(revision 1622873)
+++ include/util_filter.h	(working copy)
@@ -278,6 +278,12 @@
      * to the request_rec, except that it is used for connection filters.
      */
     conn_rec *c;
+
+    /** Buffered data associated with the current filter. */
+    apr_bucket_brigade *buffered_bb;
+
+    /** Dedicated pool to use for deferred writes. */
+    apr_pool_t *deferred_pool;
 };
 
 /**
@@ -519,8 +525,11 @@
  */
 
 /**
- * prepare a bucket brigade to be setaside. If a different brigade was
+ * Prepare a bucket brigade to be setaside. If a different brigade was
  * set-aside earlier, then the two brigades are concatenated together.
+ *
+ * If *save_to is NULL, the brigade will be created, and a cleanup registered
+ * to clear the brigade address when the pool is destroyed.
  * @param f The current filter
  * @param save_to The brigade that was previously set-aside. Regardless, the
  *             new bucket brigade is returned in this location.
@@ -533,6 +542,51 @@
                                          apr_bucket_brigade **b, apr_pool_t *p);
 
 /**
+ * Prepare a bucket brigade to be setaside, creating a dedicated pool within
+ * the filter to handle the lifetime of the setaside brigade. If a non-empty
+ * amount of data was set aside, the counter indicating whether data exists
+ * in the output chain is incremented.
+ * @param f The current filter
+ * @param bb The bucket brigade to set aside. This brigade is always empty
+ *          on return
+ */
+AP_DECLARE(apr_status_t) ap_filter_setaside_brigade(ap_filter_t *f,
+                                                    apr_bucket_brigade *bb);
+
+/**
+ * Reinstate a brigade setaside earlier, and calculate the amount of data we
+ * should write based on the presence of flush buckets, size limits on in
+ * memory buckets, and the number of outstanding requests in the pipeline. If
+ * data was previously set aside, the counter indicating whether data exists
+ * in the output chain is decremented.
+ *
+ * If the brigade passed in is empty, we reinstate the brigade and return
+ * immediately on the assumption that any buckets needing to be flushed were
+ * flushed before being passed to ap_filter_setaside_brigade().
+ *
+ * @param f The current filter
+ * @param bb The bucket brigade to restore to.
+ * @param flush_upto Work out the bucket we need to flush up to, based on the
+ *                   presence of a flush bucket, size limits on in-memory
+ *                   buckets, size limits on the number of requests outstanding
+ *                   in the pipeline.
+ * @return If positive, there are at least THRESHOLD_MIN_WRITE bytes to write
+ *         past the bucket marked by flush_upto, or if negative, the original
+ *         brigade to write was empty. If zero, there is less than
+ *         THRESHOLD_MIN_WRITE bytes to write, and the caller might choose to
+ *         wait for more data to arrive before writing.
+ */
+AP_DECLARE(int) ap_filter_reinstate_brigade(ap_filter_t *f,
+                                            apr_bucket_brigade *bb,
+                                            apr_bucket **flush_upto);
+
+/**
+ * Return non zero if the current filter should yield to allow write completion
+ * to take place.
+ */
+AP_DECLARE(int) ap_filter_should_yield(ap_filter_t *f);
+
+/**
  * Flush function for apr_brigade_* calls. This calls ap_pass_brigade
  * to flush the brigade if the brigade buffer overflows.
  * @param bb The brigade to flush
Index: server/core.c
===================================================================
--- server/core.c	(revision 1622873)
+++ server/core.c	(working copy)
@@ -4888,6 +4888,7 @@
 
     c->id = id;
     c->bucket_alloc = alloc;
+    c->empty = apr_brigade_create(c->pool, c->bucket_alloc);
 
     c->clogging_input_filters = 0;
 
Index: server/core_filters.c
===================================================================
--- server/core_filters.c	(revision 1622873)
+++ server/core_filters.c	(working copy)
@@ -78,9 +78,7 @@
 #define APLOG_MODULE_INDEX AP_CORE_MODULE_INDEX
 
 struct core_output_filter_ctx {
-    apr_bucket_brigade *buffered_bb;
     apr_bucket_brigade *tmp_flush_bb;
-    apr_pool_t *deferred_write_pool;
     apr_size_t bytes_written;
 };
 
@@ -328,11 +326,6 @@
     return APR_SUCCESS;
 }
 
-static void setaside_remaining_output(ap_filter_t *f,
-                                      core_output_filter_ctx_t *ctx,
-                                      apr_bucket_brigade *bb,
-                                      conn_rec *c);
-
 static apr_status_t send_brigade_nonblocking(apr_socket_t *s,
                                              apr_bucket_brigade *bb,
                                              apr_size_t *bytes_written,
@@ -358,33 +351,24 @@
                                              conn_rec *c);
 #endif
 
-/* XXX: Should these be configurable parameters? */
-#define THRESHOLD_MIN_WRITE 4096
-#define THRESHOLD_MAX_BUFFER 65536
-#define MAX_REQUESTS_IN_PIPELINE 5
-
 /* Optional function coming from mod_logio, used for logging of output
  * traffic
  */
 extern APR_OPTIONAL_FN_TYPE(ap_logio_add_bytes_out) *ap__logio_add_bytes_out;
 
-apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *new_bb)
+apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *bb)
 {
     conn_rec *c = f->c;
     core_net_rec *net = f->ctx;
     core_output_filter_ctx_t *ctx = net->out_ctx;
-    apr_bucket_brigade *bb = NULL;
-    apr_bucket *bucket, *next, *flush_upto = NULL;
-    apr_size_t bytes_in_brigade, non_file_bytes_in_brigade;
-    int eor_buckets_in_brigade, morphing_bucket_in_brigade;
+    apr_bucket *flush_upto = NULL;
+    int should_write;
     apr_status_t rv;
     int loglevel = ap_get_conn_module_loglevel(c, APLOG_MODULE_INDEX);
 
     /* Fail quickly if the connection has already been aborted. */
     if (c->aborted) {
-        if (new_bb != NULL) {
-            apr_brigade_cleanup(new_bb);
-        }
+        apr_brigade_cleanup(bb);
         return APR_ECONNABORTED;
     }
 
@@ -398,32 +382,15 @@
          */
         ctx->tmp_flush_bb = apr_brigade_create(c->pool, c->bucket_alloc);
         /* same for buffered_bb and ap_save_brigade */
-        ctx->buffered_bb = apr_brigade_create(c->pool, c->bucket_alloc);
+        f->buffered_bb = apr_brigade_create(c->pool, c->bucket_alloc);
     }
 
-    if (new_bb != NULL)
-        bb = new_bb;
-
-    if ((ctx->buffered_bb != NULL) &&
-        !APR_BRIGADE_EMPTY(ctx->buffered_bb)) {
-        if (new_bb != NULL) {
-            APR_BRIGADE_PREPEND(bb, ctx->buffered_bb);
-        }
-        else {
-            bb = ctx->buffered_bb;
-        }
-        c->data_in_output_filters = 0;
-    }
-    else if (new_bb == NULL) {
-        return APR_SUCCESS;
-    }
-
     /* Scan through the brigade and decide whether to attempt a write,
      * and how much to write, based on the following rules:
      *
-     *  1) The new_bb is null: Do a nonblocking write of as much as
+     *  1) The bb is empty: Do a nonblocking write of as much as
      *     possible: do a nonblocking write of as much data as possible,
-     *     then save the rest in ctx->buffered_bb. (If new_bb == NULL,
+     *     then save the rest in ctx->buffered_bb. (If bb is empty,
      *     it probably means that the MPM is doing asynchronous write
      *     completion and has just determined that this connection
      *     is writable.)
@@ -465,86 +432,10 @@
      *     then save the rest in ctx->buffered_bb.
      */
 
-    if (new_bb == NULL) {
-        rv = send_brigade_nonblocking(net->client_socket, bb,
-                                      &(ctx->bytes_written), c);
-        if (APR_STATUS_IS_EAGAIN(rv)) {
-            rv = APR_SUCCESS;
-        }
-        else if (rv != APR_SUCCESS) {
-            /* The client has aborted the connection */
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c,
-                          "core_output_filter: writing data to the network");
-            c->aborted = 1;
-        }
-        setaside_remaining_output(f, ctx, bb, c);
-        return rv;
-    }
+    should_write = ap_filter_reinstate_brigade(f, bb, &flush_upto);
 
-    bytes_in_brigade = 0;
-    non_file_bytes_in_brigade = 0;
-    eor_buckets_in_brigade = 0;
-    morphing_bucket_in_brigade = 0;
-
-    for (bucket = APR_BRIGADE_FIRST(bb); bucket != APR_BRIGADE_SENTINEL(bb);
-         bucket = next) {
-        next = APR_BUCKET_NEXT(bucket);
-
-        if (!APR_BUCKET_IS_METADATA(bucket)) {
-            if (bucket->length == (apr_size_t)-1) {
-                /*
-                 * A setaside of morphing buckets would read everything into
-                 * memory. Instead, we will flush everything up to and
-                 * including this bucket.
-                 */
-                morphing_bucket_in_brigade = 1;
-            }
-            else {
-                bytes_in_brigade += bucket->length;
-                if (!APR_BUCKET_IS_FILE(bucket))
-                    non_file_bytes_in_brigade += bucket->length;
-            }
-        }
-        else if (AP_BUCKET_IS_EOR(bucket)) {
-            eor_buckets_in_brigade++;
-        }
-
-        if (APR_BUCKET_IS_FLUSH(bucket)
-            || non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER
-            || morphing_bucket_in_brigade
-            || eor_buckets_in_brigade > MAX_REQUESTS_IN_PIPELINE) {
-            /* this segment of the brigade MUST be sent before returning. */
-
-            if (loglevel >= APLOG_TRACE6) {
-                char *reason = APR_BUCKET_IS_FLUSH(bucket) ?
-                               "FLUSH bucket" :
-                               (non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER) ?
-                               "THRESHOLD_MAX_BUFFER" :
-                               morphing_bucket_in_brigade ? "morphing bucket" :
-                               "MAX_REQUESTS_IN_PIPELINE";
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, c,
-                              "will flush because of %s", reason);
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, c,
-                              "seen in brigade%s: bytes: %" APR_SIZE_T_FMT
-                              ", non-file bytes: %" APR_SIZE_T_FMT ", eor "
-                              "buckets: %d, morphing buckets: %d",
-                              flush_upto == NULL ? " so far"
-                                                 : " since last flush point",
-                              bytes_in_brigade,
-                              non_file_bytes_in_brigade,
-                              eor_buckets_in_brigade,
-                              morphing_bucket_in_brigade);
-            }
-            /*
-             * Defer the actual blocking write to avoid doing many writes.
-             */
-            flush_upto = next;
-
-            bytes_in_brigade = 0;
-            non_file_bytes_in_brigade = 0;
-            eor_buckets_in_brigade = 0;
-            morphing_bucket_in_brigade = 0;
-        }
+    if (APR_BRIGADE_EMPTY(bb)) {
+        return APR_SUCCESS;
     }
 
     if (flush_upto != NULL) {
@@ -571,16 +462,7 @@
         APR_BRIGADE_CONCAT(bb, ctx->tmp_flush_bb);
     }
 
-    if (loglevel >= APLOG_TRACE8) {
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, c,
-                      "brigade contains: bytes: %" APR_SIZE_T_FMT
-                      ", non-file bytes: %" APR_SIZE_T_FMT
-                      ", eor buckets: %d, morphing buckets: %d",
-                      bytes_in_brigade, non_file_bytes_in_brigade,
-                      eor_buckets_in_brigade, morphing_bucket_in_brigade);
-    }
-
-    if (bytes_in_brigade >= THRESHOLD_MIN_WRITE) {
+    if (should_write) {
         rv = send_brigade_nonblocking(net->client_socket, bb,
                                       &(ctx->bytes_written), c);
         if ((rv != APR_SUCCESS) && (!APR_STATUS_IS_EAGAIN(rv))) {
@@ -598,43 +480,12 @@
         }
     }
 
-    setaside_remaining_output(f, ctx, bb, c);
+    remove_empty_buckets(bb);
+    ap_filter_setaside_brigade(f, bb);
+
     return APR_SUCCESS;
 }
 
-/*
- * This function assumes that either ctx->buffered_bb == NULL, or
- * ctx->buffered_bb is empty, or ctx->buffered_bb == bb
- */
-static void setaside_remaining_output(ap_filter_t *f,
-                                      core_output_filter_ctx_t *ctx,
-                                      apr_bucket_brigade *bb,
-                                      conn_rec *c)
-{
-    if (bb == NULL) {
-        return;
-    }
-    remove_empty_buckets(bb);
-    if (!APR_BRIGADE_EMPTY(bb)) {
-        c->data_in_output_filters = 1;
-        if (bb != ctx->buffered_bb) {
-            if (!ctx->deferred_write_pool) {
-                apr_pool_create(&ctx->deferred_write_pool, c->pool);
-                apr_pool_tag(ctx->deferred_write_pool, "deferred_write");
-            }
-            ap_save_brigade(f, &(ctx->buffered_bb), &bb,
-                            ctx->deferred_write_pool);
-        }
-    }
-    else if (ctx->deferred_write_pool) {
-        /*
-         * There are no more requests in the pipeline. We can just clear the
-         * pool.
-         */
-        apr_pool_clear(ctx->deferred_write_pool);
-    }
-}
-
 #ifndef APR_MAX_IOVEC_SIZE
 #define MAX_IOVEC_TO_WRITE 16
 #else
Index: server/mpm/event/event.c
===================================================================
--- server/mpm/event/event.c	(revision 1622873)
+++ server/mpm/event/event.c	(working copy)
@@ -1090,10 +1090,10 @@
         ap_filter_t *output_filter = c->output_filters;
         apr_status_t rv;
         ap_update_child_status_from_conn(sbh, SERVER_BUSY_WRITE, c);
-        while (output_filter->next != NULL) {
-            output_filter = output_filter->next;
-        }
-        rv = output_filter->frec->filter_func.out_func(output_filter, NULL);
+
+        rv = ap_pass_brigade(output_filter, c->empty);
+        apr_brigade_cleanup(c->empty);
+
         if (rv != APR_SUCCESS) {
             ap_log_cerror(APLOG_MARK, APLOG_DEBUG, rv, c, APLOGNO(00470)
                           "network write failure in core output filter");
Index: server/util_filter.c
===================================================================
--- server/util_filter.c	(revision 1622873)
+++ server/util_filter.c	(working copy)
@@ -24,6 +24,7 @@
 #include "http_config.h"
 #include "http_core.h"
 #include "http_log.h"
+#include "http_request.h"
 #include "util_filter.h"
 
 /* NOTE: Apache's current design doesn't allow a pool to be passed thru,
@@ -32,6 +33,11 @@
 #define FILTER_POOL apr_hook_global_pool
 #include "ap_hooks.h" /* for apr_hook_global_pool */
 
+/* XXX: Should these be configurable parameters? */
+#define THRESHOLD_MIN_WRITE 4096
+#define THRESHOLD_MAX_BUFFER 65536
+#define MAX_REQUESTS_IN_PIPELINE 5
+
 /*
 ** This macro returns true/false if a given filter should be inserted BEFORE
 ** another filter. This will happen when one of: 1) there isn't another
@@ -319,6 +325,8 @@
     f->r = frec->ftype < AP_FTYPE_CONNECTION ? r : NULL;
     f->c = c;
     f->next = NULL;
+    f->buffered_bb = NULL;
+    f->deferred_pool = NULL;
 
     if (INSERT_BEFORE(f, *outf)) {
         f->next = *outf;
@@ -474,6 +482,16 @@
 
 AP_DECLARE(void) ap_remove_output_filter(ap_filter_t *f)
 {
+
+    if ((f->buffered_bb != NULL) && !APR_BRIGADE_EMPTY(f->buffered_bb)) {
+        apr_brigade_cleanup(f->buffered_bb);
+        f->c->data_in_output_filters--;
+    }
+
+    if (f->deferred_pool) {
+        apr_pool_destroy(f->deferred_pool);
+    }
+
     remove_any_filter(f, f->r ? &f->r->output_filters : NULL,
                       f->r ? &f->r->proto_output_filters : NULL,
                       &f->c->output_filters);
@@ -566,6 +584,15 @@
 {
     if (next) {
         apr_bucket *e;
+        unsigned int activity;
+        apr_status_t status;
+
+        /*
+         * Mark that we have passed this way.
+         */
+        next->c->activity++;
+        activity = next->c->activity;
+
         if ((e = APR_BRIGADE_LAST(bb)) && APR_BUCKET_IS_EOS(e) && next->r) {
             /* This is only safe because HTTP_HEADER filter is always in
              * the filter stack. This ensures that there is ALWAYS a
@@ -587,7 +614,24 @@
                 }
             }
         }
-        return next->frec->filter_func.out_func(next, bb);
+        status = next->frec->filter_func.out_func(next, bb);
+
+        /* No problems found, and did the brigade not get passed on by a
+         * filter to the next filter in the chain? Compensate by passing
+         * the empty brigade to the next filter, so every filter gets a
+         * turn to write.
+         */
+        while (next->c->data_in_output_filters && APR_SUCCESS == status) {
+            next = next->next;
+            if (next && next->c->activity == activity) {
+                status = next->frec->filter_func.out_func(next, next->c->empty);
+            }
+            else {
+                break;
+            }
+        }
+
+        return status;
     }
     return AP_NOBODY_WROTE;
 }
@@ -635,7 +679,8 @@
     apr_status_t rv, srv = APR_SUCCESS;
 
     /* If have never stored any data in the filter, then we had better
-     * create an empty bucket brigade so that we can concat.
+     * create an empty bucket brigade so that we can concat. Register
+     * a cleanup to zero out the pointer if the pool is cleared.
      */
     if (!(*saveto)) {
         *saveto = apr_brigade_create(p, f->c->bucket_alloc);
@@ -673,6 +718,165 @@
     return srv;
 }
 
+AP_DECLARE(apr_status_t) ap_filter_setaside_brigade(ap_filter_t *f,
+                                                    apr_bucket_brigade *bb)
+{
+    if (!APR_BRIGADE_EMPTY(bb)) {
+        if (!f->buffered_bb || APR_BRIGADE_EMPTY((f->buffered_bb))) {
+            f->c->data_in_output_filters++;
+        }
+        if (bb != f->buffered_bb) {
+            if (!(f->r) && !(f->deferred_pool)) {
+                apr_pool_create(&f->deferred_pool, f->c->pool);
+                apr_pool_tag(f->deferred_pool, "deferred_write");
+            }
+            return ap_save_brigade(f, &f->buffered_bb, &bb,
+                    f->r ? f->r->pool : f->deferred_pool);
+        }
+    }
+    else if (f->deferred_pool) {
+        /*
+         * There are no more requests in the pipeline. We can just clear the
+         * pool.
+         */
+        apr_pool_clear(f->deferred_pool);
+    }
+    return APR_SUCCESS;
+}
+
+AP_DECLARE(int) ap_filter_reinstate_brigade(ap_filter_t *f,
+                                            apr_bucket_brigade *bb,
+                                            apr_bucket **flush_upto)
+{
+    apr_bucket *bucket, *next;
+    apr_size_t bytes_in_brigade, non_file_bytes_in_brigade;
+    int eor_buckets_in_brigade, morphing_bucket_in_brigade;
+    int loglevel = ap_get_conn_module_loglevel(f->c, APLOG_MODULE_INDEX);
+    int empty = APR_BRIGADE_EMPTY(bb);
+
+    if ((f->buffered_bb != NULL) && !APR_BRIGADE_EMPTY(f->buffered_bb)) {
+        APR_BRIGADE_PREPEND(bb, f->buffered_bb);
+        f->c->data_in_output_filters--;
+    }
+
+    /*
+     * Determine if and up to which bucket we need to do a blocking write:
+     *
+     *  a) The brigade contains a flush bucket: Do a blocking write
+     *     of everything up that point.
+     *
+     *  b) The request is in CONN_STATE_HANDLER state, and the brigade
+     *     contains at least THRESHOLD_MAX_BUFFER bytes in non-file
+     *     buckets: Do blocking writes until the amount of data in the
+     *     buffer is less than THRESHOLD_MAX_BUFFER. (The point of this
+     *     rule is to provide flow control, in case a handler is
+     *     streaming out lots of data faster than the data can be
+     *     sent to the client.)
+     *
+     *  c) The request is in CONN_STATE_HANDLER state, and the brigade
+     *     contains at least MAX_REQUESTS_IN_PIPELINE EOR buckets:
+     *     Do blocking writes until less than MAX_REQUESTS_IN_PIPELINE EOR
+     *     buckets are left. (The point of this rule is to prevent too many
+     *     FDs being kept open by pipelined requests, possibly allowing a
+     *     DoS).
+     *
+     *  d) The brigade contains a morphing bucket: If there was no other
+     *     reason to do a blocking write yet, try reading the bucket. If its
+     *     contents fit into memory before THRESHOLD_MAX_BUFFER is reached,
+     *     everything is fine. Otherwise we need to do a blocking write the
+     *     up to and including the morphing bucket, because ap_save_brigade()
+     *     would read the whole bucket into memory later on.
+     */
+
+    *flush_upto = NULL;
+
+    if (empty) {
+        return -1;
+    }
+
+    bytes_in_brigade = 0;
+    non_file_bytes_in_brigade = 0;
+    eor_buckets_in_brigade = 0;
+    morphing_bucket_in_brigade = 0;
+
+    for (bucket = APR_BRIGADE_FIRST(bb); bucket != APR_BRIGADE_SENTINEL(bb);
+         bucket = next) {
+        next = APR_BUCKET_NEXT(bucket);
+
+        if (!APR_BUCKET_IS_METADATA(bucket)) {
+            if (bucket->length == (apr_size_t)-1) {
+                /*
+                 * A setaside of morphing buckets would read everything into
+                 * memory. Instead, we will flush everything up to and
+                 * including this bucket.
+                 */
+                morphing_bucket_in_brigade = 1;
+            }
+            else {
+                bytes_in_brigade += bucket->length;
+                if (!APR_BUCKET_IS_FILE(bucket))
+                    non_file_bytes_in_brigade += bucket->length;
+            }
+        }
+        else if (AP_BUCKET_IS_EOR(bucket)) {
+            eor_buckets_in_brigade++;
+        }
+
+        if (APR_BUCKET_IS_FLUSH(bucket)
+            || non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER
+            || morphing_bucket_in_brigade
+            || eor_buckets_in_brigade > MAX_REQUESTS_IN_PIPELINE) {
+            /* this segment of the brigade MUST be sent before returning. */
+
+            if (loglevel >= APLOG_TRACE6) {
+                char *reason = APR_BUCKET_IS_FLUSH(bucket) ?
+                               "FLUSH bucket" :
+                               (non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER) ?
+                               "THRESHOLD_MAX_BUFFER" :
+                               morphing_bucket_in_brigade ? "morphing bucket" :
+                               "MAX_REQUESTS_IN_PIPELINE";
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, f->c,
+                              "will flush because of %s", reason);
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, f->c,
+                              "seen in brigade%s: bytes: %" APR_SIZE_T_FMT
+                              ", non-file bytes: %" APR_SIZE_T_FMT ", eor "
+                              "buckets: %d, morphing buckets: %d",
+                              flush_upto == NULL ? " so far"
+                                                 : " since last flush point",
+                              bytes_in_brigade,
+                              non_file_bytes_in_brigade,
+                              eor_buckets_in_brigade,
+                              morphing_bucket_in_brigade);
+            }
+            /*
+             * Defer the actual blocking write to avoid doing many writes.
+             */
+            *flush_upto = next;
+
+            bytes_in_brigade = 0;
+            non_file_bytes_in_brigade = 0;
+            eor_buckets_in_brigade = 0;
+            morphing_bucket_in_brigade = 0;
+        }
+    }
+
+    if (loglevel >= APLOG_TRACE8) {
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, f->c,
+                      "brigade contains: bytes: %" APR_SIZE_T_FMT
+                      ", non-file bytes: %" APR_SIZE_T_FMT
+                      ", eor buckets: %d, morphing buckets: %d",
+                      bytes_in_brigade, non_file_bytes_in_brigade,
+                      eor_buckets_in_brigade, morphing_bucket_in_brigade);
+    }
+
+    return (bytes_in_brigade >= THRESHOLD_MIN_WRITE);
+}
+
+AP_DECLARE(int) ap_filter_should_yield(ap_filter_t *f)
+{
+    return f->c->data_in_output_filters;
+}
+
 AP_DECLARE_NONSTD(apr_status_t) ap_filter_flush(apr_bucket_brigade *bb,
                                                 void *ctx)
 {
Index: modules/ssl/ssl_engine_io.c
===================================================================
--- modules/ssl/ssl_engine_io.c	(revision 1622873)
+++ modules/ssl/ssl_engine_io.c	(working copy)
@@ -1663,20 +1663,24 @@
     bio_filter_in_ctx_t *inctx;
     bio_filter_out_ctx_t *outctx;
     apr_read_type_e rblock = APR_NONBLOCK_READ;
+    apr_bucket *flush_upto = NULL;
 
     if (f->c->aborted) {
         apr_brigade_cleanup(bb);
         return APR_ECONNABORTED;
     }
 
+    inctx = (bio_filter_in_ctx_t *)filter_ctx->pbioRead->ptr;
+    outctx = (bio_filter_out_ctx_t *)filter_ctx->pbioWrite->ptr;
+
+    /* Reinstate any buffered content */
+    ap_filter_reinstate_brigade(f, bb, &flush_upto);
+
     if (!filter_ctx->pssl) {
         /* ssl_filter_io_shutdown was called */
         return ap_pass_brigade(f->next, bb);
     }
 
-    inctx = (bio_filter_in_ctx_t *)filter_ctx->pbioRead->ptr;
-    outctx = (bio_filter_out_ctx_t *)filter_ctx->pbioWrite->ptr;
-
     /* When we are the writer, we must initialize the inctx
      * mode so that we block for any required ssl input, because
      * output filtering is always nonblocking.
@@ -1691,6 +1695,16 @@
     while (!APR_BRIGADE_EMPTY(bb)) {
         apr_bucket *bucket = APR_BRIGADE_FIRST(bb);
 
+        /* if the core has set aside data, back off and try later */
+        if (!flush_upto) {
+            if (ap_filter_should_yield(f)) {
+                break;
+            }
+        }
+        else if (flush_upto == bucket) {
+            flush_upto = NULL;
+        }
+
         /* If it is a flush or EOS, we need to pass this down.
          * These types do not require translation by OpenSSL.
          */
@@ -1758,8 +1772,13 @@
                 break;
             }
         }
+
     }
 
+    if (APR_STATUS_IS_EOF(status) || (status == APR_SUCCESS)) {
+        return ap_filter_setaside_brigade(f, bb);
+    }
+
     return status;
 }
 
@@ -2008,6 +2027,9 @@
     filter_ctx->pOutputFilter = ap_add_output_filter(ssl_io_filter,
                                                      filter_ctx, r, c);
 
+    filter_ctx->pOutputFilter->buffered_bb = apr_brigade_create(c->pool,
+                                                                c->bucket_alloc);
+
     filter_ctx->pbioWrite = BIO_new(&bio_filter_out_method);
     filter_ctx->pbioWrite->ptr = (void *)bio_filter_out_ctx_new(filter_ctx, c);
 
--Apple-Mail=_94E5CD65-6FA8-4766-A245-812FB3839F34--
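The flush decision that the patch moves from ap_core_output_filter() into ap_filter_reinstate_brigade() can be exercised outside httpd with a small model. This is a sketch under stated assumptions, not the APR bucket API: bucket_kind, model_bucket and model_reinstate_decision() are hypothetical, a bucket array replaces the brigade, and an index replaces the flush_upto bucket pointer. The three thresholds are the values defined in the patch.

```c
#include <assert.h>
#include <stddef.h>

/* Threshold values as defined in the patch (server/util_filter.c). */
#define THRESHOLD_MIN_WRITE 4096
#define THRESHOLD_MAX_BUFFER 65536
#define MAX_REQUESTS_IN_PIPELINE 5

/* Hypothetical, simplified bucket types. FLUSH and EOR model metadata
 * buckets; MORPHING models a data bucket of unknown length. */
typedef enum { B_DATA, B_FILE, B_MORPHING, B_FLUSH, B_EOR } bucket_kind;

typedef struct {
    bucket_kind kind;
    size_t length;   /* ignored for metadata and morphing buckets */
} model_bucket;

/* Returns -1 for an empty brigade. Otherwise returns 1 if at least
 * THRESHOLD_MIN_WRITE bytes follow the last flush point, else 0.
 * *flush_upto receives how many leading buckets must be written with a
 * blocking write (0 if none), standing in for the flush_upto pointer. */
static int model_reinstate_decision(const model_bucket *b, size_t n,
                                    size_t *flush_upto)
{
    size_t bytes = 0, non_file_bytes = 0, i;
    int eor = 0, morphing = 0;

    assert(b != NULL || n == 0);
    *flush_upto = 0;
    if (n == 0) {
        return -1;
    }
    for (i = 0; i < n; i++) {
        switch (b[i].kind) {
        case B_MORPHING:
            morphing = 1;          /* setaside would read it all into memory */
            break;
        case B_DATA:
            bytes += b[i].length;
            non_file_bytes += b[i].length;
            break;
        case B_FILE:
            bytes += b[i].length;  /* file data does not count towards the
                                      in-memory THRESHOLD_MAX_BUFFER limit */
            break;
        case B_EOR:
            eor++;
            break;
        case B_FLUSH:
            break;
        }
        if (b[i].kind == B_FLUSH
            || non_file_bytes >= THRESHOLD_MAX_BUFFER
            || morphing
            || eor > MAX_REQUESTS_IN_PIPELINE) {
            /* Everything up to and including bucket i must be sent;
             * start counting afresh after it. */
            *flush_upto = i + 1;
            bytes = non_file_bytes = 0;
            eor = morphing = 0;
        }
    }
    return bytes >= THRESHOLD_MIN_WRITE;
}
```

For example, a brigade of 100 data bytes, a FLUSH bucket and an 8192-byte file bucket yields a blocking write of the first two buckets (*flush_upto == 2) and a return value of 1, because the file bucket after the flush point exceeds THRESHOLD_MIN_WRITE. A brigade with only 100 bytes and one EOR bucket returns 0, which is the case where the caller may set the data aside and wait for more.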