Return-Path: X-Original-To: apmail-httpd-cvs-archive@www.apache.org Delivered-To: apmail-httpd-cvs-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id BE61A188D6 for ; Wed, 2 Mar 2016 11:21:50 +0000 (UTC) Received: (qmail 5258 invoked by uid 500); 2 Mar 2016 11:21:50 -0000 Delivered-To: apmail-httpd-cvs-archive@httpd.apache.org Received: (qmail 5190 invoked by uid 500); 2 Mar 2016 11:21:50 -0000 Mailing-List: contact cvs-help@httpd.apache.org; run by ezmlm Precedence: bulk Reply-To: dev@httpd.apache.org list-help: list-unsubscribe: List-Post: List-Id: Delivered-To: mailing list cvs@httpd.apache.org Received: (qmail 5181 invoked by uid 99); 2 Mar 2016 11:21:50 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 02 Mar 2016 11:21:50 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 053E61A11DB for ; Wed, 2 Mar 2016 11:21:50 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.471 X-Spam-Level: * X-Spam-Status: No, score=1.471 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RP_MATCHES_RCVD=-0.329] autolearn=disabled Received: from mx2-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id QBYWm1WtfvAk for ; Wed, 2 Mar 2016 11:21:37 +0000 (UTC) Received: from mailrelay1-us-west.apache.org (mailrelay1-us-west.apache.org [209.188.14.139]) by mx2-lw-eu.apache.org (ASF Mail Server at mx2-lw-eu.apache.org) with ESMTP id 0D42060D34 for ; Wed, 2 Mar 2016 11:21:36 +0000 (UTC) Received: from svn01-us-west.apache.org (svn.apache.org [10.41.0.6]) by mailrelay1-us-west.apache.org (ASF Mail Server at mailrelay1-us-west.apache.org) with ESMTP id 59A3EE08E0 for ; Wed, 2 Mar 2016 11:21:31 +0000 (UTC) Received: from svn01-us-west.apache.org (localhost [127.0.0.1]) by svn01-us-west.apache.org (ASF Mail Server at svn01-us-west.apache.org) with ESMTP id 86CF23A04EF for ; Wed, 2 Mar 2016 11:21:30 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1733259 [2/4] - in /httpd/httpd/branches/2.4.x: ./ modules/http2/ Date: Wed, 02 Mar 2016 11:21:29 -0000 To: cvs@httpd.apache.org From: icing@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20160302112130.86CF23A04EF@svn01-us-west.apache.org> Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_io.c URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_io.c?rev=1733259&r1=1733258&r2=1733259&view=diff ============================================================================== --- httpd/httpd/branches/2.4.x/modules/http2/h2_io.c (original) +++ httpd/httpd/branches/2.4.x/modules/http2/h2_io.c Wed Mar 2 11:21:28 2016 @@ -33,23 +33,45 @@ #include "h2_task.h" #include "h2_util.h" -h2_io *h2_io_create(int id, apr_pool_t *pool) +h2_io *h2_io_create(int id, apr_pool_t *pool, const h2_request *request) { h2_io *io = apr_pcalloc(pool, sizeof(*io)); if (io) { io->id = id; io->pool = pool; io->bucket_alloc = apr_bucket_alloc_create(pool); + io->request = h2_request_clone(pool, request); } return io; } -void h2_io_destroy(h2_io *io) +void h2_io_redo(h2_io *io) { - if (io->pool) { - apr_pool_destroy(io->pool); - /* gone */ + io->worker_started = 
0; + io->response = NULL; + io->rst_error = 0; + if (io->bbin) { + apr_brigade_cleanup(io->bbin); + } + if (io->bbout) { + apr_brigade_cleanup(io->bbout); + } + if (io->tmp) { + apr_brigade_cleanup(io->tmp); + } + io->started_at = io->done_at = 0; +} + +int h2_io_is_repeatable(h2_io *io) { + if (io->submitted + || io->input_consumed > 0 + || !io->request) { + /* cannot repeat that. */ + return 0; } + return (!strcmp("GET", io->request->method) + || !strcmp("HEAD", io->request->method) + || !strcmp("OPTIONS", io->request->method)); } void h2_io_set_response(h2_io *io, h2_response *response) @@ -75,6 +97,11 @@ int h2_io_in_has_eos_for(h2_io *io) return io->eos_in || (io->bbin && h2_util_has_eos(io->bbin, -1)); } +int h2_io_in_has_data(h2_io *io) +{ + return io->bbin && h2_util_bb_has_data_or_eos(io->bbin); +} + int h2_io_out_has_data(h2_io *io) { return io->bbout && h2_util_bb_has_data_or_eos(io->bbout); @@ -102,12 +129,13 @@ apr_status_t h2_io_in_shutdown(h2_io *io } -void h2_io_signal_init(h2_io *io, h2_io_op op, int timeout_secs, apr_thread_cond_t *cond) +void h2_io_signal_init(h2_io *io, h2_io_op op, apr_interval_time_t timeout, + apr_thread_cond_t *cond) { io->timed_op = op; io->timed_cond = cond; - if (timeout_secs > 0) { - io->timeout_at = apr_time_now() + apr_time_from_sec(timeout_secs); + if (timeout > 0) { + io->timeout_at = apr_time_now() + timeout; } else { io->timeout_at = 0; @@ -255,6 +283,18 @@ apr_status_t h2_io_in_read(h2_io *io, ap } } + if (status == APR_SUCCESS && (!io->bbin || APR_BRIGADE_EMPTY(io->bbin))) { + if (io->eos_in) { + if (!io->eos_in_written) { + status = append_eos(io, bb, trailers); + io->eos_in_written = 1; + } + } + } + + if (status == APR_SUCCESS && APR_BRIGADE_EMPTY(bb)) { + return APR_EAGAIN; + } return status; } @@ -298,7 +338,7 @@ apr_status_t h2_io_out_readx(h2_io *io, return APR_ECONNABORTED; } - if (io->eos_out) { + if (io->eos_out_read) { *plen = 0; *peos = 1; return APR_SUCCESS; @@ -316,7 +356,7 @@ apr_status_t h2_io_out_readx(h2_io *io, else { status = h2_util_bb_readx(io->bbout, cb, ctx, plen, peos); if (status == APR_SUCCESS) { - io->eos_out = *peos; + io->eos_out_read = *peos; } } @@ -330,7 +370,7 @@ apr_status_t h2_io_out_read_to(h2_io *io return APR_ECONNABORTED; } - if (io->eos_out) { + if (io->eos_out_read) { *plen = 0; *peos = 1; return APR_SUCCESS; @@ -341,7 +381,7 @@ apr_status_t h2_io_out_read_to(h2_io *io return APR_EAGAIN; } - io->eos_out = *peos = h2_util_has_eos(io->bbout, *plen); + io->eos_out_read = *peos = h2_util_has_eos(io->bbout, *plen); return h2_util_move(bb, io->bbout, *plen, NULL, "h2_io_read_to"); } @@ -413,14 +453,17 @@ apr_status_t h2_io_out_close(h2_io *io, if (io->rst_error) { return APR_ECONNABORTED; } - if (!io->eos_out) { /* EOS has not been read yet */ + if (!io->eos_out_read) { /* EOS has not been read yet */ process_trailers(io, trailers); if (!io->bbout) { io->bbout = apr_brigade_create(io->pool, io->bucket_alloc); } - if (!h2_util_has_eos(io->bbout, -1)) { - APR_BRIGADE_INSERT_TAIL(io->bbout, - apr_bucket_eos_create(io->bucket_alloc)); + if (!io->eos_out) { + io->eos_out = 1; + if (!h2_util_has_eos(io->bbout, -1)) { + APR_BRIGADE_INSERT_TAIL(io->bbout, + apr_bucket_eos_create(io->bucket_alloc)); + } } } return APR_SUCCESS; Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_io.h URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_io.h?rev=1733259&r1=1733258&r2=1733259&view=diff ============================================================================== --- 
httpd/httpd/branches/2.4.x/modules/http2/h2_io.h (original) +++ httpd/httpd/branches/2.4.x/modules/http2/h2_io.h Wed Mar 2 11:21:28 2016 @@ -20,6 +20,7 @@ struct h2_response; struct apr_thread_cond_t; struct h2_mplx; struct h2_request; +struct h2_task; typedef apr_status_t h2_io_data_cb(void *ctx, const char *data, apr_off_t len); @@ -30,8 +31,7 @@ typedef enum { H2_IO_READ, H2_IO_WRITE, H2_IO_ANY, -} -h2_io_op; +} h2_io_op; typedef struct h2_io h2_io; @@ -51,15 +51,19 @@ struct h2_io { unsigned int orphaned : 1; /* h2_stream is gone for this io */ unsigned int worker_started : 1; /* h2_worker started processing for this io */ unsigned int worker_done : 1; /* h2_worker finished for this io */ + unsigned int submitted : 1; /* response has been submitted to client */ unsigned int request_body : 1; /* iff request has body */ unsigned int eos_in : 1; /* input eos has been seen */ unsigned int eos_in_written : 1; /* input eos has been forwarded */ - unsigned int eos_out : 1; /* output eos has been seen */ + unsigned int eos_out : 1; /* output eos is present */ + unsigned int eos_out_read : 1; /* output eos has been forwarded */ h2_io_op timed_op; /* which operation is waited on, if any */ struct apr_thread_cond_t *timed_cond; /* condition to wait on, maybe NULL */ apr_time_t timeout_at; /* when IO wait will time out */ + apr_time_t started_at; /* when processing started */ + apr_time_t done_at; /* when processing was done */ apr_size_t input_consumed; /* how many bytes have been read */ int files_handles_owned; @@ -72,12 +76,7 @@ struct h2_io { /** * Creates a new h2_io for the given stream id. */ -h2_io *h2_io_create(int id, apr_pool_t *pool); - -/** - * Frees any resources hold by the h2_io instance. - */ -void h2_io_destroy(h2_io *io); +h2_io *h2_io_create(int id, apr_pool_t *pool, const struct h2_request *request); /** * Set the response of this stream. @@ -89,6 +88,9 @@ void h2_io_set_response(h2_io *io, struc */ void h2_io_rst(h2_io *io, int error); +int h2_io_is_repeatable(h2_io *io); +void h2_io_redo(h2_io *io); + /** * The input data is completely queued. Blocked reads will return immediately * and give either data or EOF. @@ -98,9 +100,13 @@ int h2_io_in_has_eos_for(h2_io *io); * Output data is available. */ int h2_io_out_has_data(h2_io *io); +/** + * Input data is available. 
+ */ +int h2_io_in_has_data(h2_io *io); void h2_io_signal(h2_io *io, h2_io_op op); -void h2_io_signal_init(h2_io *io, h2_io_op op, int timeout_secs, +void h2_io_signal_init(h2_io *io, h2_io_op op, apr_interval_time_t timeout, struct apr_thread_cond_t *cond); void h2_io_signal_exit(h2_io *io); apr_status_t h2_io_signal_wait(struct h2_mplx *m, h2_io *io); Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_io_set.c URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_io_set.c?rev=1733259&r1=1733258&r2=1733259&view=diff ============================================================================== --- httpd/httpd/branches/2.4.x/modules/http2/h2_io_set.c (original) +++ httpd/httpd/branches/2.4.x/modules/http2/h2_io_set.c Wed Mar 2 11:21:28 2016 @@ -45,16 +45,6 @@ h2_io_set *h2_io_set_create(apr_pool_t * return sp; } -void h2_io_set_destroy(h2_io_set *sp) -{ - int i; - for (i = 0; i < sp->list->nelts; ++i) { - h2_io *io = h2_io_IDX(sp->list, i); - h2_io_destroy(io); - } - sp->list->nelts = 0; -} - static int h2_stream_id_cmp(const void *s1, const void *s2) { h2_io **pio1 = (h2_io **)s1; @@ -91,7 +81,7 @@ apr_status_t h2_io_set_add(h2_io_set *sp int last; APR_ARRAY_PUSH(sp->list, h2_io*) = io; /* Normally, streams get added in ascending order if id. We - * keep the array sorted, so we just need to check of the newly + * keep the array sorted, so we just need to check if the newly * appended stream has a lower id than the last one. if not, * sorting is not necessary. */ @@ -111,9 +101,7 @@ static void remove_idx(h2_io_set *sp, in --sp->list->nelts; n = sp->list->nelts - idx; if (n > 0) { - /* Close the hole in the array by moving the upper - * parts down one step. - */ + /* There are n h2_io* behind idx. Move the rest down */ h2_io **selts = (h2_io**)sp->list->elts; memmove(selts + idx, selts + idx + 1, n * sizeof(h2_io*)); } @@ -124,7 +112,7 @@ h2_io *h2_io_set_remove(h2_io_set *sp, h int i; for (i = 0; i < sp->list->nelts; ++i) { h2_io *e = h2_io_IDX(sp->list, i); - if (e == io) { + if (e->id == io->id) { remove_idx(sp, i); return e; } @@ -132,7 +120,7 @@ h2_io *h2_io_set_remove(h2_io_set *sp, h return NULL; } -h2_io *h2_io_set_pop_highest_prio(h2_io_set *set) +h2_io *h2_io_set_shift(h2_io_set *set) { /* For now, this just removes the first element in the set. * the name is misleading... 
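Note on the h2_io_set code above: the set keeps its h2_io* entries in an apr_array_header_t sorted by ascending stream id, so h2_io_set_add() only re-sorts when a newly appended id is lower than the current last one, and remove_idx() closes the hole left by a removed entry with a single memmove. A minimal standalone sketch of that removal pattern, using a plain C array instead of APR's array type (names here are illustrative, not part of the module):

#include <string.h>
#include <stddef.h>

typedef struct {
    int   *elts;    /* stream ids, kept sorted ascending */
    size_t nelts;   /* number of used entries */
} id_set;

/* Remove the entry at idx and close the hole, mirroring remove_idx(). */
static void id_set_remove_idx(id_set *sp, size_t idx)
{
    size_t n;
    --sp->nelts;
    n = sp->nelts - idx;            /* entries that sit behind idx */
    if (n > 0) {
        /* shift the trailing entries down one slot */
        memmove(sp->elts + idx, sp->elts + idx + 1, n * sizeof(int));
    }
}

With the ascending sort maintained, the renamed h2_io_set_shift() simply removes and returns the element at index 0, i.e. the io with the lowest stream id.
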
Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_io_set.h URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_io_set.h?rev=1733259&r1=1733258&r2=1733259&view=diff ============================================================================== --- httpd/httpd/branches/2.4.x/modules/http2/h2_io_set.h (original) +++ httpd/httpd/branches/2.4.x/modules/http2/h2_io_set.h Wed Mar 2 11:21:28 2016 @@ -26,8 +26,6 @@ typedef struct h2_io_set h2_io_set; h2_io_set *h2_io_set_create(apr_pool_t *pool); -void h2_io_set_destroy(h2_io_set *set); - apr_status_t h2_io_set_add(h2_io_set *set, struct h2_io *io); h2_io *h2_io_set_get(h2_io_set *set, int stream_id); h2_io *h2_io_set_remove(h2_io_set *set, struct h2_io *io); @@ -48,9 +46,8 @@ typedef int h2_io_set_iter_fn(void *ctx, * @param ctx user data for the callback * @return 1 iff iteration completed for all members */ -int h2_io_set_iter(h2_io_set *set, - h2_io_set_iter_fn *iter, void *ctx); +int h2_io_set_iter(h2_io_set *set, h2_io_set_iter_fn *iter, void *ctx); -h2_io *h2_io_set_pop_highest_prio(h2_io_set *set); +h2_io *h2_io_set_shift(h2_io_set *set); #endif /* defined(__mod_h2__h2_io_set__) */ Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.c URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.c?rev=1733259&r1=1733258&r2=1733259&view=diff ============================================================================== --- httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.c (original) +++ httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.c Wed Mar 2 11:21:28 2016 @@ -17,7 +17,6 @@ #include #include -#include #include #include #include @@ -27,21 +26,23 @@ #include #include +#include "mod_http2.h" + #include "h2_private.h" #include "h2_config.h" #include "h2_conn.h" +#include "h2_ctx.h" #include "h2_h2.h" +#include "h2_int_queue.h" #include "h2_io.h" #include "h2_io_set.h" #include "h2_response.h" #include "h2_mplx.h" #include "h2_request.h" #include "h2_stream.h" -#include "h2_stream_set.h" #include "h2_task.h" #include "h2_task_input.h" #include "h2_task_output.h" -#include "h2_task_queue.h" #include "h2_worker.h" #include "h2_workers.h" #include "h2_util.h" @@ -60,6 +61,48 @@ } while(0) +/* NULL or the mutex hold by this thread, used for recursive calls + */ +static apr_threadkey_t *thread_lock; + +apr_status_t h2_mplx_child_init(apr_pool_t *pool, server_rec *s) +{ + return apr_threadkey_private_create(&thread_lock, NULL, pool); +} + +static apr_status_t enter_mutex(h2_mplx *m, int *pacquired) +{ + apr_status_t status; + void *mutex = NULL; + + /* Enter the mutex if this thread already holds the lock or + * if we can acquire it. Only on the later case do we unlock + * onleaving the mutex. 
+ * This allow recursive entering of the mutex from the saem thread, + * which is what we need in certain situations involving callbacks + */ + apr_threadkey_private_get(&mutex, thread_lock); + if (mutex == m->lock) { + *pacquired = 0; + return APR_SUCCESS; + } + + status = apr_thread_mutex_lock(m->lock); + *pacquired = (status == APR_SUCCESS); + if (*pacquired) { + apr_threadkey_private_set(m->lock, thread_lock); + } + return status; +} + +static void leave_mutex(h2_mplx *m, int acquired) +{ + if (acquired) { + apr_threadkey_private_set(NULL, thread_lock); + apr_thread_mutex_unlock(m->lock); + } +} + static int is_aborted(h2_mplx *m, apr_status_t *pstatus) { AP_DEBUG_ASSERT(m); @@ -101,14 +144,6 @@ static void h2_mplx_destroy(h2_mplx *m) "h2_mplx(%ld): destroy, ios=%d", m->id, (int)h2_io_set_size(m->stream_ios)); m->aborted = 1; - if (m->ready_ios) { - h2_io_set_destroy(m->ready_ios); - m->ready_ios = NULL; - } - if (m->stream_ios) { - h2_io_set_destroy(m->stream_ios); - m->stream_ios = NULL; - } check_tx_free(m); @@ -129,7 +164,8 @@ static void h2_mplx_destroy(h2_mplx *m) * than protecting a shared h2_session one with an own lock. */ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent, - const h2_config *conf, + const h2_config *conf, + apr_interval_time_t stream_timeout, h2_workers *workers) { apr_status_t status = APR_SUCCESS; @@ -151,6 +187,7 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr if (!m->pool) { return NULL; } + apr_pool_tag(m->pool, "h2_mplx"); apr_allocator_owner_set(allocator, m->pool); status = apr_thread_mutex_create(&m->lock, APR_THREAD_MUTEX_DEFAULT, @@ -160,16 +197,26 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr return NULL; } - m->q = h2_tq_create(m->pool, h2_config_geti(conf, H2_CONF_MAX_STREAMS)); + status = apr_thread_cond_create(&m->task_done, m->pool); + if (status != APR_SUCCESS) { + h2_mplx_destroy(m); + return NULL; + } + + m->q = h2_iq_create(m->pool, h2_config_geti(conf, H2_CONF_MAX_STREAMS)); m->stream_ios = h2_io_set_create(m->pool); m->ready_ios = h2_io_set_create(m->pool); m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM); + m->stream_timeout = stream_timeout; m->workers = workers; + m->workers_max = h2_config_geti(conf, H2_CONF_MAX_WORKERS); + m->workers_def_limit = 4; + m->workers_limit = m->workers_def_limit; + m->last_limit_change = m->last_idle_block = apr_time_now(); + m->limit_change_interval = apr_time_from_msec(200); m->tx_handles_reserved = 0; m->tx_chunk_size = 4; - - m->stream_timeout_secs = h2_config_geti(conf, H2_CONF_STREAM_TIMEOUT_SECS); } return m; } @@ -177,10 +224,11 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr int h2_mplx_get_max_stream_started(h2_mplx *m) { int stream_id = 0; + int acquired; - apr_thread_mutex_lock(m->lock); + enter_mutex(m, &acquired); stream_id = m->max_stream_started; - apr_thread_mutex_unlock(m->lock); + leave_mutex(m, acquired); return stream_id; } @@ -198,6 +246,7 @@ static void workers_register(h2_mplx *m) * Therefore: ref counting for h2_workers in not needed, ref counting * for h2_worker using this is critical. 
*/ + m->need_registration = 0; h2_workers_register(m->workers, m); } @@ -231,7 +280,9 @@ static void io_destroy(h2_mplx *m, h2_io h2_io_set_remove(m->stream_ios, io); h2_io_set_remove(m->ready_ios, io); - h2_io_destroy(io); + if (m->redo_ios) { + h2_io_set_remove(m->redo_ios, io); + } if (pool) { apr_pool_clear(pool); @@ -250,7 +301,7 @@ static int io_stream_done(h2_mplx *m, h2 h2_io_set_remove(m->ready_ios, io); if (!io->worker_started || io->worker_done) { /* already finished or not even started yet */ - h2_tq_remove(m->q, io->id); + h2_iq_remove(m->q, io->id); io_destroy(m, io, 1); return 0; } @@ -266,27 +317,64 @@ static int stream_done_iter(void *ctx, h return io_stream_done((h2_mplx*)ctx, io, 0); } +static int stream_print(void *ctx, h2_io *io) +{ + h2_mplx *m = ctx; + if (io && io->request) { + ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */ + "->03198: h2_stream(%ld-%d): %s %s %s -> %s %d" + "[orph=%d/started=%d/done=%d/eos_in=%d/eos_out=%d]", + m->id, io->id, + io->request->method, io->request->authority, io->request->path, + io->response? "http" : (io->rst_error? "reset" : "?"), + io->response? io->response->http_status : io->rst_error, + io->orphaned, io->worker_started, io->worker_done, + io->eos_in, io->eos_out); + } + else if (io) { + ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */ + "->03198: h2_stream(%ld-%d): NULL -> %s %d" + "[orph=%d/started=%d/done=%d/eos_in=%d/eos_out=%d]", + m->id, io->id, + io->response? "http" : (io->rst_error? "reset" : "?"), + io->response? io->response->http_status : io->rst_error, + io->orphaned, io->worker_started, io->worker_done, + io->eos_in, io->eos_out); + } + else { + ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */ + "->03198: h2_stream(%ld-NULL): NULL", m->id); + } + return 1; +} + apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) { apr_status_t status; - + int acquired; + h2_workers_unregister(m->workers, m); - status = apr_thread_mutex_lock(m->lock); - if (APR_SUCCESS == status) { + + if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { int i, wait_secs = 5; /* disable WINDOW_UPDATE callbacks */ h2_mplx_set_consumed_cb(m, NULL, NULL); + h2_iq_clear(m->q); + apr_thread_cond_broadcast(m->task_done); while (!h2_io_set_iter(m->stream_ios, stream_done_iter, m)) { /* iterate until all ios have been orphaned or destroyed */ } - /* Any remaining ios have handed out requests to workers that are - * not done yet. Any operation they do on their assigned stream ios will - * be errored ECONNRESET/ABORTED, so that should find out pretty soon. + /* If we still have busy workers, we cannot release our memory + * pool yet, as slave connections have child pools of their respective + * h2_io's. + * Any remaining ios are processed in these workers. Any operation + * they do on their input/outputs will be errored ECONNRESET/ABORTED, + * so processing them should fail and workers *should* return. 
*/ - for (i = 0; h2_io_set_size(m->stream_ios) > 0; ++i) { + for (i = 0; m->workers_busy > 0; ++i) { m->join_wait = wait; ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, "h2_mplx(%ld): release_join, waiting on %d worker to report back", @@ -302,14 +390,20 @@ apr_status_t h2_mplx_release_and_join(h2 */ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03198) "h2_mplx(%ld): release, waiting for %d seconds now for " - "all h2_workers to return, have still %d requests outstanding", - m->id, i*wait_secs, (int)h2_io_set_size(m->stream_ios)); + "%d h2_workers to return, have still %d requests outstanding", + m->id, i*wait_secs, m->workers_busy, + (int)h2_io_set_size(m->stream_ios)); + if (i == 1) { + h2_io_set_iter(m->stream_ios, stream_print, m); + } } + m->aborted = 1; + apr_thread_cond_broadcast(m->task_done); } } ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056) "h2_mplx(%ld): release_join -> destroy", m->id); - apr_thread_mutex_unlock(m->lock); + leave_mutex(m, acquired); h2_mplx_destroy(m); /* all gone */ } @@ -319,88 +413,43 @@ apr_status_t h2_mplx_release_and_join(h2 void h2_mplx_abort(h2_mplx *m) { apr_status_t status; + int acquired; AP_DEBUG_ASSERT(m); if (!m->aborted) { - status = apr_thread_mutex_lock(m->lock); - if (APR_SUCCESS == status) { + if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { m->aborted = 1; - apr_thread_mutex_unlock(m->lock); + leave_mutex(m, acquired); } } } apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error) { - apr_status_t status; + apr_status_t status = APR_SUCCESS; + int acquired; + /* This maybe called from inside callbacks that already hold the lock. + * E.g. when we are streaming out DATA and the EOF triggers the stream + * release. + */ AP_DEBUG_ASSERT(m); - status = apr_thread_mutex_lock(m->lock); - if (APR_SUCCESS == status) { + if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { h2_io *io = h2_io_set_get(m->stream_ios, stream_id); /* there should be an h2_io, once the stream has been scheduled * for processing, e.g. when we received all HEADERs. But when * a stream is cancelled very early, it will not exist. 
*/ if (io) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, + "h2_mplx(%ld-%d): marking stream as done.", + m->id, stream_id); io_stream_done(m, io, rst_error); } - - apr_thread_mutex_unlock(m->lock); - } - return status; -} -static const h2_request *pop_request(h2_mplx *m) -{ - const h2_request *req = NULL; - int sid; - while (!m->aborted && !req && (sid = h2_tq_shift(m->q)) > 0) { - h2_io *io = h2_io_set_get(m->stream_ios, sid); - if (io) { - req = io->request; - io->worker_started = 1; - if (sid > m->max_stream_started) { - m->max_stream_started = sid; - } - } - } - return req; -} - -void h2_mplx_request_done(h2_mplx **pm, int stream_id, const h2_request **preq) -{ - h2_mplx *m = *pm; - - apr_status_t status = apr_thread_mutex_lock(m->lock); - if (APR_SUCCESS == status) { - h2_io *io = h2_io_set_get(m->stream_ios, stream_id); - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, - "h2_mplx(%ld): request(%d) done", m->id, stream_id); - if (io) { - io->worker_done = 1; - if (io->orphaned) { - io_destroy(m, io, 0); - if (m->join_wait) { - apr_thread_cond_signal(m->join_wait); - } - } - else { - /* hang around until the stream deregisteres */ - } - } - - if (preq) { - /* someone wants another request, if we have */ - *preq = pop_request(m); - } - if (!preq || !*preq) { - /* No request to hand back to the worker, NULLify reference - * and decrement count */ - *pm = NULL; - } - apr_thread_mutex_unlock(m->lock); + leave_mutex(m, acquired); } + return status; } apr_status_t h2_mplx_in_read(h2_mplx *m, apr_read_type_e block, @@ -409,14 +458,15 @@ apr_status_t h2_mplx_in_read(h2_mplx *m, struct apr_thread_cond_t *iowait) { apr_status_t status; + int acquired; + AP_DEBUG_ASSERT(m); - status = apr_thread_mutex_lock(m->lock); - if (APR_SUCCESS == status) { + if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { h2_io *io = h2_io_set_get(m->stream_ios, stream_id); if (io && !io->orphaned) { H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_read_pre"); - h2_io_signal_init(io, H2_IO_READ, m->stream_timeout_secs, iowait); + h2_io_signal_init(io, H2_IO_READ, m->stream_timeout, iowait); status = h2_io_in_read(io, bb, -1, trailers); while (APR_STATUS_IS_EAGAIN(status) && !is_aborted(m, &status) @@ -435,7 +485,7 @@ apr_status_t h2_mplx_in_read(h2_mplx *m, else { status = APR_EOF; } - apr_thread_mutex_unlock(m->lock); + leave_mutex(m, acquired); } return status; } @@ -444,9 +494,10 @@ apr_status_t h2_mplx_in_write(h2_mplx *m apr_bucket_brigade *bb) { apr_status_t status; + int acquired; + AP_DEBUG_ASSERT(m); - status = apr_thread_mutex_lock(m->lock); - if (APR_SUCCESS == status) { + if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { h2_io *io = h2_io_set_get(m->stream_ios, stream_id); if (io && !io->orphaned) { H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_write_pre"); @@ -458,7 +509,7 @@ apr_status_t h2_mplx_in_write(h2_mplx *m else { status = APR_ECONNABORTED; } - apr_thread_mutex_unlock(m->lock); + leave_mutex(m, acquired); } return status; } @@ -466,9 +517,10 @@ apr_status_t h2_mplx_in_write(h2_mplx *m apr_status_t h2_mplx_in_close(h2_mplx *m, int stream_id) { apr_status_t status; + int acquired; + AP_DEBUG_ASSERT(m); - status = apr_thread_mutex_lock(m->lock); - if (APR_SUCCESS == status) { + if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { h2_io *io = h2_io_set_get(m->stream_ios, stream_id); if (io && !io->orphaned) { status = h2_io_in_close(io); @@ -479,7 +531,7 @@ apr_status_t h2_mplx_in_close(h2_mplx *m else { status = APR_ECONNABORTED; } - apr_thread_mutex_unlock(m->lock); + 
leave_mutex(m, acquired); } return status; } @@ -507,12 +559,13 @@ void h2_mplx_set_consumed_cb(h2_mplx *m, apr_status_t h2_mplx_in_update_windows(h2_mplx *m) { apr_status_t status; + int acquired; + AP_DEBUG_ASSERT(m); if (m->aborted) { return APR_ECONNABORTED; } - status = apr_thread_mutex_lock(m->lock); - if (APR_SUCCESS == status) { + if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { update_ctx ctx; ctx.m = m; @@ -524,7 +577,7 @@ apr_status_t h2_mplx_in_update_windows(h if (ctx.streams_updated) { status = APR_SUCCESS; } - apr_thread_mutex_unlock(m->lock); + leave_mutex(m, acquired); } return status; } @@ -535,9 +588,10 @@ apr_status_t h2_mplx_out_readx(h2_mplx * apr_table_t **ptrailers) { apr_status_t status; + int acquired; + AP_DEBUG_ASSERT(m); - status = apr_thread_mutex_lock(m->lock); - if (APR_SUCCESS == status) { + if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { h2_io *io = h2_io_set_get(m->stream_ios, stream_id); if (io && !io->orphaned) { H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_readx_pre"); @@ -553,7 +607,7 @@ apr_status_t h2_mplx_out_readx(h2_mplx * } *ptrailers = (*peos && io->response)? io->response->trailers : NULL; - apr_thread_mutex_unlock(m->lock); + leave_mutex(m, acquired); } return status; } @@ -564,9 +618,10 @@ apr_status_t h2_mplx_out_read_to(h2_mplx apr_table_t **ptrailers) { apr_status_t status; + int acquired; + AP_DEBUG_ASSERT(m); - status = apr_thread_mutex_lock(m->lock); - if (APR_SUCCESS == status) { + if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { h2_io *io = h2_io_set_get(m->stream_ios, stream_id); if (io && !io->orphaned) { H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_read_to_pre"); @@ -582,23 +637,24 @@ apr_status_t h2_mplx_out_read_to(h2_mplx status = APR_ECONNABORTED; } *ptrailers = (*peos && io->response)? io->response->trailers : NULL; - apr_thread_mutex_unlock(m->lock); + leave_mutex(m, acquired); } return status; } -h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_stream_set *streams) +h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_ihash_t *streams) { apr_status_t status; h2_stream *stream = NULL; + int acquired; AP_DEBUG_ASSERT(m); - status = apr_thread_mutex_lock(m->lock); - if (APR_SUCCESS == status) { - h2_io *io = h2_io_set_pop_highest_prio(m->ready_ios); + if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { + h2_io *io = h2_io_set_shift(m->ready_ios); if (io && !m->aborted) { - stream = h2_stream_set_get(streams, io->id); + stream = h2_ihash_get(streams, io->id); if (stream) { + io->submitted = 1; if (io->rst_error) { h2_stream_rst(stream, io->rst_error); } @@ -614,7 +670,7 @@ h2_stream *h2_mplx_next_submit(h2_mplx * * reset by the client. Should no longer happen since such * streams should clear io's from the ready queue. 
*/ - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, + ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03347) "h2_mplx(%ld): stream for response %d closed, " "resetting io to close request processing", m->id, io->id); @@ -633,7 +689,7 @@ h2_stream *h2_mplx_next_submit(h2_mplx * h2_io_signal(io, H2_IO_WRITE); } - apr_thread_mutex_unlock(m->lock); + leave_mutex(m, acquired); } return stream; } @@ -657,7 +713,7 @@ static apr_status_t out_write(h2_mplx *m &m->tx_handles_reserved); /* Wait for data to drain until there is room again or * stream timeout expires */ - h2_io_signal_init(io, H2_IO_WRITE, m->stream_timeout_secs, iowait); + h2_io_signal_init(io, H2_IO_WRITE, m->stream_timeout, iowait); while (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(bb) && iowait @@ -716,9 +772,10 @@ apr_status_t h2_mplx_out_open(h2_mplx *m struct apr_thread_cond_t *iowait) { apr_status_t status; + int acquired; + AP_DEBUG_ASSERT(m); - status = apr_thread_mutex_lock(m->lock); - if (APR_SUCCESS == status) { + if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { if (m->aborted) { status = APR_ECONNABORTED; } @@ -728,7 +785,7 @@ apr_status_t h2_mplx_out_open(h2_mplx *m h2_util_bb_log(m->c, stream_id, APLOG_TRACE1, "h2_mplx_out_open", bb); } } - apr_thread_mutex_unlock(m->lock); + leave_mutex(m, acquired); } return status; } @@ -739,13 +796,14 @@ apr_status_t h2_mplx_out_write(h2_mplx * struct apr_thread_cond_t *iowait) { apr_status_t status; + int acquired; + AP_DEBUG_ASSERT(m); - status = apr_thread_mutex_lock(m->lock); - if (APR_SUCCESS == status) { + if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { h2_io *io = h2_io_set_get(m->stream_ios, stream_id); if (io && !io->orphaned) { status = out_write(m, io, f, bb, trailers, iowait); - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c, + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c, "h2_mplx(%ld-%d): write with trailers=%s", m->id, io->id, trailers? 
"yes" : "no"); H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_write"); @@ -755,7 +813,7 @@ apr_status_t h2_mplx_out_write(h2_mplx * else { status = APR_ECONNABORTED; } - apr_thread_mutex_unlock(m->lock); + leave_mutex(m, acquired); } return status; } @@ -763,9 +821,10 @@ apr_status_t h2_mplx_out_write(h2_mplx * apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id, apr_table_t *trailers) { apr_status_t status; + int acquired; + AP_DEBUG_ASSERT(m); - status = apr_thread_mutex_lock(m->lock); - if (APR_SUCCESS == status) { + if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { h2_io *io = h2_io_set_get(m->stream_ios, stream_id); if (io && !io->orphaned) { if (!io->response && !io->rst_error) { @@ -791,7 +850,7 @@ apr_status_t h2_mplx_out_close(h2_mplx * else { status = APR_ECONNABORTED; } - apr_thread_mutex_unlock(m->lock); + leave_mutex(m, acquired); } return status; } @@ -799,9 +858,10 @@ apr_status_t h2_mplx_out_close(h2_mplx * apr_status_t h2_mplx_out_rst(h2_mplx *m, int stream_id, int error) { apr_status_t status; + int acquired; + AP_DEBUG_ASSERT(m); - status = apr_thread_mutex_lock(m->lock); - if (APR_SUCCESS == status) { + if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { h2_io *io = h2_io_set_get(m->stream_ios, stream_id); if (io && !io->rst_error && !io->orphaned) { h2_io_rst(io, error); @@ -816,7 +876,7 @@ apr_status_t h2_mplx_out_rst(h2_mplx *m, else { status = APR_ECONNABORTED; } - apr_thread_mutex_unlock(m->lock); + leave_mutex(m, acquired); } return status; } @@ -824,10 +884,11 @@ apr_status_t h2_mplx_out_rst(h2_mplx *m, int h2_mplx_in_has_eos_for(h2_mplx *m, int stream_id) { int has_eos = 0; + int acquired; + apr_status_t status; AP_DEBUG_ASSERT(m); - status = apr_thread_mutex_lock(m->lock); - if (APR_SUCCESS == status) { + if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { h2_io *io = h2_io_set_get(m->stream_ios, stream_id); if (io && !io->orphaned) { has_eos = h2_io_in_has_eos_for(io); @@ -835,18 +896,39 @@ int h2_mplx_in_has_eos_for(h2_mplx *m, i else { has_eos = 1; } - apr_thread_mutex_unlock(m->lock); + leave_mutex(m, acquired); } return has_eos; } +int h2_mplx_in_has_data_for(h2_mplx *m, int stream_id) +{ + apr_status_t status; + int has_data = 0; + int acquired; + + AP_DEBUG_ASSERT(m); + if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { + h2_io *io = h2_io_set_get(m->stream_ios, stream_id); + if (io && !io->orphaned) { + has_data = h2_io_in_has_data(io); + } + else { + has_data = 0; + } + leave_mutex(m, acquired); + } + return has_data; +} + int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id) { apr_status_t status; int has_data = 0; + int acquired; + AP_DEBUG_ASSERT(m); - status = apr_thread_mutex_lock(m->lock); - if (APR_SUCCESS == status) { + if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { h2_io *io = h2_io_set_get(m->stream_ios, stream_id); if (io && !io->orphaned) { has_data = h2_io_out_has_data(io); @@ -854,7 +936,7 @@ int h2_mplx_out_has_data_for(h2_mplx *m, else { has_data = 0; } - apr_thread_mutex_unlock(m->lock); + leave_mutex(m, acquired); } return has_data; } @@ -863,9 +945,10 @@ apr_status_t h2_mplx_out_trywait(h2_mplx apr_thread_cond_t *iowait) { apr_status_t status; + int acquired; + AP_DEBUG_ASSERT(m); - status = apr_thread_mutex_lock(m->lock); - if (APR_SUCCESS == status) { + if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { if (m->aborted) { status = APR_ECONNABORTED; } @@ -879,7 +962,7 @@ apr_status_t h2_mplx_out_trywait(h2_mplx } m->added_output = NULL; } - apr_thread_mutex_unlock(m->lock); 
+ leave_mutex(m, acquired); } return status; } @@ -896,37 +979,38 @@ static void have_out_data_for(h2_mplx *m apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx) { apr_status_t status; + int acquired; AP_DEBUG_ASSERT(m); - status = apr_thread_mutex_lock(m->lock); - if (APR_SUCCESS == status) { + if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { if (m->aborted) { status = APR_ECONNABORTED; } else { - h2_tq_sort(m->q, cmp, ctx); + h2_iq_sort(m->q, cmp, ctx); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, "h2_mplx(%ld): reprioritize tasks", m->id); } - apr_thread_mutex_unlock(m->lock); + leave_mutex(m, acquired); } return status; } -static h2_io *open_io(h2_mplx *m, int stream_id) +static h2_io *open_io(h2_mplx *m, int stream_id, const h2_request *request) { apr_pool_t *io_pool = m->spare_pool; h2_io *io; if (!io_pool) { apr_pool_create(&io_pool, m->pool); + apr_pool_tag(io_pool, "h2_io"); } else { m->spare_pool = NULL; } - io = h2_io_create(stream_id, io_pool); + io = h2_io_create(stream_id, io_pool, request); h2_io_set_add(m->stream_ios, io); return io; @@ -937,55 +1021,615 @@ apr_status_t h2_mplx_process(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx) { apr_status_t status; - int was_empty = 0; + int do_registration = 0; + int acquired; AP_DEBUG_ASSERT(m); - status = apr_thread_mutex_lock(m->lock); - if (APR_SUCCESS == status) { + if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { if (m->aborted) { status = APR_ECONNABORTED; } else { - h2_io *io = open_io(m, stream_id); - io->request = req; + h2_io *io = open_io(m, stream_id, req); if (!io->request->body) { status = h2_io_in_close(io); } - was_empty = h2_tq_empty(m->q); - h2_tq_add(m->q, io->id, cmp, ctx); + m->need_registration = m->need_registration || h2_iq_empty(m->q); + do_registration = (m->need_registration && m->workers_busy < m->workers_max); + h2_iq_add(m->q, io->id, cmp, ctx); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c, "h2_mplx(%ld-%d): process", m->c->id, stream_id); H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_process"); } - apr_thread_mutex_unlock(m->lock); + leave_mutex(m, acquired); } - if (status == APR_SUCCESS && was_empty) { + if (status == APR_SUCCESS && do_registration) { workers_register(m); } return status; } -const h2_request *h2_mplx_pop_request(h2_mplx *m, int *has_more) +static h2_task *pop_task(h2_mplx *m) { - const h2_request *req = NULL; + h2_task *task = NULL; + int sid; + while (!m->aborted && !task + && (m->workers_busy < m->workers_limit) + && (sid = h2_iq_shift(m->q)) > 0) { + h2_io *io = h2_io_set_get(m->stream_ios, sid); + if (io && io->orphaned) { + io_destroy(m, io, 0); + if (m->join_wait) { + apr_thread_cond_signal(m->join_wait); + } + } + else if (io) { + conn_rec *slave = h2_slave_create(m->c, m->pool, m->spare_allocator); + m->spare_allocator = NULL; + task = h2_task_create(m->id, io->request, slave, m); + io->worker_started = 1; + io->started_at = apr_time_now(); + if (sid > m->max_stream_started) { + m->max_stream_started = sid; + } + ++m->workers_busy; + } + } + return task; +} + +h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more) +{ + h2_task *task = NULL; apr_status_t status; + int acquired; AP_DEBUG_ASSERT(m); - status = apr_thread_mutex_lock(m->lock); - if (APR_SUCCESS == status) { + if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { if (m->aborted) { - req = NULL; *has_more = 0; } else { - req = pop_request(m); - *has_more = !h2_tq_empty(m->q); + task = pop_task(m); + *has_more = !h2_iq_empty(m->q); + } + + if (has_more && 
!task) { + m->need_registration = 1; + } + leave_mutex(m, acquired); + } + return task; +} + +static void task_done(h2_mplx *m, h2_task *task) +{ + if (task) { + if (task->frozen) { + /* this task was handed over to an engine for processing */ + h2_task_thaw(task); + /* TODO: can we signal an engine that it can now start on this? */ + } + else { + h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id); + + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + "h2_mplx(%ld): task(%s) done", m->id, task->id); + /* clean our references and report request as done. Signal + * that we want another unless we have been aborted */ + /* TODO: this will keep a worker attached to this h2_mplx as + * long as it has requests to handle. Might no be fair to + * other mplx's. Perhaps leave after n requests? */ + h2_mplx_out_close(m, task->stream_id, NULL); + if (m->spare_allocator) { + apr_allocator_destroy(m->spare_allocator); + m->spare_allocator = NULL; + } + h2_slave_destroy(task->c, &m->spare_allocator); + task = NULL; + if (io) { + apr_time_t now = apr_time_now(); + if (!io->orphaned && m->redo_ios + && h2_io_set_get(m->redo_ios, io->id)) { + /* reset and schedule again */ + h2_io_redo(io); + h2_io_set_remove(m->redo_ios, io); + h2_iq_add(m->q, io->id, NULL, NULL); + } + else { + io->worker_done = 1; + io->done_at = now; + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, + "h2_mplx(%ld): request(%d) done, %f ms" + " elapsed", m->id, io->id, + (io->done_at - io->started_at) / 1000.0); + if (io->started_at > m->last_idle_block) { + /* this task finished without causing an 'idle block', e.g. + * a block by flow control. + */ + if (now - m->last_limit_change >= m->limit_change_interval + && m->workers_limit < m->workers_max) { + /* Well behaving stream, allow it more workers */ + m->workers_limit = H2MIN(m->workers_limit * 2, + m->workers_max); + m->last_limit_change = now; + m->need_registration = 1; + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, + "h2_mplx(%ld): increase worker limit to %d", + m->id, m->workers_limit); + } + } + } + + if (io->orphaned) { + io_destroy(m, io, 0); + if (m->join_wait) { + apr_thread_cond_signal(m->join_wait); + } + } + else { + /* hang around until the stream deregisteres */ + } + } + apr_thread_cond_broadcast(m->task_done); + } + } +} + +void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask) +{ + int acquired; + + if (enter_mutex(m, &acquired) == APR_SUCCESS) { + task_done(m, task); + --m->workers_busy; + if (ptask) { + /* caller wants another task */ + *ptask = pop_task(m); + } + leave_mutex(m, acquired); + } +} + +/******************************************************************************* + * h2_mplx DoS protection + ******************************************************************************/ + +typedef struct { + h2_mplx *m; + h2_io *io; + apr_time_t now; +} io_iter_ctx; + +static int latest_repeatable_busy_unsubmitted_iter(void *data, h2_io *io) +{ + io_iter_ctx *ctx = data; + if (io->worker_started && !io->worker_done + && h2_io_is_repeatable(io) + && !h2_io_set_get(ctx->m->redo_ios, io->id)) { + /* this io occupies a worker, the response has not been submitted yet, + * not been cancelled and it is a repeatable request + * -> it can be re-scheduled later */ + if (!ctx->io || ctx->io->started_at < io->started_at) { + /* we did not have one or this one was started later */ + ctx->io = io; + } + } + return 1; +} + +static h2_io *get_latest_repeatable_busy_unsubmitted_io(h2_mplx *m) +{ + io_iter_ctx ctx; + ctx.m = m; + ctx.io = NULL; + 
h2_io_set_iter(m->stream_ios, latest_repeatable_busy_unsubmitted_iter, &ctx); + return ctx.io; +} + +static int timed_out_busy_iter(void *data, h2_io *io) +{ + io_iter_ctx *ctx = data; + if (io->worker_started && !io->worker_done + && (ctx->now - io->started_at) > ctx->m->stream_timeout) { + /* timed out stream occupying a worker, found */ + ctx->io = io; + return 0; + } + return 1; +} +static h2_io *get_timed_out_busy_stream(h2_mplx *m) +{ + io_iter_ctx ctx; + ctx.m = m; + ctx.io = NULL; + ctx.now = apr_time_now(); + h2_io_set_iter(m->stream_ios, timed_out_busy_iter, &ctx); + return ctx.io; +} + +static apr_status_t unschedule_slow_ios(h2_mplx *m) +{ + h2_io *io; + int n; + + if (!m->redo_ios) { + m->redo_ios = h2_io_set_create(m->pool); + } + /* Try to get rid of streams that occupy workers. Look for safe requests + * that are repeatable. If none found, fail the connection. + */ + n = (m->workers_busy - m->workers_limit - h2_io_set_size(m->redo_ios)); + while (n > 0 && (io = get_latest_repeatable_busy_unsubmitted_io(m))) { + h2_io_set_add(m->redo_ios, io); + h2_io_rst(io, H2_ERR_CANCEL); + --n; + } + + if ((m->workers_busy - h2_io_set_size(m->redo_ios)) > m->workers_limit) { + io = get_timed_out_busy_stream(m); + if (io) { + /* Too many busy workers, unable to cancel enough streams + * and with a busy, timed out stream, we tell the client + * to go away... */ + return APR_TIMEUP; + } + } + return APR_SUCCESS; +} + +apr_status_t h2_mplx_idle(h2_mplx *m) +{ + apr_status_t status = APR_SUCCESS; + apr_time_t now; + int acquired; + + if (enter_mutex(m, &acquired) == APR_SUCCESS) { + apr_size_t scount = h2_io_set_size(m->stream_ios); + if (scount > 0 && m->workers_busy) { + /* If we have streams in connection state 'IDLE', meaning + * all streams are ready to sent data out, but lack + * WINDOW_UPDATEs. + * + * This is ok, unless we have streams that still occupy + * h2 workers. As worker threads are a scarce resource, + * we need to take measures that we do not get DoSed. + * + * This is what we call an 'idle block'. Limit the amount + * of busy workers we allow for this connection until it + * well behaves. 
+ */ + now = apr_time_now(); + m->last_idle_block = now; + if (m->workers_limit > 2 + && now - m->last_limit_change >= m->limit_change_interval) { + if (m->workers_limit > 16) { + m->workers_limit = 16; + } + else if (m->workers_limit > 8) { + m->workers_limit = 8; + } + else if (m->workers_limit > 4) { + m->workers_limit = 4; + } + else if (m->workers_limit > 2) { + m->workers_limit = 2; + } + m->last_limit_change = now; + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, + "h2_mplx(%ld): decrease worker limit to %d", + m->id, m->workers_limit); + } + + if (m->workers_busy > m->workers_limit) { + status = unschedule_slow_ios(m); + } + } + leave_mutex(m, acquired); + } + return status; +} + +/******************************************************************************* + * HTTP/2 request engines + ******************************************************************************/ + +typedef struct h2_req_entry h2_req_entry; +struct h2_req_entry { + APR_RING_ENTRY(h2_req_entry) link; + request_rec *r; +}; + +#define H2_REQ_ENTRY_NEXT(e) APR_RING_NEXT((e), link) +#define H2_REQ_ENTRY_PREV(e) APR_RING_PREV((e), link) +#define H2_REQ_ENTRY_REMOVE(e) APR_RING_REMOVE((e), link) + +typedef struct h2_req_engine_i h2_req_engine_i; +struct h2_req_engine_i { + h2_req_engine pub; + conn_rec *c; /* connection this engine is assigned to */ + h2_mplx *m; + unsigned int shutdown : 1; /* engine is being shut down */ + apr_thread_cond_t *io; /* condition var for waiting on data */ + APR_RING_HEAD(h2_req_entries, h2_req_entry) entries; + apr_size_t no_assigned; /* # of assigned requests */ + apr_size_t no_live; /* # of live */ + apr_size_t no_finished; /* # of finished */ +}; + +#define H2_REQ_ENTRIES_SENTINEL(b) APR_RING_SENTINEL((b), h2_req_entry, link) +#define H2_REQ_ENTRIES_EMPTY(b) APR_RING_EMPTY((b), h2_req_entry, link) +#define H2_REQ_ENTRIES_FIRST(b) APR_RING_FIRST(b) +#define H2_REQ_ENTRIES_LAST(b) APR_RING_LAST(b) + +#define H2_REQ_ENTRIES_INSERT_HEAD(b, e) do { \ +h2_req_entry *ap__b = (e); \ +APR_RING_INSERT_HEAD((b), ap__b, h2_req_entry, link); \ +} while (0) + +#define H2_REQ_ENTRIES_INSERT_TAIL(b, e) do { \ +h2_req_entry *ap__b = (e); \ +APR_RING_INSERT_TAIL((b), ap__b, h2_req_entry, link); \ +} while (0) + +static apr_status_t h2_mplx_engine_schedule(h2_mplx *m, + h2_req_engine_i *engine, + request_rec *r) +{ + h2_req_entry *entry = apr_pcalloc(r->pool, sizeof(*entry)); + + APR_RING_ELEM_INIT(entry, link); + entry->r = r; + H2_REQ_ENTRIES_INSERT_TAIL(&engine->entries, entry); + return APR_SUCCESS; +} + + +apr_status_t h2_mplx_engine_push(const char *engine_type, + request_rec *r, h2_mplx_engine_init *einit) +{ + apr_status_t status; + h2_mplx *m; + h2_task *task; + int acquired; + + task = h2_ctx_rget_task(r); + if (!task) { + return APR_ECONNABORTED; + } + m = task->mplx; + AP_DEBUG_ASSERT(m); + + if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { + h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id); + if (!io || io->orphaned) { + status = APR_ECONNABORTED; + } + else { + h2_req_engine_i *engine = (h2_req_engine_i*)m->engine; + + apr_table_set(r->connection->notes, H2_TASK_ID_NOTE, task->id); + status = APR_EOF; + + if (task->ser_headers) { + /* Max compatibility, deny processing of this */ + } + else if (engine && !strcmp(engine->pub.type, engine_type)) { + if (engine->shutdown + || engine->no_assigned >= H2MIN(engine->pub.capacity, 100)) { + ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r, + "h2_mplx(%ld): engine shutdown or over %s", + m->c->id, engine->pub.id); + engine = 
NULL; + } + else if (h2_mplx_engine_schedule(m, engine, r) == APR_SUCCESS) { + /* this task will be processed in another thread, + * freeze any I/O for the time being. */ + h2_task_freeze(task, r); + engine->no_assigned++; + status = APR_SUCCESS; + ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, r, + "h2_mplx(%ld): push request %s", + m->c->id, r->the_request); + } + else { + ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r, + "h2_mplx(%ld): engine error adding req %s", + m->c->id, engine->pub.id); + engine = NULL; + } + } + + if (!engine && einit) { + engine = apr_pcalloc(task->c->pool, sizeof(*engine)); + engine->pub.id = apr_psprintf(task->c->pool, "eng-%ld-%d", + m->id, m->next_eng_id++); + engine->pub.pool = task->c->pool; + engine->pub.type = apr_pstrdup(task->c->pool, engine_type); + engine->pub.window_bits = 30; + engine->pub.req_window_bits = h2_log2(m->stream_max_mem); + engine->c = r->connection; + APR_RING_INIT(&engine->entries, h2_req_entry, link); + engine->m = m; + engine->io = task->io; + engine->no_assigned = 1; + engine->no_live = 1; + + status = einit(&engine->pub, r); + ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r, + "h2_mplx(%ld): init engine %s (%s)", + m->c->id, engine->pub.id, engine->pub.type); + if (status == APR_SUCCESS) { + m->engine = &engine->pub; + } + } + } + + leave_mutex(m, acquired); + } + return status; +} + +static h2_req_entry *pop_non_frozen(h2_req_engine_i *engine) +{ + h2_req_entry *entry; + h2_task *task; + + for (entry = H2_REQ_ENTRIES_FIRST(&engine->entries); + entry != H2_REQ_ENTRIES_SENTINEL(&engine->entries); + entry = H2_REQ_ENTRY_NEXT(entry)) { + task = h2_ctx_rget_task(entry->r); + AP_DEBUG_ASSERT(task); + if (!task->frozen) { + H2_REQ_ENTRY_REMOVE(entry); + return entry; } - apr_thread_mutex_unlock(m->lock); } - return req; + return NULL; } +static apr_status_t engine_pull(h2_mplx *m, h2_req_engine_i *engine, + apr_read_type_e block, request_rec **pr) +{ + h2_req_entry *entry; + + AP_DEBUG_ASSERT(m); + AP_DEBUG_ASSERT(engine); + while (1) { + if (m->aborted) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + "h2_mplx(%ld): mplx abort while pulling requests %s", + m->id, engine->pub.id); + *pr = NULL; + return APR_EOF; + } + + if (!H2_REQ_ENTRIES_EMPTY(&engine->entries) + && (entry = pop_non_frozen(engine))) { + ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, entry->r, + "h2_mplx(%ld): request %s pulled by engine %s", + m->c->id, entry->r->the_request, engine->pub.id); + engine->no_live++; + entry->r->connection->current_thread = engine->c->current_thread; + *pr = entry->r; + return APR_SUCCESS; + } + else if (APR_NONBLOCK_READ == block) { + *pr = NULL; + return APR_EAGAIN; + } + else if (H2_REQ_ENTRIES_EMPTY(&engine->entries)) { + engine->shutdown = 1; + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, + "h2_mplx(%ld): emtpy queue, shutdown engine %s", + m->id, engine->pub.id); + *pr = NULL; + return APR_EOF; + } + apr_thread_cond_timedwait(m->task_done, m->lock, + apr_time_from_msec(100)); + } +} + +apr_status_t h2_mplx_engine_pull(h2_req_engine *pub_engine, + apr_read_type_e block, request_rec **pr) +{ + h2_req_engine_i *engine = (h2_req_engine_i*)pub_engine; + h2_mplx *m = engine->m; + apr_status_t status; + int acquired; + + *pr = NULL; + if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { + status = engine_pull(m, engine, block, pr); + leave_mutex(m, acquired); + } + return status; +} + +static void engine_done(h2_mplx *m, h2_req_engine_i *engine, h2_task *task, + int waslive, int aborted) +{ + int acquired; + 
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, + "h2_mplx(%ld): task %s %s by %s", + m->id, task->id, aborted? "aborted":"done", + engine->pub.id); + h2_task_output_close(task->output); + engine->no_finished++; + if (waslive) engine->no_live--; + engine->no_assigned--; + if (task->c != engine->c) { /* do not release what the engine runs on */ + if (enter_mutex(m, &acquired) == APR_SUCCESS) { + task_done(m, task); + leave_mutex(m, acquired); + } + } +} + +void h2_mplx_engine_done(h2_req_engine *pub_engine, conn_rec *r_conn) +{ + h2_req_engine_i *engine = (h2_req_engine_i*)pub_engine; + h2_mplx *m = engine->m; + h2_task *task; + int acquired; + + task = h2_ctx_cget_task(r_conn); + if (task && (enter_mutex(m, &acquired) == APR_SUCCESS)) { + engine_done(m, engine, task, 1, 0); + leave_mutex(m, acquired); + } +} + +void h2_mplx_engine_exit(h2_req_engine *pub_engine) +{ + h2_req_engine_i *engine = (h2_req_engine_i*)pub_engine; + h2_mplx *m = engine->m; + int acquired; + + if (enter_mutex(m, &acquired) == APR_SUCCESS) { + if (!m->aborted + && !H2_REQ_ENTRIES_EMPTY(&engine->entries)) { + h2_req_entry *entry; + ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, + "h2_mplx(%ld): exit engine %s (%s), " + "has still requests queued, shutdown=%d," + "assigned=%ld, live=%ld, finished=%ld", + m->c->id, engine->pub.id, engine->pub.type, + engine->shutdown, + (long)engine->no_assigned, (long)engine->no_live, + (long)engine->no_finished); + for (entry = H2_REQ_ENTRIES_FIRST(&engine->entries); + entry != H2_REQ_ENTRIES_SENTINEL(&engine->entries); + entry = H2_REQ_ENTRY_NEXT(entry)) { + request_rec *r = entry->r; + h2_task *task = h2_ctx_rget_task(r); + ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, + "h2_mplx(%ld): engine %s has queued task %s, " + "frozen=%d, aborting", + m->c->id, engine->pub.id, task->id, task->frozen); + engine_done(m, engine, task, 0, 1); + } + } + if (!m->aborted && (engine->no_assigned > 1 || engine->no_live > 1)) { + ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, + "h2_mplx(%ld): exit engine %s (%s), " + "assigned=%ld, live=%ld, finished=%ld", + m->c->id, engine->pub.id, engine->pub.type, + (long)engine->no_assigned, (long)engine->no_live, + (long)engine->no_finished); + } + else { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, + "h2_mplx(%ld): exit engine %s (%s)", + m->c->id, engine->pub.id, engine->pub.type); + } + if (m->engine == &engine->pub) { + m->engine = NULL; /* TODO */ + } + leave_mutex(m, acquired); + } +} Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.h URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.h?rev=1733259&r1=1733258&r2=1733259&view=diff ============================================================================== --- httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.h (original) +++ httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.h Wed Mar 2 11:21:28 2016 @@ -38,6 +38,7 @@ struct apr_pool_t; struct apr_thread_mutex_t; struct apr_thread_cond_t; struct h2_config; +struct h2_ihash_t; struct h2_response; struct h2_task; struct h2_stream; @@ -45,9 +46,10 @@ struct h2_request; struct h2_io_set; struct apr_thread_cond_t; struct h2_workers; -struct h2_stream_set; -struct h2_task_queue; +struct h2_int_queue; +struct h2_req_engine; +#include #include "h2_io.h" typedef struct h2_mplx h2_mplx; @@ -66,27 +68,45 @@ struct h2_mplx { apr_pool_t *pool; unsigned int aborted : 1; + unsigned int need_registration : 1; - struct h2_task_queue *q; + struct h2_int_queue *q; struct h2_io_set *stream_ios; struct h2_io_set 
*ready_ios; + struct h2_io_set *redo_ios; int max_stream_started; /* highest stream id that started processing */ + int workers_busy; /* # of workers processing on this mplx */ + int workers_limit; /* current # of workers limit, dynamic */ + int workers_def_limit; /* default # of workers limit */ + int workers_max; /* max, hard limit # of workers in a process */ + apr_time_t last_idle_block; /* last time, this mplx entered IDLE while + * streams were ready */ + apr_time_t last_limit_change;/* last time, worker limit changed */ + apr_interval_time_t limit_change_interval; apr_thread_mutex_t *lock; struct apr_thread_cond_t *added_output; + struct apr_thread_cond_t *task_done; struct apr_thread_cond_t *join_wait; apr_size_t stream_max_mem; - int stream_timeout_secs; + apr_interval_time_t stream_timeout; apr_pool_t *spare_pool; /* spare pool, ready for next io */ + apr_allocator_t *spare_allocator; + struct h2_workers *workers; apr_size_t tx_handles_reserved; apr_size_t tx_chunk_size; h2_mplx_consumed_cb *input_consumed; void *input_consumed_ctx; + + struct h2_req_engine *engine; + /* TODO: signal for waiting tasks*/ + apr_queue_t *engine_queue; + int next_eng_id; }; @@ -95,12 +115,15 @@ struct h2_mplx { * Object lifecycle and information. ******************************************************************************/ +apr_status_t h2_mplx_child_init(apr_pool_t *pool, server_rec *s); + /** * Create the multiplexer for the given HTTP2 session. * Implicitly has reference count 1. */ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *master, const struct h2_config *conf, + apr_interval_time_t stream_timeout, struct h2_workers *workers); /** @@ -119,7 +142,9 @@ apr_status_t h2_mplx_release_and_join(h2 */ void h2_mplx_abort(h2_mplx *mplx); -void h2_mplx_request_done(h2_mplx **pm, int stream_id, const struct h2_request **preq); +struct h2_task *h2_mplx_pop_task(h2_mplx *mplx, int *has_more); + +void h2_mplx_task_done(h2_mplx *m, struct h2_task *task, struct h2_task **ptask); /** * Get the highest stream identifier that has been passed on to processing. @@ -143,10 +168,14 @@ int h2_mplx_get_max_stream_started(h2_mp */ apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error); -/* Return != 0 iff the multiplexer has data for the given stream. +/* Return != 0 iff the multiplexer has output data for the given stream. */ int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id); +/* Return != 0 iff the multiplexer has input data for the given stream. + */ +int h2_mplx_in_has_data_for(h2_mplx *m, int stream_id); + /** * Waits on output data from any stream in this session to become available. * Returns APR_TIMEUP if no data arrived in the given time. @@ -179,8 +208,6 @@ apr_status_t h2_mplx_process(h2_mplx *m, */ apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx); -const struct h2_request *h2_mplx_pop_request(h2_mplx *mplx, int *has_more); - /** * Register a callback for the amount of input data consumed per stream. The * will only ever be invoked from the thread creating this h2_mplx, e.g. when @@ -248,7 +275,7 @@ apr_status_t h2_mplx_in_update_windows(h * @param bb the brigade to place any existing repsonse body data into */ struct h2_stream *h2_mplx_next_submit(h2_mplx *m, - struct h2_stream_set *streams); + struct h2_ihash_t *streams); /** * Reads output data from the given stream. 
Will never block, but @@ -369,5 +396,32 @@ APR_RING_INSERT_TAIL((b), ap__b, h2_mplx */ #define H2_MPLX_REMOVE(e) APR_RING_REMOVE((e), link) +/******************************************************************************* + * h2_mplx DoS protection + ******************************************************************************/ + +/** + * Master connection has entered idle mode. + * @param m the mplx instance of the master connection + * @return != SUCCESS iff connection should be terminated + */ +apr_status_t h2_mplx_idle(h2_mplx *m); + +/******************************************************************************* + * h2_mplx h2_req_engine handling. + ******************************************************************************/ + +typedef apr_status_t h2_mplx_engine_init(struct h2_req_engine *engine, + request_rec *r); + +apr_status_t h2_mplx_engine_push(const char *engine_type, + request_rec *r, h2_mplx_engine_init *einit); + +apr_status_t h2_mplx_engine_pull(struct h2_req_engine *engine, + apr_read_type_e block, request_rec **pr); + +void h2_mplx_engine_done(struct h2_req_engine *engine, conn_rec *r_conn); + +void h2_mplx_engine_exit(struct h2_req_engine *engine); #endif /* defined(__mod_h2__h2_mplx__) */ Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_private.h URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_private.h?rev=1733259&r1=1733258&r2=1733259&view=diff ============================================================================== --- httpd/httpd/branches/2.4.x/modules/http2/h2_private.h (original) +++ httpd/httpd/branches/2.4.x/modules/http2/h2_private.h Wed Mar 2 11:21:28 2016 @@ -16,26 +16,12 @@ #ifndef mod_h2_h2_private_h #define mod_h2_h2_private_h +#include + #include extern module AP_MODULE_DECLARE_DATA http2_module; APLOG_USE_MODULE(http2); - -#define H2_HEADER_METHOD ":method" -#define H2_HEADER_METHOD_LEN 7 -#define H2_HEADER_SCHEME ":scheme" -#define H2_HEADER_SCHEME_LEN 7 -#define H2_HEADER_AUTH ":authority" -#define H2_HEADER_AUTH_LEN 10 -#define H2_HEADER_PATH ":path" -#define H2_HEADER_PATH_LEN 5 -#define H2_CRLF "\r\n" - -#define H2_ALEN(a) (sizeof(a)/sizeof((a)[0])) - -#define H2MAX(x,y) ((x) > (y) ? (x) : (y)) -#define H2MIN(x,y) ((x) < (y) ? 
(x) : (y)) - #endif Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_push.c URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_push.c?rev=1733259&r1=1733258&r2=1733259&view=diff ============================================================================== --- httpd/httpd/branches/2.4.x/modules/http2/h2_push.c (original) +++ httpd/httpd/branches/2.4.x/modules/http2/h2_push.c Wed Mar 2 11:21:28 2016 @@ -276,20 +276,49 @@ static int same_authority(const h2_reque return 1; } -static int set_header(void *ctx, const char *key, const char *value) +static int set_push_header(void *ctx, const char *key, const char *value) { - apr_table_setn(ctx, key, value); + size_t klen = strlen(key); + if (H2_HD_MATCH_LIT("User-Agent", key, klen) + || H2_HD_MATCH_LIT("Accept", key, klen) + || H2_HD_MATCH_LIT("Accept-Encoding", key, klen) + || H2_HD_MATCH_LIT("Accept-Language", key, klen) + || H2_HD_MATCH_LIT("Cache-Control", key, klen)) { + apr_table_setn(ctx, key, value); + } return 1; } +static int has_param(link_ctx *ctx, const char *param) +{ + const char *p = apr_table_get(ctx->params, param); + return !!p; +} + +static int has_relation(link_ctx *ctx, const char *rel) +{ + const char *s, *val = apr_table_get(ctx->params, "rel"); + if (val) { + if (!strcmp(rel, val)) { + return 1; + } + s = ap_strstr_c(val, rel); + if (s && (s == val || s[-1] == ' ')) { + s += strlen(rel); + if (!*s || *s == ' ') { + return 1; + } + } + } + return 0; +} static int add_push(link_ctx *ctx) { /* so, we have read a Link header and need to decide * if we transform it into a push. */ - const char *rel = apr_table_get(ctx->params, "rel"); - if (rel && !strcmp("preload", rel)) { + if (has_relation(ctx, "preload") && !has_param(ctx, "nopush")) { apr_uri_t uri; if (apr_uri_parse(ctx->pool, ctx->link, &uri) == APR_SUCCESS) { if (uri.path && same_authority(ctx->req, &uri)) { @@ -306,9 +335,7 @@ static int add_push(link_ctx *ctx) * TLS (if any) parameters. 
*/ path = apr_uri_unparse(ctx->pool, &uri, APR_URI_UNP_OMITSITEPART); - push = apr_pcalloc(ctx->pool, sizeof(*push)); - switch (ctx->req->push_policy) { case H2_PUSH_HEAD: method = "HEAD"; @@ -318,15 +345,10 @@ static int add_push(link_ctx *ctx) break; } headers = apr_table_make(ctx->pool, 5); - apr_table_do(set_header, headers, ctx->req->headers, - "User-Agent", - "Cache-Control", - "Accept-Language", - NULL); - req = h2_request_createn(0, ctx->pool, ctx->req->config, - method, ctx->req->scheme, - ctx->req->authority, - path, headers); + apr_table_do(set_push_header, headers, ctx->req->headers, NULL); + req = h2_request_createn(0, ctx->pool, method, ctx->req->scheme, + ctx->req->authority, path, headers, + ctx->req->serialize); /* atm, we do not push on pushes */ h2_request_end_headers(req, ctx->pool, 1, 0); push->req = req; @@ -434,38 +456,30 @@ apr_array_header_t *h2_push_collect(apr_ return NULL; } -void h2_push_policy_determine(struct h2_request *req, apr_pool_t *p, int push_enabled) -{ - h2_push_policy policy = H2_PUSH_NONE; - if (push_enabled) { - const char *val = apr_table_get(req->headers, "accept-push-policy"); - if (val) { - if (ap_find_token(p, val, "fast-load")) { - policy = H2_PUSH_FAST_LOAD; - } - else if (ap_find_token(p, val, "head")) { - policy = H2_PUSH_HEAD; - } - else if (ap_find_token(p, val, "default")) { - policy = H2_PUSH_DEFAULT; - } - else if (ap_find_token(p, val, "none")) { - policy = H2_PUSH_NONE; - } - else { - /* nothing known found in this header, go by default */ - policy = H2_PUSH_DEFAULT; - } - } - else { - policy = H2_PUSH_DEFAULT; - } - } - req->push_policy = policy; -} - /******************************************************************************* * push diary + * + * - The push diary keeps track of resources already PUSHed via HTTP/2 on this + * connection. It records a hash value from the absolute URL of the resource + * pushed. + * - Lacking openssl, it uses 'apr_hashfunc_default' for the value + * - with openssl, it uses SHA256 to calculate the hash value + * - whatever the method to generate the hash, the diary keeps a maximum of 64 + * bits per hash, limiting the memory consumption to about + * H2PushDiarySize * 8 + * bytes. Entries are sorted by most recently used and oldest entries are + * forgotten first. + * - Clients can initialize/replace the push diary by sending a 'Cache-Digest' + * header. Currently, this is the base64url encoded value of the cache digest + * as specified in https://datatracker.ietf.org/doc/draft-kazuho-h2-cache-digest/ + * This draft can be expected to evolve and the definition of the header + * will be added there and refined. + * - The cache digest header is a Golomb Coded Set of hash values, but it may + * limit the amount of bits per hash value even further. For a good description + * of GCS, read here: + * http://giovanni.bajo.it/post/47119962313/golomb-coded-sets-smaller-than-bloom-filters + * - The means that the push diary might be initialized with hash values of much + * less than 64 bits, leading to more false positives, but smaller digest size. 
******************************************************************************/ @@ -688,36 +702,6 @@ apr_array_header_t *h2_push_collect_upda return h2_push_diary_update(stream->session, pushes); } -/* h2_log2(n) iff n is a power of 2 */ -static unsigned char h2_log2(apr_uint32_t n) -{ - int lz = 0; - if (!n) { - return 0; - } - if (!(n & 0xffff0000u)) { - lz += 16; - n = (n << 16); - } - if (!(n & 0xff000000u)) { - lz += 8; - n = (n << 8); - } - if (!(n & 0xf0000000u)) { - lz += 4; - n = (n << 4); - } - if (!(n & 0xc0000000u)) { - lz += 2; - n = (n << 2); - } - if (!(n & 0x80000000u)) { - lz += 1; - } - - return 31 - lz; -} - static apr_int32_t h2_log2inv(unsigned char log2) { return log2? (1 << log2) : 1; @@ -794,8 +778,8 @@ static apr_status_t gset_encode_next(gse /* Intentional no APLOGNO */ ap_log_perror(APLOG_MARK, GCSLOG_LEVEL, 0, encoder->pool, "h2_push_diary_enc: val=%"APR_UINT64_T_HEX_FMT", delta=%" - APR_UINT64_T_HEX_FMT" flex_bits=%ld, " - "fixed_bits=%d, fixed_val=%"APR_UINT64_T_HEX_FMT, + APR_UINT64_T_HEX_FMT" flex_bits=%"APR_UINT64_T_FMT", " + ", fixed_bits=%d, fixed_val=%"APR_UINT64_T_HEX_FMT, pval, delta, flex_bits, encoder->fixed_bits, delta&encoder->fixed_mask); for (; flex_bits != 0; --flex_bits) { status = gset_encode_bit(encoder, 1); Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_push.h URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_push.h?rev=1733259&r1=1733258&r2=1733259&view=diff ============================================================================== --- httpd/httpd/branches/2.4.x/modules/http2/h2_push.h (original) +++ httpd/httpd/branches/2.4.x/modules/http2/h2_push.h Wed Mar 2 11:21:28 2016 @@ -15,19 +15,14 @@ #ifndef __mod_h2__h2_push__ #define __mod_h2__h2_push__ +#include "h2.h" + struct h2_request; struct h2_response; struct h2_ngheader; struct h2_session; struct h2_stream; -typedef enum { - H2_PUSH_NONE, - H2_PUSH_DEFAULT, - H2_PUSH_HEAD, - H2_PUSH_FAST_LOAD, -} h2_push_policy; - typedef struct h2_push { const struct h2_request *req; } h2_push; @@ -66,17 +61,6 @@ apr_array_header_t *h2_push_collect(apr_ const struct h2_response *res); /** - * Set the push policy for the given request. Takes request headers into - * account, see draft https://tools.ietf.org/html/draft-ruellan-http-accept-push-policy-00 - * for details. - * - * @param req the request to determine the policy for - * @param p the pool to use - * @param push_enabled if HTTP/2 server push is generally enabled for this request - */ -void h2_push_policy_determine(struct h2_request *req, apr_pool_t *p, int push_enabled); - -/** * Create a new push diary for the given maximum number of entries. 
* * @oaram p the pool to use Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_request.c URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_request.c?rev=1733259&r1=1733258&r2=1733259&view=diff ============================================================================== --- httpd/httpd/branches/2.4.x/modules/http2/h2_request.c (original) +++ httpd/httpd/branches/2.4.x/modules/http2/h2_request.c Wed Mar 2 11:21:28 2016 @@ -30,45 +30,36 @@ #include #include "h2_private.h" -#include "h2_config.h" -#include "h2_mplx.h" #include "h2_push.h" #include "h2_request.h" -#include "h2_task.h" #include "h2_util.h" -h2_request *h2_request_create(int id, apr_pool_t *pool, - const struct h2_config *config) +h2_request *h2_request_create(int id, apr_pool_t *pool, int serialize) { - return h2_request_createn(id, pool, config, - NULL, NULL, NULL, NULL, NULL); + return h2_request_createn(id, pool, NULL, NULL, NULL, NULL, NULL, + serialize); } h2_request *h2_request_createn(int id, apr_pool_t *pool, - const struct h2_config *config, const char *method, const char *scheme, const char *authority, const char *path, - apr_table_t *header) + apr_table_t *header, int serialize) { h2_request *req = apr_pcalloc(pool, sizeof(h2_request)); req->id = id; - req->config = config; req->method = method; req->scheme = scheme; req->authority = authority; req->path = path; req->headers = header? header : apr_table_make(pool, 10); req->request_time = apr_time_now(); - + req->serialize = serialize; + return req; } -void h2_request_destroy(h2_request *req) -{ -} - static apr_status_t inspect_clen(h2_request *req, const char *s) { char *end; @@ -139,38 +130,48 @@ static apr_status_t add_all_h1_header(h2 } +apr_status_t h2_request_make(h2_request *req, apr_pool_t *pool, + const char *method, const char *scheme, + const char *authority, const char *path, + apr_table_t *headers) +{ + req->method = method; + req->scheme = scheme; + req->authority = authority; + req->path = path; + + AP_DEBUG_ASSERT(req->scheme); + AP_DEBUG_ASSERT(req->authority); + AP_DEBUG_ASSERT(req->path); + AP_DEBUG_ASSERT(req->method); + + return add_all_h1_header(req, pool, headers); +} + apr_status_t h2_request_rwrite(h2_request *req, request_rec *r) { apr_status_t status; + const char *scheme, *authority; - req->config = h2_config_rget(r); - req->method = r->method; - req->scheme = (r->parsed_uri.scheme? r->parsed_uri.scheme - : ap_http_scheme(r)); - req->authority = r->hostname; - req->path = apr_uri_unparse(r->pool, &r->parsed_uri, - APR_URI_UNP_OMITSITEPART); - - if (!ap_strchr_c(req->authority, ':') && r->server && r->server->port) { - apr_port_t defport = apr_uri_port_of_scheme(req->scheme); + scheme = (r->parsed_uri.scheme? 
r->parsed_uri.scheme + : ap_http_scheme(r)); + authority = r->hostname; + if (!ap_strchr_c(authority, ':') && r->server && r->server->port) { + apr_port_t defport = apr_uri_port_of_scheme(scheme); if (defport != r->server->port) { /* port info missing and port is not default for scheme: append */ - req->authority = apr_psprintf(r->pool, "%s:%d", req->authority, - (int)r->server->port); + authority = apr_psprintf(r->pool, "%s:%d", authority, + (int)r->server->port); } } - AP_DEBUG_ASSERT(req->scheme); - AP_DEBUG_ASSERT(req->authority); - AP_DEBUG_ASSERT(req->path); - AP_DEBUG_ASSERT(req->method); - - status = add_all_h1_header(req, r->pool, r->headers_in); - + status = h2_request_make(req, r->pool, r->method, scheme, authority, + apr_uri_unparse(r->pool, &r->parsed_uri, + APR_URI_UNP_OMITSITEPART), + r->headers_in); ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, r, APLOGNO(03058) "h2_request(%d): rwrite %s host=%s://%s%s", req->id, req->method, req->scheme, req->authority, req->path); - return status; } @@ -337,11 +338,22 @@ void h2_request_copy(apr_pool_t *p, h2_r dst->authority = OPT_COPY(p, src->authority); dst->path = OPT_COPY(p, src->path); dst->headers = apr_table_clone(p, src->headers); + if (src->trailers) { + dst->trailers = apr_table_clone(p, src->trailers); + } dst->content_length = src->content_length; dst->chunked = src->chunked; dst->eoh = src->eoh; } +h2_request *h2_request_clone(apr_pool_t *p, const h2_request *src) +{ + h2_request *nreq = apr_pcalloc(p, sizeof(*nreq)); + memcpy(nreq, src, sizeof(*nreq)); + h2_request_copy(p, nreq, src); + return nreq; +} + request_rec *h2_request_create_rec(const h2_request *req, conn_rec *conn) { request_rec *r; @@ -361,7 +373,7 @@ request_rec *h2_request_create_rec(const r->allowed_methods = ap_make_method_list(p, 2); - r->headers_in = apr_table_copy(r->pool, req->headers); + r->headers_in = apr_table_clone(r->pool, req->headers); r->trailers_in = apr_table_make(r->pool, 5); r->subprocess_env = apr_table_make(r->pool, 25); r->headers_out = apr_table_make(r->pool, 12); @@ -408,7 +420,7 @@ request_rec *h2_request_create_rec(const } ap_parse_uri(r, req->path); - r->protocol = (char*)"HTTP/2"; + r->protocol = "HTTP/2"; r->proto_num = HTTP_VERSION(2, 0); r->the_request = apr_psprintf(r->pool, "%s %s %s", Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_request.h URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_request.h?rev=1733259&r1=1733258&r2=1733259&view=diff ============================================================================== --- httpd/httpd/branches/2.4.x/modules/http2/h2_request.h (original) +++ httpd/httpd/branches/2.4.x/modules/http2/h2_request.h Wed Mar 2 11:21:28 2016 @@ -16,48 +16,19 @@ #ifndef __mod_h2__h2_request__ #define __mod_h2__h2_request__ -/* h2_request is the transformer of HTTP2 streams into HTTP/1.1 internal - * format that will be fed to various httpd input filters to finally - * become a request_rec to be handled by soemone. - */ -struct h2_config; -struct h2_to_h1; -struct h2_mplx; -struct h2_task; - -typedef struct h2_request h2_request; - -struct h2_request { - int id; /* stream id */ - - const char *method; /* pseudo header values, see ch. 
8.1.2.3 */ - const char *scheme; - const char *authority; - const char *path; - - apr_table_t *headers; - apr_table_t *trailers; - - apr_time_t request_time; - apr_off_t content_length; - - unsigned int chunked : 1; /* iff requst body needs to be forwarded as chunked */ - unsigned int eoh : 1; /* iff end-of-headers has been seen and request is complete */ - unsigned int body : 1; /* iff this request has a body */ - unsigned int push_policy; /* which push policy to use for this request */ - const struct h2_config *config; -}; +#include "h2.h" -h2_request *h2_request_create(int id, apr_pool_t *pool, - const struct h2_config *config); +h2_request *h2_request_create(int id, apr_pool_t *pool, int serialize); h2_request *h2_request_createn(int id, apr_pool_t *pool, - const struct h2_config *config, const char *method, const char *scheme, const char *authority, const char *path, - apr_table_t *headers); + apr_table_t *headers, int serialize); -void h2_request_destroy(h2_request *req); +apr_status_t h2_request_make(h2_request *req, apr_pool_t *pool, + const char *method, const char *scheme, + const char *authority, const char *path, + apr_table_t *headers); apr_status_t h2_request_rwrite(h2_request *req, request_rec *r); @@ -74,6 +45,8 @@ apr_status_t h2_request_end_headers(h2_r void h2_request_copy(apr_pool_t *p, h2_request *dst, const h2_request *src); +h2_request *h2_request_clone(apr_pool_t *p, const h2_request *src); + /** * Create a request_rec representing the h2_request to be * processed on the given connection.
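A few usage sketches for the reworked interfaces in this change. The h2_mplx interface now hands workers h2_task objects directly instead of h2_request pointers (h2_mplx_pop_task/h2_mplx_task_done replace h2_mplx_pop_request/h2_mplx_request_done). The following is only a rough sketch of how a worker might drive the new pair; run_tasks and process_task are invented names, and the assumption that *ptask comes back NULL once the queue is drained is not guaranteed by the header alone.

    #include "h2_mplx.h"
    #include "h2_task.h"

    /* Hypothetical worker loop against the task-based h2_mplx API. */
    static void run_tasks(h2_mplx *m)
    {
        int has_more = 0;
        h2_task *task = h2_mplx_pop_task(m, &has_more);

        while (task) {
            process_task(task);    /* stand-in for the real h2_task execution */
            if (has_more) {
                /* reporting a task done may hand out the next one in the
                 * same call, saving a round trip through the scheduler;
                 * assumed to set task to NULL when nothing is left */
                h2_mplx_task_done(m, task, &task);
            }
            else {
                h2_mplx_task_done(m, task, NULL);
                task = NULL;
            }
        }
    }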
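h2_mplx_idle() is documented in h2_mplx.h as returning something other than APR_SUCCESS when the master connection should be terminated, so a session loop would presumably use it along these lines; session_shutdown() and the variable names are placeholders.

    /* called when the master connection enters idle mode; part of the
     * DoS protection section added to h2_mplx.h */
    if (h2_mplx_idle(m) != APR_SUCCESS) {
        /* terminate the HTTP/2 session on the master connection */
        session_shutdown(session);
    }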
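The engine_done()/h2_mplx_engine_exit() code in h2_mplx.c suggests the following call shape for a request engine consumer. This is inferred, not part of the patch: the engine is assumed to have been obtained through h2_mplx_engine_push() and its einit callback, process() is a stand-in, and the exact return/NULL semantics of h2_mplx_engine_pull() are not visible in this hunk.

    static void drain_engine(struct h2_req_engine *engine)
    {
        request_rec *r;

        while (h2_mplx_engine_pull(engine, APR_BLOCK_READ, &r) == APR_SUCCESS
               && r) {
            process(r);                                 /* stand-in */
            h2_mplx_engine_done(engine, r->connection); /* marks the task done,
                                                         * releases the slave conn */
        }
        /* always exit: anything still queued is aborted and logged at
         * WARNING level by h2_mplx_engine_exit() */
        h2_mplx_engine_exit(engine);
    }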
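In h2_push.c, add_push() now pushes only Link targets that carry a 'preload' relation, are not marked 'nopush', and point at the same authority, while set_push_header() copies just a whitelist of request headers into the synthetic push request. With made-up URLs, the effect on typical Link response headers would be:

    Link: </assets/site.css>; rel=preload              -> pushed
    Link: </assets/site.css>; rel="preload prefetch"   -> pushed (token match in has_relation)
    Link: </assets/site.css>; rel=preload; nopush      -> not pushed (has_param)
    Link: <https://other.example/app.js>; rel=preload  -> not pushed (same_authority fails)
    Link: </next>; rel=next                            -> not pushed (no preload relation)

Only User-Agent, Accept, Accept-Encoding, Accept-Language and Cache-Control travel from the client request into the pushed request's header set.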
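The push diary comment block in h2_push.c describes keeping at most 64 hash bits per pushed URL, and possibly far fewer when a client-supplied cache digest limits them. As a back-of-the-envelope illustration (not the diary code itself): with N bits kept and k entries stored, a fresh URL collides with an existing entry with probability of roughly k/2^N, so shrinking N trades digest size for false "already pushed" answers. A reduction could look like this, assuming the most significant bits are the ones kept:

    #include <apr.h>

    /* Keep only the top 'nbits' of a 64-bit URL hash; fewer bits mean a
     * smaller diary/digest but more hash collisions (false positives). */
    static apr_uint64_t reduce_hash(apr_uint64_t hash64, int nbits)
    {
        if (nbits <= 0) {
            return 0;
        }
        return (nbits >= 64)? hash64 : (hash64 >> (64 - nbits));
    }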
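h2_request_createn() loses its h2_config argument and gains a trailing 'serialize' flag, matching the call now made in add_push(); h2_request_clone() deep-copies a request, and request_rec setup switches from apr_table_copy() to apr_table_clone(), so header keys and values are duplicated into the target pool rather than shared. A minimal sketch with placeholder values; the comment on 'serialize' is an assumption, apparently mirroring the H2SerializeHeaders setting:

    apr_table_t *headers = apr_table_make(pool, 5);
    h2_request *req = h2_request_createn(1, pool, "GET", "https",
                                         "www.example.org", "/index.html",
                                         headers,
                                         0 /* serialize: assumed to mean "do not
                                            * detour via HTTP/1.1 serialization" */);
    h2_request *copy = h2_request_clone(other_pool, req); /* deep copy, incl. tables */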