Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 48F70200B98 for ; Mon, 3 Oct 2016 13:48:06 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 47705160AE5; Mon, 3 Oct 2016 11:48:06 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id BDC2E160ACC for ; Mon, 3 Oct 2016 13:48:03 +0200 (CEST) Received: (qmail 78775 invoked by uid 500); 3 Oct 2016 11:47:57 -0000 Mailing-List: contact cvs-help@httpd.apache.org; run by ezmlm Precedence: bulk Reply-To: dev@httpd.apache.org list-help: list-unsubscribe: List-Post: List-Id: Delivered-To: mailing list cvs@httpd.apache.org Received: (qmail 78766 invoked by uid 99); 3 Oct 2016 11:47:57 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 03 Oct 2016 11:47:57 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 49F7E1A5BDE for ; Mon, 3 Oct 2016 11:47:57 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -1.199 X-Spam-Level: X-Spam-Status: No, score=-1.199 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx2-lw-us.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id aEKLtn_Md4g7 for ; Mon, 3 Oct 2016 11:47:49 +0000 (UTC) Received: from mailrelay1-us-west.apache.org (mailrelay1-us-west.apache.org [209.188.14.139]) by mx2-lw-us.apache.org (ASF Mail Server at mx2-lw-us.apache.org) with ESMTP id DE6775FB16 for ; Mon, 3 Oct 2016 11:47:48 +0000 (UTC) Received: from svn01-us-west.apache.org (svn.apache.org [10.41.0.6]) by mailrelay1-us-west.apache.org (ASF Mail Server at mailrelay1-us-west.apache.org) with ESMTP id C387BE0FF0 for ; Mon, 3 Oct 2016 11:47:47 +0000 (UTC) Received: from svn01-us-west.apache.org (localhost [127.0.0.1]) by svn01-us-west.apache.org (ASF Mail Server at svn01-us-west.apache.org) with ESMTP id 17EF33A10C4 for ; Mon, 3 Oct 2016 11:47:47 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Subject: svn commit: r1763158 [2/3] - in /httpd/httpd/trunk: ./ modules/http2/ Date: Mon, 03 Oct 2016 11:47:45 -0000 To: cvs@httpd.apache.org From: icing@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20161003114747.17EF33A10C4@svn01-us-west.apache.org> archived-at: Mon, 03 Oct 2016 11:48:06 -0000 Modified: httpd/httpd/trunk/modules/http2/h2_mplx.c URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_mplx.c?rev=1763158&r1=1763157&r2=1763158&view=diff ============================================================================== --- httpd/httpd/trunk/modules/http2/h2_mplx.c (original) +++ httpd/httpd/trunk/modules/http2/h2_mplx.c Mon Oct 3 11:47:45 2016 @@ -28,13 +28,13 @@ #include "mod_http2.h" +#include "h2.h" #include "h2_private.h" #include "h2_bucket_beam.h" #include "h2_config.h" #include "h2_conn.h" #include "h2_ctx.h" #include "h2_h2.h" -#include "h2_response.h" #include "h2_mplx.h" #include "h2_ngn_shed.h" #include "h2_request.h" @@ -297,7 +297,6 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr m->spurge = h2_ihash_create(m->pool, offsetof(h2_stream,id)); m->q = h2_iq_create(m->pool, m->max_streams); m->sready = h2_ihash_create(m->pool, offsetof(h2_stream,id)); - m->sresume = h2_ihash_create(m->pool, offsetof(h2_stream,id)); m->tasks = h2_ihash_create(m->pool, offsetof(h2_task,stream_id)); m->stream_timeout = stream_timeout; @@ -444,7 +443,6 @@ static void stream_done(h2_mplx *m, h2_s */ h2_iq_remove(m->q, stream->id); h2_ihash_remove(m->sready, stream->id); - h2_ihash_remove(m->sresume, stream->id); h2_ihash_remove(m->streams, stream->id); if (stream->input) { m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input); @@ -516,12 +514,10 @@ static int task_print(void *ctx, void *v h2_stream *stream = h2_ihash_get(m->streams, task->stream_id); ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */ - "->03198: h2_stream(%s): %s %s %s -> %s %d" + "->03198: h2_stream(%s): %s %s %s" "[orph=%d/started=%d/done=%d/frozen=%d]", task->id, task->request->method, task->request->authority, task->request->path, - task->response? "http" : (task->rst_error? "reset" : "?"), - task->response? task->response->http_status : task->rst_error, (stream? 0 : 1), task->worker_started, task->worker_done, task->frozen); } @@ -545,7 +541,7 @@ static int task_abort_connection(void *c if (task->input.beam) { h2_beam_abort(task->input.beam); } - if (task->worker_started && !task->worker_done && task->output.beam) { + if (task->output.beam) { h2_beam_abort(task->output.beam); } return 1; @@ -556,9 +552,9 @@ static int report_stream_iter(void *ctx, h2_stream *stream = val; ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, "h2_mplx(%ld-%d): exists, started=%d, scheduled=%d, " - "submitted=%d, suspended=%d", + "ready=%d", m->id, stream->id, stream->started, stream->scheduled, - stream->submitted, stream->suspended); + h2_stream_is_ready(stream)); return 1; } @@ -575,9 +571,8 @@ apr_status_t h2_mplx_release_and_join(h2 if (!h2_ihash_empty(m->streams) && APLOGctrace1(m->c)) { ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, "h2_mplx(%ld): release_join with %d streams open, " - "%d streams resume, %d streams ready, %d tasks", + "%d streams ready, %d tasks", m->id, (int)h2_ihash_count(m->streams), - (int)h2_ihash_count(m->sresume), (int)h2_ihash_count(m->sready), (int)h2_ihash_count(m->tasks)); h2_ihash_iter(m->streams, report_stream_iter, m); @@ -707,6 +702,19 @@ apr_status_t h2_mplx_stream_done(h2_mplx return status; } +h2_stream *h2_mplx_stream_get(h2_mplx *m, apr_uint32_t id) +{ + h2_stream *s = NULL; + int acquired; + + AP_DEBUG_ASSERT(m); + if ((enter_mutex(m, &acquired)) == APR_SUCCESS) { + s = h2_ihash_get(m->streams, id); + leave_mutex(m, acquired); + } + return s; +} + void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx) { m->input_consumed = cb; @@ -730,31 +738,26 @@ static void output_produced(void *ctx, h } } -static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response) +static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam) { apr_status_t status = APR_SUCCESS; h2_task *task = h2_ihash_get(m->tasks, stream_id); h2_stream *stream = h2_ihash_get(m->streams, stream_id); + apr_size_t beamed_count; if (!task || !stream) { return APR_ECONNABORTED; } - status = h2_task_add_response(task, response); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c, - "h2_mplx(%s): add response: %d, rst=%d", - task->id, response->http_status, response->rst_error); - if (status != APR_SUCCESS) { - return status; - } - - if (task->output.beam && !task->output.opened) { - apr_uint32_t beamed_count; - h2_beam_buffer_size_set(task->output.beam, m->stream_max_mem); - h2_beam_timeout_set(task->output.beam, m->stream_timeout); - h2_beam_on_consumed(task->output.beam, stream_output_consumed, task); - h2_beam_on_produced(task->output.beam, output_produced, m); - beamed_count = h2_beam_get_files_beamed(task->output.beam); + "h2_mplx(%s): out open", task->id); + + if (!stream->output) { + h2_beam_buffer_size_set(beam, m->stream_max_mem); + h2_beam_timeout_set(beam, m->stream_timeout); + h2_beam_on_consumed(beam, stream_output_consumed, task); + h2_beam_on_produced(beam, output_produced, m); + beamed_count = h2_beam_get_files_beamed(beam); if (m->tx_handles_reserved >= beamed_count) { m->tx_handles_reserved -= beamed_count; } @@ -762,22 +765,20 @@ static apr_status_t out_open(h2_mplx *m, m->tx_handles_reserved = 0; } if (!task->output.copy_files) { - h2_beam_on_file_beam(task->output.beam, can_beam_file, m); + h2_beam_on_file_beam(beam, can_beam_file, m); } - h2_beam_mutex_set(task->output.beam, beam_enter, task->cond, m); - task->output.opened = 1; + h2_beam_mutex_set(beam, beam_enter, task->cond, m); + stream->output = beam; } - if (response && response->http_status < 300) { - /* we might see some file buckets in the output, see - * if we have enough handles reserved. */ - check_tx_reservation(m); - } - have_out_data_for(m, stream, 1); + /* we might see some file buckets in the output, see + * if we have enough handles reserved. */ + check_tx_reservation(m); + have_out_data_for(m, stream, 0); return status; } -apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response) +apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam) { apr_status_t status; int acquired; @@ -788,7 +789,7 @@ apr_status_t h2_mplx_out_open(h2_mplx *m status = APR_ECONNABORTED; } else { - status = out_open(m, stream_id, response); + status = out_open(m, stream_id, beam); } leave_mutex(m, acquired); } @@ -809,16 +810,6 @@ static apr_status_t out_close(h2_mplx *m return APR_ECONNABORTED; } - if (!task->response && !task->rst_error) { - /* In case a close comes before a response was created, - * insert an error one so that our streams can properly reset. - */ - h2_response *r = h2_response_die(task->stream_id, 500, - task->request, m->pool); - status = out_open(m, task->stream_id, r); - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, m->c, APLOGNO(03393) - "h2_mplx(%s): close, no response, no rst", task->id); - } ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c, "h2_mplx(%s): close", task->id); if (task->output.beam) { @@ -842,7 +833,7 @@ apr_status_t h2_mplx_out_trywait(h2_mplx if (m->aborted) { status = APR_ECONNABORTED; } - else if (!h2_ihash_empty(m->sready) || !h2_ihash_empty(m->sresume)) { + else if (!h2_ihash_empty(m->sready)) { status = APR_SUCCESS; } else { @@ -863,13 +854,10 @@ apr_status_t h2_mplx_out_trywait(h2_mplx static void have_out_data_for(h2_mplx *m, h2_stream *stream, int response) { - h2_ihash_t *set; ap_assert(m); ap_assert(stream); - - set = response? m->sready : m->sresume; - if (!h2_ihash_get(set, stream->id)) { - h2_ihash_add(set, stream); + if (!h2_ihash_get(m->sready, stream->id)) { + h2_ihash_add(m->sready, stream); if (m->added_output) { apr_thread_cond_signal(m->added_output); } @@ -910,25 +898,20 @@ apr_status_t h2_mplx_process(h2_mplx *m, } else { h2_ihash_add(m->streams, stream); - if (stream->response) { - /* already have a respone, schedule for submit */ + if (h2_stream_is_ready(stream)) { h2_ihash_add(m->sready, stream); } else { - h2_beam_create(&stream->input, stream->pool, stream->id, - "input", 0); if (!m->need_registration) { m->need_registration = h2_iq_empty(m->q); } if (m->workers_busy < m->workers_max) { do_registration = m->need_registration; } - h2_iq_add(m->q, stream->id, cmp, ctx); - - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c, - "h2_mplx(%ld-%d): process, body=%d", - m->c->id, stream->id, stream->request->body); + h2_iq_add(m->q, stream->id, cmp, ctx); } + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c, + "h2_mplx(%ld-%d): process", m->c->id, stream->id); } leave_mutex(m, acquired); } @@ -939,7 +922,7 @@ apr_status_t h2_mplx_process(h2_mplx *m, return status; } -static h2_task *pop_task(h2_mplx *m) +static h2_task *next_stream_task(h2_mplx *m) { h2_task *task = NULL; h2_stream *stream; @@ -957,13 +940,13 @@ static h2_task *pop_task(h2_mplx *m) slave = *pslave; } else { - slave = h2_slave_create(m->c, m->pool, NULL); + slave = h2_slave_create(m->c, stream->id, m->pool, NULL); new_conn = 1; } slave->sbh = m->c->sbh; slave->aborted = 0; - task = h2_task_create(slave, stream->request, stream->input, m); + task = h2_task_create(slave, stream->id, stream->request, stream->input, m); h2_ihash_add(m->tasks, task); m->c->keepalives++; @@ -1003,7 +986,7 @@ h2_task *h2_mplx_pop_task(h2_mplx *m, in *has_more = 0; } else { - task = pop_task(m); + task = next_stream_task(m); *has_more = !h2_iq_empty(m->q); } @@ -1022,9 +1005,6 @@ static void task_done(h2_mplx *m, h2_tas * and the original worker has finished. That means the * engine may start processing now. */ h2_task_thaw(task); - /* we do not want the task to block on writing response - * bodies into the mplx. */ - h2_task_set_io_blocking(task, 0); apr_thread_cond_broadcast(m->task_thawed); return; } @@ -1141,7 +1121,7 @@ void h2_mplx_task_done(h2_mplx *m, h2_ta --m->workers_busy; if (ptask) { /* caller wants another task */ - *ptask = pop_task(m); + *ptask = next_stream_task(m); } leave_mutex(m, acquired); } @@ -1154,15 +1134,19 @@ void h2_mplx_task_done(h2_mplx *m, h2_ta static int latest_repeatable_unsubmitted_iter(void *data, void *val) { task_iter_ctx *ctx = data; + h2_stream *stream; h2_task *task = val; if (!task->worker_done && h2_task_can_redo(task) && !h2_ihash_get(ctx->m->redo_tasks, task->stream_id)) { - /* this task occupies a worker, the response has not been submitted yet, - * not been cancelled and it is a repeatable request - * -> it can be re-scheduled later */ - if (!ctx->task || ctx->task->started_at < task->started_at) { - /* we did not have one or this one was started later */ - ctx->task = task; + stream = h2_ihash_get(ctx->m->streams, task->stream_id); + if (stream && !h2_stream_is_ready(stream)) { + /* this task occupies a worker, the response has not been submitted + * yet, not been cancelled and it is a repeatable request + * -> it can be re-scheduled later */ + if (!ctx->task || ctx->task->started_at < task->started_at) { + /* we did not have one or this one was started later */ + ctx->task = task; + } } } return 1; @@ -1329,13 +1313,12 @@ apr_status_t h2_mplx_req_engine_push(con return APR_ECONNABORTED; } m = task->mplx; - task->r = r; if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { h2_stream *stream = h2_ihash_get(m->streams, task->stream_id); if (stream) { - status = h2_ngn_shed_push_task(m->ngn_shed, ngn_type, task, einit); + status = h2_ngn_shed_push_request(m->ngn_shed, ngn_type, r, einit); } else { status = APR_ECONNABORTED; @@ -1353,7 +1336,6 @@ apr_status_t h2_mplx_req_engine_pull(h2_ h2_ngn_shed *shed = h2_ngn_shed_get_shed(ngn); h2_mplx *m = h2_ngn_shed_get_ctx(shed); apr_status_t status; - h2_task *task = NULL; int acquired; if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { @@ -1368,22 +1350,21 @@ apr_status_t h2_mplx_req_engine_pull(h2_ * had and, if not, wait a short while before doing the * blocking, and if unsuccessful, terminating read. */ - status = h2_ngn_shed_pull_task(shed, ngn, capacity, 1, &task); + status = h2_ngn_shed_pull_request(shed, ngn, capacity, 1, pr); if (APR_STATUS_IS_EAGAIN(status)) { ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, "h2_mplx(%ld): start block engine pull", m->id); apr_thread_cond_timedwait(m->task_thawed, m->lock, apr_time_from_msec(20)); - status = h2_ngn_shed_pull_task(shed, ngn, capacity, 1, &task); + status = h2_ngn_shed_pull_request(shed, ngn, capacity, 1, pr); } } else { - status = h2_ngn_shed_pull_task(shed, ngn, capacity, - want_shutdown, &task); + status = h2_ngn_shed_pull_request(shed, ngn, capacity, + want_shutdown, pr); } leave_mutex(m, acquired); } - *pr = task? task->r : NULL; return status; } @@ -1423,14 +1404,12 @@ static int update_window(void *ctx, void apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m, stream_ev_callback *on_resume, - stream_ev_callback *on_response, void *on_ctx) { apr_status_t status; int acquired; int streams[32]; h2_stream *stream; - h2_task *task; size_t i, n; AP_DEBUG_ASSERT(m); @@ -1440,8 +1419,7 @@ apr_status_t h2_mplx_dispatch_master_eve /* update input windows for streams */ h2_ihash_iter(m->streams, update_window, m); - - if (on_response && !h2_ihash_empty(m->sready)) { + if (on_resume && !h2_ihash_empty(m->sready)) { n = h2_ihash_ishift(m->sready, streams, H2_ALEN(streams)); for (i = 0; i < n; ++i) { stream = h2_ihash_get(m->streams, streams[i]); @@ -1449,49 +1427,9 @@ apr_status_t h2_mplx_dispatch_master_eve continue; } ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, - "h2_mplx(%ld-%d): on_response", - m->id, stream->id); - task = h2_ihash_get(m->tasks, stream->id); - if (task) { - task->response_sent = 1; - if (task->rst_error) { - h2_stream_rst(stream, task->rst_error); - } - else { - AP_DEBUG_ASSERT(task->response); - status = h2_stream_add_response(stream, task->response, - task->output.beam); - if (status != APR_SUCCESS) { - h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR); - } - if (!h2_response_get_final(task->response)) { - /* the final response needs still to arrive */ - task->response = NULL; - } - } - } - else { - /* We have the stream ready without a task. This happens - * when we fail streams early. A response should already - * be present. */ - AP_DEBUG_ASSERT(stream->response || stream->rst_error); - } - status = on_response(on_ctx, stream->id); - } - } - - if (on_resume && !h2_ihash_empty(m->sresume)) { - n = h2_ihash_ishift(m->sresume, streams, H2_ALEN(streams)); - for (i = 0; i < n; ++i) { - stream = h2_ihash_get(m->streams, streams[i]); - if (!stream) { - continue; - } - ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, "h2_mplx(%ld-%d): on_resume", m->id, stream->id); - h2_stream_set_suspended(stream, 0); - status = on_resume(on_ctx, stream->id); + on_resume(on_ctx, stream->id); } } @@ -1500,25 +1438,36 @@ apr_status_t h2_mplx_dispatch_master_eve return status; } -apr_status_t h2_mplx_suspend_stream(h2_mplx *m, int stream_id) +apr_status_t h2_mplx_keep_active(h2_mplx *m, apr_uint32_t stream_id) { apr_status_t status; - h2_stream *stream; - h2_task *task; int acquired; AP_DEBUG_ASSERT(m); if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - stream = h2_ihash_get(m->streams, stream_id); - if (stream && !h2_ihash_get(m->sresume, stream->id)) { - /* not marked for resume again already */ - h2_stream_set_suspended(stream, 1); - task = h2_ihash_get(m->tasks, stream->id); - if (stream->started && (!task || task->worker_done)) { - h2_ihash_add(m->sresume, stream); - } + h2_stream *s = h2_ihash_get(m->streams, stream_id); + if (s) { + h2_ihash_add(m->sready, s); } leave_mutex(m, acquired); } return status; } + +int h2_mplx_awaits_data(h2_mplx *m) +{ + apr_status_t status; + int acquired, waiting = 1; + + AP_DEBUG_ASSERT(m); + if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { + if (h2_ihash_empty(m->streams)) { + waiting = 0; + } + if (h2_iq_empty(m->q) && h2_ihash_empty(m->tasks)) { + waiting = 0; + } + leave_mutex(m, acquired); + } + return waiting; +} Modified: httpd/httpd/trunk/modules/http2/h2_mplx.h URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_mplx.h?rev=1763158&r1=1763157&r2=1763158&view=diff ============================================================================== --- httpd/httpd/trunk/modules/http2/h2_mplx.h (original) +++ httpd/httpd/trunk/modules/http2/h2_mplx.h Mon Oct 3 11:47:45 2016 @@ -40,7 +40,6 @@ struct apr_thread_cond_t; struct h2_bucket_beam; struct h2_config; struct h2_ihash_t; -struct h2_response; struct h2_task; struct h2_stream; struct h2_request; @@ -76,9 +75,8 @@ struct h2_mplx { struct h2_ihash_t *spurge; /* all streams done, ready for destroy */ struct h2_iqueue *q; /* all stream ids that need to be started */ - struct h2_ihash_t *sready; /* all streams ready for response */ - struct h2_ihash_t *sresume; /* all streams that can be resumed */ - + struct h2_ihash_t *sready; /* all streams ready for output */ + struct h2_ihash_t *tasks; /* all tasks started and not destroyed */ struct h2_ihash_t *redo_tasks; /* all tasks that need to be redone */ @@ -164,6 +162,8 @@ int h2_mplx_is_busy(h2_mplx *m); * IO lifetime of streams. ******************************************************************************/ +struct h2_stream *h2_mplx_stream_get(h2_mplx *m, apr_uint32_t id); + /** * Notifies mplx that a stream has finished processing. * @@ -181,6 +181,8 @@ apr_status_t h2_mplx_stream_done(h2_mplx apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout, struct apr_thread_cond_t *iowait); +apr_status_t h2_mplx_keep_active(h2_mplx *m, apr_uint32_t stream_id); + /******************************************************************************* * Stream processing. ******************************************************************************/ @@ -222,16 +224,15 @@ typedef apr_status_t stream_ev_callback( /** * Dispatch events for the master connection, such as - * - resume: new output data has arrived for a suspended stream - * - response: the response for a stream is ready + ± @param m the multiplexer + * @param on_resume new output data has arrived for a suspended stream + * @param ctx user supplied argument to invocation. */ apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m, stream_ev_callback *on_resume, - stream_ev_callback *on_response, void *ctx); -apr_status_t h2_mplx_suspend_stream(h2_mplx *m, int stream_id); - +int h2_mplx_awaits_data(h2_mplx *m); typedef int h2_mplx_stream_cb(struct h2_stream *s, void *ctx); @@ -245,7 +246,7 @@ apr_status_t h2_mplx_stream_do(h2_mplx * * Opens the output for the given stream with the specified response. */ apr_status_t h2_mplx_out_open(h2_mplx *mplx, int stream_id, - struct h2_response *response); + struct h2_bucket_beam *beam); /******************************************************************************* * h2_mplx list Manipulation. Modified: httpd/httpd/trunk/modules/http2/h2_ngn_shed.c URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_ngn_shed.c?rev=1763158&r1=1763157&r2=1763158&view=diff ============================================================================== --- httpd/httpd/trunk/modules/http2/h2_ngn_shed.c (original) +++ httpd/httpd/trunk/modules/http2/h2_ngn_shed.c Mon Oct 3 11:47:45 2016 @@ -35,7 +35,6 @@ #include "h2_ctx.h" #include "h2_h2.h" #include "h2_mplx.h" -#include "h2_response.h" #include "h2_request.h" #include "h2_task.h" #include "h2_util.h" @@ -46,6 +45,7 @@ typedef struct h2_ngn_entry h2_ngn_entry struct h2_ngn_entry { APR_RING_ENTRY(h2_ngn_entry) link; h2_task *task; + request_rec *r; }; #define H2_NGN_ENTRY_NEXT(e) APR_RING_NEXT((e), link) @@ -144,26 +144,28 @@ void h2_ngn_shed_abort(h2_ngn_shed *shed shed->aborted = 1; } -static void ngn_add_task(h2_req_engine *ngn, h2_task *task) +static void ngn_add_task(h2_req_engine *ngn, h2_task *task, request_rec *r) { h2_ngn_entry *entry = apr_pcalloc(task->pool, sizeof(*entry)); APR_RING_ELEM_INIT(entry, link); entry->task = task; + entry->r = r; H2_REQ_ENTRIES_INSERT_TAIL(&ngn->entries, entry); } -apr_status_t h2_ngn_shed_push_task(h2_ngn_shed *shed, const char *ngn_type, - h2_task *task, http2_req_engine_init *einit) +apr_status_t h2_ngn_shed_push_request(h2_ngn_shed *shed, const char *ngn_type, + request_rec *r, + http2_req_engine_init *einit) { h2_req_engine *ngn; + h2_task *task = h2_ctx_rget_task(r); - AP_DEBUG_ASSERT(shed); - + ap_assert(task); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, shed->c, "h2_ngn_shed(%ld): PUSHing request (task=%s)", shed->c->id, task->id); - if (task->ser_headers) { + if (task->request->serialize) { /* Max compatibility, deny processing of this */ return APR_EOF; } @@ -184,7 +186,7 @@ apr_status_t h2_ngn_shed_push_task(h2_ng if (!h2_task_is_detached(task)) { h2_task_freeze(task); } - ngn_add_task(ngn, task); + ngn_add_task(ngn, task, r); ngn->no_assigned++; return APR_SUCCESS; } @@ -207,7 +209,7 @@ apr_status_t h2_ngn_shed_push_task(h2_ng APR_RING_INIT(&newngn->entries, h2_ngn_entry, link); status = einit(newngn, newngn->id, newngn->type, newngn->pool, - shed->req_buffer_size, task->r, + shed->req_buffer_size, r, &newngn->out_consumed, &newngn->out_consumed_ctx); ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, task->c, APLOGNO(03395) "h2_ngn_shed(%ld): create engine %s (%s)", @@ -242,16 +244,16 @@ static h2_ngn_entry *pop_detached(h2_req return NULL; } -apr_status_t h2_ngn_shed_pull_task(h2_ngn_shed *shed, - h2_req_engine *ngn, - apr_uint32_t capacity, - int want_shutdown, - h2_task **ptask) +apr_status_t h2_ngn_shed_pull_request(h2_ngn_shed *shed, + h2_req_engine *ngn, + apr_uint32_t capacity, + int want_shutdown, + request_rec **pr) { h2_ngn_entry *entry; AP_DEBUG_ASSERT(ngn); - *ptask = NULL; + *pr = NULL; ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, shed->c, APLOGNO(03396) "h2_ngn_shed(%ld): pull task for engine %s, shutdown=%d", shed->c->id, ngn->id, want_shutdown); @@ -279,7 +281,7 @@ apr_status_t h2_ngn_shed_pull_task(h2_ng "h2_ngn_shed(%ld): pulled request %s for engine %s", shed->c->id, entry->task->id, ngn->id); ngn->no_live++; - *ptask = entry->task; + *pr = entry->r; entry->task->assigned = ngn; /* task will now run in ngn's own thread. Modules like lua * seem to require the correct thread set in the conn_rec. Modified: httpd/httpd/trunk/modules/http2/h2_ngn_shed.h URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_ngn_shed.h?rev=1763158&r1=1763157&r2=1763158&view=diff ============================================================================== --- httpd/httpd/trunk/modules/http2/h2_ngn_shed.h (original) +++ httpd/httpd/trunk/modules/http2/h2_ngn_shed.h Mon Oct 3 11:47:45 2016 @@ -58,13 +58,13 @@ h2_ngn_shed *h2_ngn_shed_get_shed(struct void h2_ngn_shed_abort(h2_ngn_shed *shed); -apr_status_t h2_ngn_shed_push_task(h2_ngn_shed *shed, const char *ngn_type, - struct h2_task *task, - h2_shed_ngn_init *init_cb); +apr_status_t h2_ngn_shed_push_request(h2_ngn_shed *shed, const char *ngn_type, + request_rec *r, + h2_shed_ngn_init *init_cb); -apr_status_t h2_ngn_shed_pull_task(h2_ngn_shed *shed, h2_req_engine *pub_ngn, - apr_uint32_t capacity, - int want_shutdown, struct h2_task **ptask); +apr_status_t h2_ngn_shed_pull_request(h2_ngn_shed *shed, h2_req_engine *pub_ngn, + apr_uint32_t capacity, + int want_shutdown, request_rec **pr); apr_status_t h2_ngn_shed_done_task(h2_ngn_shed *shed, struct h2_req_engine *ngn, Modified: httpd/httpd/trunk/modules/http2/h2_proxy_util.c URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_proxy_util.c?rev=1763158&r1=1763157&r2=1763158&view=diff ============================================================================== --- httpd/httpd/trunk/modules/http2/h2_proxy_util.c (original) +++ httpd/httpd/trunk/modules/http2/h2_proxy_util.c Mon Oct 3 11:47:45 2016 @@ -566,7 +566,6 @@ static h2_request *h2_req_createn(int id { h2_request *req = apr_pcalloc(pool, sizeof(h2_request)); - req->id = id; req->method = method; req->scheme = scheme; req->authority = authority; Modified: httpd/httpd/trunk/modules/http2/h2_push.c URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_push.c?rev=1763158&r1=1763157&r2=1763158&view=diff ============================================================================== --- httpd/httpd/trunk/modules/http2/h2_push.c (original) +++ httpd/httpd/trunk/modules/http2/h2_push.c Mon Oct 3 11:47:45 2016 @@ -34,7 +34,7 @@ #include "h2_util.h" #include "h2_push.h" #include "h2_request.h" -#include "h2_response.h" +#include "h2_headers.h" #include "h2_session.h" #include "h2_stream.h" @@ -58,6 +58,7 @@ static const char *policy_str(h2_push_po typedef struct { const h2_request *req; + int push_policy; apr_pool_t *pool; apr_array_header_t *pushes; const char *s; @@ -336,7 +337,7 @@ static int add_push(link_ctx *ctx) */ path = apr_uri_unparse(ctx->pool, &uri, APR_URI_UNP_OMITSITEPART); push = apr_pcalloc(ctx->pool, sizeof(*push)); - switch (ctx->req->push_policy) { + switch (ctx->push_policy) { case H2_PUSH_HEAD: method = "HEAD"; break; @@ -350,7 +351,7 @@ static int add_push(link_ctx *ctx) ctx->req->authority, path, headers, ctx->req->serialize); /* atm, we do not push on pushes */ - h2_request_end_headers(req, ctx->pool, 1, 0); + h2_request_end_headers(req, ctx->pool, 1); push->req = req; if (!ctx->pushes) { @@ -427,10 +428,10 @@ static int head_iter(void *ctx, const ch return 1; } -apr_array_header_t *h2_push_collect(apr_pool_t *p, const h2_request *req, - const h2_response *res) +apr_array_header_t *h2_push_collect(apr_pool_t *p, const h2_request *req, + int push_policy, const h2_headers *res) { - if (req && req->push_policy != H2_PUSH_NONE) { + if (req && push_policy != H2_PUSH_NONE) { /* Collect push candidates from the request/response pair. * * One source for pushes are "rel=preload" link headers @@ -444,11 +445,13 @@ apr_array_header_t *h2_push_collect(apr_ memset(&ctx, 0, sizeof(ctx)); ctx.req = req; + ctx.push_policy = push_policy; ctx.pool = p; apr_table_do(head_iter, &ctx, res->headers, NULL); if (ctx.pushes) { - apr_table_setn(res->headers, "push-policy", policy_str(req->push_policy)); + apr_table_setn(res->headers, "push-policy", + policy_str(push_policy)); } return ctx.pushes; } @@ -681,7 +684,7 @@ apr_array_header_t *h2_push_diary_update apr_array_header_t *h2_push_collect_update(h2_stream *stream, const struct h2_request *req, - const struct h2_response *res) + const struct h2_headers *res) { h2_session *session = stream->session; const char *cache_digest = apr_table_get(req->headers, "Cache-Digest"); @@ -698,7 +701,7 @@ apr_array_header_t *h2_push_collect_upda session->id, cache_digest); } } - pushes = h2_push_collect(stream->pool, req, res); + pushes = h2_push_collect(stream->pool, req, stream->push_policy, res); return h2_push_diary_update(stream->session, pushes); } Modified: httpd/httpd/trunk/modules/http2/h2_push.h URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_push.h?rev=1763158&r1=1763157&r2=1763158&view=diff ============================================================================== --- httpd/httpd/trunk/modules/http2/h2_push.h (original) +++ httpd/httpd/trunk/modules/http2/h2_push.h Mon Oct 3 11:47:45 2016 @@ -18,7 +18,7 @@ #include "h2.h" struct h2_request; -struct h2_response; +struct h2_headers; struct h2_ngheader; struct h2_session; struct h2_stream; @@ -58,7 +58,8 @@ struct h2_push_diary { */ apr_array_header_t *h2_push_collect(apr_pool_t *p, const struct h2_request *req, - const struct h2_response *res); + int push_policy, + const struct h2_headers *res); /** * Create a new push diary for the given maximum number of entries. @@ -81,7 +82,7 @@ apr_array_header_t *h2_push_diary_update */ apr_array_header_t *h2_push_collect_update(struct h2_stream *stream, const struct h2_request *req, - const struct h2_response *res); + const struct h2_headers *res); /** * Get a cache digest as described in * https://datatracker.ietf.org/doc/draft-kazuho-h2-cache-digest/ Modified: httpd/httpd/trunk/modules/http2/h2_request.c URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_request.c?rev=1763158&r1=1763157&r2=1763158&view=diff ============================================================================== --- httpd/httpd/trunk/modules/http2/h2_request.c (original) +++ httpd/httpd/trunk/modules/http2/h2_request.c Mon Oct 3 11:47:45 2016 @@ -36,13 +36,6 @@ #include "h2_util.h" -static apr_status_t inspect_clen(h2_request *req, const char *s) -{ - char *end; - req->content_length = apr_strtoi64(s, &end, 10); - return (s == end)? APR_EINVAL : APR_SUCCESS; -} - typedef struct { apr_table_t *headers; apr_pool_t *pool; @@ -59,7 +52,6 @@ static int set_h1_header(void *ctx, cons } apr_status_t h2_request_rcreate(h2_request **preq, apr_pool_t *pool, - int stream_id, int initiated_on, request_rec *r) { h2_request *req; @@ -86,8 +78,6 @@ apr_status_t h2_request_rcreate(h2_reque } req = apr_pcalloc(pool, sizeof(*req)); - req->id = stream_id; - req->initiated_on = initiated_on; req->method = apr_pstrdup(pool, r->method); req->scheme = scheme; req->authority = authority; @@ -121,8 +111,7 @@ apr_status_t h2_request_add_header(h2_re if (!apr_is_empty_table(req->headers)) { ap_log_perror(APLOG_MARK, APLOG_ERR, 0, pool, APLOGNO(02917) - "h2_request(%d): pseudo header after request start", - req->id); + "h2_request: pseudo header after request start"); return APR_EGENERAL; } @@ -148,8 +137,8 @@ apr_status_t h2_request_add_header(h2_re strncpy(buffer, name, (nlen > 31)? 31 : nlen); ap_log_perror(APLOG_MARK, APLOG_WARNING, 0, pool, APLOGNO(02954) - "h2_request(%d): ignoring unknown pseudo header %s", - req->id, buffer); + "h2_request: ignoring unknown pseudo header %s", + buffer); } } else { @@ -160,8 +149,7 @@ apr_status_t h2_request_add_header(h2_re return status; } -apr_status_t h2_request_end_headers(h2_request *req, apr_pool_t *pool, - int eos, int push) +apr_status_t h2_request_end_headers(h2_request *req, apr_pool_t *pool, int eos) { const char *s; @@ -181,21 +169,9 @@ apr_status_t h2_request_end_headers(h2_r } s = apr_table_get(req->headers, "Content-Length"); - if (s) { - if (inspect_clen(req, s) != APR_SUCCESS) { - ap_log_perror(APLOG_MARK, APLOG_WARNING, APR_EINVAL, pool, - APLOGNO(02959) - "h2_request(%d): content-length value not parsed: %s", - req->id, s); - return APR_EINVAL; - } - req->body = 1; - } - else { + if (!s) { /* no content-length given */ - req->content_length = -1; - req->body = !eos; - if (req->body) { + if (!eos) { /* We have not seen a content-length and have no eos, * simulate a chunked encoding for our HTTP/1.1 infrastructure, * in case we have "H2SerializeHeaders on" here @@ -204,67 +180,16 @@ apr_status_t h2_request_end_headers(h2_r apr_table_mergen(req->headers, "Transfer-Encoding", "chunked"); } else if (apr_table_get(req->headers, "Content-Type")) { - /* If we have a content-type, but already see eos, no more + /* If we have a content-type, but already seen eos, no more * data will come. Signal a zero content length explicitly. */ apr_table_setn(req->headers, "Content-Length", "0"); } } - h2_push_policy_determine(req, pool, push); - - /* In the presence of trailers, force behaviour of chunked encoding */ - s = apr_table_get(req->headers, "Trailer"); - if (s && s[0]) { - req->trailers = apr_table_make(pool, 5); - if (!req->chunked) { - req->chunked = 1; - apr_table_mergen(req->headers, "Transfer-Encoding", "chunked"); - } - } - - return APR_SUCCESS; -} - -static apr_status_t add_h1_trailer(h2_request *req, apr_pool_t *pool, - const char *name, size_t nlen, - const char *value, size_t vlen) -{ - char *hname, *hvalue; - - if (h2_req_ignore_trailer(name, nlen)) { - return APR_SUCCESS; - } - - hname = apr_pstrndup(pool, name, nlen); - hvalue = apr_pstrndup(pool, value, vlen); - h2_util_camel_case_header(hname, nlen); - - apr_table_mergen(req->trailers, hname, hvalue); - return APR_SUCCESS; } - -apr_status_t h2_request_add_trailer(h2_request *req, apr_pool_t *pool, - const char *name, size_t nlen, - const char *value, size_t vlen) -{ - if (!req->trailers) { - ap_log_perror(APLOG_MARK, APLOG_DEBUG, APR_EINVAL, pool, APLOGNO(03059) - "h2_request(%d): unanounced trailers", - req->id); - return APR_EINVAL; - } - if (nlen == 0 || name[0] == ':') { - ap_log_perror(APLOG_MARK, APLOG_DEBUG, APR_EINVAL, pool, APLOGNO(03060) - "h2_request(%d): pseudo header in trailer", - req->id); - return APR_EINVAL; - } - return add_h1_trailer(req, pool, name, nlen, value, vlen); -} - h2_request *h2_request_clone(apr_pool_t *p, const h2_request *src) { h2_request *dst = apr_pmemdup(p, src, sizeof(*dst)); @@ -273,9 +198,6 @@ h2_request *h2_request_clone(apr_pool_t dst->authority = apr_pstrdup(p, src->authority); dst->path = apr_pstrdup(p, src->path); dst->headers = apr_table_clone(p, src->headers); - if (src->trailers) { - dst->trailers = apr_table_clone(p, src->trailers); - } return dst; } @@ -346,8 +268,8 @@ request_rec *h2_request_create_rec(const * request for a vhost where h2 is disabled --> 421. */ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, APLOGNO(03367) - "h2_request(%d): access_status=%d, request_create failed", - req->id, access_status); + "h2_request: access_status=%d, request_create failed", + access_status); ap_die(access_status, r); ap_update_child_status(c->sbh, SERVER_BUSY_LOG, r); ap_run_log_transaction(r); Modified: httpd/httpd/trunk/modules/http2/h2_request.h URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_request.h?rev=1763158&r1=1763157&r2=1763158&view=diff ============================================================================== --- httpd/httpd/trunk/modules/http2/h2_request.h (original) +++ httpd/httpd/trunk/modules/http2/h2_request.h Mon Oct 3 11:47:45 2016 @@ -19,7 +19,6 @@ #include "h2.h" apr_status_t h2_request_rcreate(h2_request **preq, apr_pool_t *pool, - int stream_id, int initiated_on, request_rec *r); apr_status_t h2_request_add_header(h2_request *req, apr_pool_t *pool, @@ -30,8 +29,7 @@ apr_status_t h2_request_add_trailer(h2_r const char *name, size_t nlen, const char *value, size_t vlen); -apr_status_t h2_request_end_headers(h2_request *req, apr_pool_t *pool, - int eos, int push); +apr_status_t h2_request_end_headers(h2_request *req, apr_pool_t *pool, int eos); h2_request *h2_request_clone(apr_pool_t *p, const h2_request *src); Modified: httpd/httpd/trunk/modules/http2/h2_session.c URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_session.c?rev=1763158&r1=1763157&r2=1763158&view=diff ============================================================================== --- httpd/httpd/trunk/modules/http2/h2_session.c (original) +++ httpd/httpd/trunk/modules/http2/h2_session.c Mon Oct 3 11:47:45 2016 @@ -38,9 +38,8 @@ #include "h2_mplx.h" #include "h2_push.h" #include "h2_request.h" -#include "h2_response.h" +#include "h2_headers.h" #include "h2_stream.h" -#include "h2_from_h1.h" #include "h2_task.h" #include "h2_session.h" #include "h2_util.h" @@ -407,7 +406,7 @@ static int on_header_cb(nghttp2_session status = h2_stream_add_header(stream, (const char *)name, namelen, (const char *)value, valuelen); - if (status != APR_SUCCESS && !stream->response) { + if (status != APR_SUCCESS && !h2_stream_is_ready(stream)) { return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE; } return 0; @@ -1138,6 +1137,10 @@ static apr_status_t h2_session_start(h2_ return status; } +static apr_status_t on_stream_headers(h2_session *session, h2_stream *stream, + h2_headers *headers, apr_off_t len, + int eos); + static ssize_t stream_data_cb(nghttp2_session *ng2s, int32_t stream_id, uint8_t *buf, @@ -1171,8 +1174,8 @@ static ssize_t stream_data_cb(nghttp2_se session->id, (int)stream_id); return NGHTTP2_ERR_CALLBACK_FAILURE; } - - status = h2_stream_out_prepare(stream, &nread, &eos); + + status = h2_stream_out_prepare(stream, &nread, &eos, NULL); if (nread) { *data_flags |= NGHTTP2_DATA_FLAG_NO_COPY; } @@ -1191,7 +1194,6 @@ static ssize_t stream_data_cb(nghttp2_se * it. Remember at our h2_stream that we need to do this. */ nread = 0; - h2_mplx_suspend_stream(session->mplx, stream->id); ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03071) "h2_stream(%ld-%d): suspending", session->id, (int)stream_id); @@ -1206,25 +1208,8 @@ static ssize_t stream_data_cb(nghttp2_se } if (eos) { - apr_table_t *trailers = h2_stream_get_trailers(stream); - if (trailers && !apr_is_empty_table(trailers)) { - h2_ngheader *nh; - int rv; - - nh = h2_util_ngheader_make(stream->pool, trailers); - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03072) - "h2_stream(%ld-%d): submit %d trailers", - session->id, (int)stream_id,(int) nh->nvlen); - rv = nghttp2_submit_trailer(ng2s, stream->id, nh->nv, nh->nvlen); - if (rv < 0) { - nread = rv; - } - *data_flags |= NGHTTP2_DATA_FLAG_NO_END_STREAM; - } - *data_flags |= NGHTTP2_DATA_FLAG_EOF; } - return (ssize_t)nread; } @@ -1423,83 +1408,56 @@ static apr_status_t h2_session_send(h2_s } /** - * A stream was resumed as new output data arrived. + * headers for the stream are ready. */ -static apr_status_t on_stream_resume(void *ctx, int stream_id) +static apr_status_t on_stream_headers(h2_session *session, h2_stream *stream, + h2_headers *headers, apr_off_t len, + int eos) { - h2_session *session = ctx; - h2_stream *stream = get_stream(session, stream_id); apr_status_t status = APR_SUCCESS; - - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, - "h2_stream(%ld-%d): on_resume", session->id, stream_id); - if (stream) { - int rv; - if (stream->rst_error) { - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03466) - "h2_stream(%ld-%d): RST_STREAM, err=%d", - session->id, stream->id, stream->rst_error); - rv = nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE, - stream->id, stream->rst_error); - } - else { - rv = nghttp2_session_resume_data(session->ngh2, stream_id); - } - session->have_written = 1; - ap_log_cerror(APLOG_MARK, nghttp2_is_fatal(rv)? - APLOG_ERR : APLOG_DEBUG, 0, session->c, - APLOGNO(02936) - "h2_stream(%ld-%d): resuming %s", - session->id, stream->id, rv? nghttp2_strerror(rv) : ""); - } - return status; -} - -/** - * A response for the stream is ready. - */ -static apr_status_t on_stream_response(void *ctx, int stream_id) -{ - h2_session *session = ctx; - h2_stream *stream = get_stream(session, stream_id); - apr_status_t status = APR_SUCCESS; - h2_response *response; int rv = 0; - AP_DEBUG_ASSERT(session); + ap_assert(session); ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, - "h2_stream(%ld-%d): on_response", session->id, stream_id); - if (!stream) { - return APR_NOTFOUND; - } - else if (!stream->response) { + "h2_stream(%ld-%d): on_headers", session->id, stream->id); + if (!headers) { int err = H2_STREAM_RST(stream, H2_ERR_PROTOCOL_ERROR); - - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03074) + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03466) "h2_stream(%ld-%d): RST_STREAM, err=%d", session->id, stream->id, err); - rv = nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE, stream->id, err); goto leave; } - - while ((response = h2_stream_get_unsent_response(stream)) != NULL) { + else if (headers->status < 100) { + rv = nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE, + stream->id, headers->status); + goto leave; + } + else if (stream->has_response) { + h2_ngheader *nh; + int rv; + + nh = h2_util_ngheader_make(stream->pool, headers->headers); + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03072) + "h2_stream(%ld-%d): submit %d trailers", + session->id, (int)stream->id,(int) nh->nvlen); + rv = nghttp2_submit_trailer(session->ngh2, stream->id, nh->nv, nh->nvlen); + goto leave; + } + else { nghttp2_data_provider provider, *pprovider = NULL; h2_ngheader *ngh; + apr_table_t *hout; const h2_priority *prio; - - if (stream->submitted) { - rv = NGHTTP2_PROTOCOL_ERROR; - goto leave; - } + const char *note; ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03073) "h2_stream(%ld-%d): submit response %d, REMOTE_WINDOW_SIZE=%u", - session->id, stream->id, response->http_status, + session->id, stream->id, headers->status, (unsigned int)nghttp2_session_get_stream_remote_window_size(session->ngh2, stream->id)); - if (response->content_length != 0) { + if (!eos || len > 0) { memset(&provider, 0, sizeof(provider)); provider.source.fd = stream->id; provider.read_callback = stream_data_cb; @@ -1522,23 +1480,36 @@ static apr_status_t on_stream_response(v * also have the pushed ones as well. */ if (!stream->initiated_on - && h2_response_is_final(response) - && H2_HTTP_2XX(response->http_status) + && h2_headers_are_response(headers) + && H2_HTTP_2XX(headers->status) && h2_session_push_enabled(session)) { - h2_stream_submit_pushes(stream); + h2_stream_submit_pushes(stream, headers); } - prio = h2_stream_get_priority(stream); + prio = h2_stream_get_priority(stream, headers); if (prio) { h2_session_set_prio(session, stream, prio); } - ngh = h2_util_ngheader_make_res(stream->pool, response->http_status, - response->headers); - rv = nghttp2_submit_response(session->ngh2, response->stream_id, + hout = headers->headers; + note = apr_table_get(headers->notes, H2_FILTER_DEBUG_NOTE); + if (note && !strcmp("on", note)) { + int32_t connFlowIn, connFlowOut; + + connFlowIn = nghttp2_session_get_effective_local_window_size(session->ngh2); + connFlowOut = nghttp2_session_get_remote_window_size(session->ngh2); + hout = apr_table_clone(stream->pool, hout); + apr_table_setn(hout, "conn-flow-in", + apr_itoa(stream->pool, connFlowIn)); + apr_table_setn(hout, "conn-flow-out", + apr_itoa(stream->pool, connFlowOut)); + } + + ngh = h2_util_ngheader_make_res(stream->pool, headers->status, hout); + rv = nghttp2_submit_response(session->ngh2, stream->id, ngh->nv, ngh->nvlen, pprovider); - stream->submitted = h2_response_is_final(response); + stream->has_response = h2_headers_are_response(headers); session->have_written = 1; if (stream->initiated_on) { @@ -1574,6 +1545,48 @@ leave: return status; } +/** + * A stream was resumed as new output data arrived. + */ +static apr_status_t on_stream_resume(void *ctx, int stream_id) +{ + h2_session *session = ctx; + h2_stream *stream = get_stream(session, stream_id); + apr_status_t status = APR_EAGAIN; + int rv; + + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, + "h2_stream(%ld-%d): on_resume", session->id, stream_id); + if (stream) { + apr_off_t len = 0; + int eos = 0; + h2_headers *headers = NULL; + + send_headers: + status = h2_stream_out_prepare(stream, &len, &eos, &headers); + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c, + "h2_stream(%ld-%d): prepared len=%ld, eos=%d", + session->id, stream_id, (long)len, eos); + if (headers) { + status = on_stream_headers(session, stream, headers, len, eos); + if (status != APR_SUCCESS) { + return status; + } + goto send_headers; + } + else if (status != APR_EAGAIN) { + rv = nghttp2_session_resume_data(session->ngh2, stream_id); + session->have_written = 1; + ap_log_cerror(APLOG_MARK, nghttp2_is_fatal(rv)? + APLOG_ERR : APLOG_DEBUG, 0, session->c, + APLOGNO(02936) + "h2_stream(%ld-%d): resuming %s", + session->id, stream->id, rv? nghttp2_strerror(rv) : ""); + } + } + return status; +} + static apr_status_t h2_session_receive(void *ctx, const char *data, apr_size_t len, apr_size_t *readlen) { @@ -1664,40 +1677,6 @@ static apr_status_t h2_session_read(h2_s return rstatus; } -static int unsubmitted_iter(void *ctx, void *val) -{ - h2_stream *stream = val; - if (h2_stream_needs_submit(stream)) { - *((int *)ctx) = 1; - return 0; - } - return 1; -} - -static int has_unsubmitted_streams(h2_session *session) -{ - int has_unsubmitted = 0; - h2_ihash_iter(session->streams, unsubmitted_iter, &has_unsubmitted); - return has_unsubmitted; -} - -static int suspended_iter(void *ctx, void *val) -{ - h2_stream *stream = val; - if (h2_stream_is_suspended(stream)) { - *((int *)ctx) = 1; - return 0; - } - return 1; -} - -static int has_suspended_streams(h2_session *session) -{ - int has_suspended = 0; - h2_ihash_iter(session->streams, suspended_iter, &has_suspended); - return has_suspended; -} - static const char *StateNames[] = { "INIT", /* H2_SESSION_ST_INIT */ "DONE", /* H2_SESSION_ST_DONE */ @@ -1842,8 +1821,7 @@ static void h2_session_ev_no_io(h2_sessi session->id, session->open_streams); h2_conn_io_flush(&session->io); if (session->open_streams > 0) { - if (has_unsubmitted_streams(session) - || has_suspended_streams(session)) { + if (h2_mplx_awaits_data(session->mplx)) { /* waiting for at least one stream to produce data */ transit(session, "no io", H2_SESSION_ST_WAIT); } @@ -2207,7 +2185,6 @@ apr_status_t h2_session_process(h2_sessi /* trigger window updates, stream resumes and submits */ status = h2_mplx_dispatch_master_events(session->mplx, on_stream_resume, - on_stream_response, session); if (status != APR_SUCCESS) { ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, c, Modified: httpd/httpd/trunk/modules/http2/h2_session.h URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_session.h?rev=1763158&r1=1763157&r2=1763158&view=diff ============================================================================== --- httpd/httpd/trunk/modules/http2/h2_session.h (original) +++ httpd/httpd/trunk/modules/http2/h2_session.h Mon Oct 3 11:47:45 2016 @@ -49,7 +49,6 @@ struct h2_mplx; struct h2_priority; struct h2_push; struct h2_push_diary; -struct h2_response; struct h2_session; struct h2_stream; struct h2_task; @@ -187,11 +186,6 @@ void h2_session_abort(h2_session *sessio */ void h2_session_close(h2_session *session); -/* Start submitting the response to a stream request. This is possible - * once we have all the response headers. */ -apr_status_t h2_session_handle_response(h2_session *session, - struct h2_stream *stream); - /** * Create and register a new stream under the given id. * Modified: httpd/httpd/trunk/modules/http2/h2_stream.c URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_stream.c?rev=1763158&r1=1763157&r2=1763158&view=diff ============================================================================== --- httpd/httpd/trunk/modules/http2/h2_stream.c (original) +++ httpd/httpd/trunk/modules/http2/h2_stream.c Mon Oct 3 11:47:45 2016 @@ -16,6 +16,8 @@ #include #include +#include + #include #include #include @@ -29,11 +31,10 @@ #include "h2_conn.h" #include "h2_config.h" #include "h2_h2.h" -#include "h2_filter.h" #include "h2_mplx.h" #include "h2_push.h" #include "h2_request.h" -#include "h2_response.h" +#include "h2_headers.h" #include "h2_session.h" #include "h2_stream.h" #include "h2_task.h" @@ -62,8 +63,8 @@ static void H2_STREAM_OUT_LOG(int lvl, h apr_size_t len, bmax = sizeof(buffer)/sizeof(buffer[0]); len = h2_util_bb_print(buffer, bmax, tag, "", s->buffer); - ap_log_cerror(APLOG_MARK, lvl, 0, c, "bb_dump(%ld-%d): %s", - c->id, s->id, len? buffer : line); + ap_log_cerror(APLOG_MARK, lvl, 0, c, "bb_dump(%s): %s", + c->log_id, len? buffer : line); } } @@ -150,6 +151,23 @@ static int output_open(h2_stream *stream } } +static void prep_output(h2_stream *stream) { + conn_rec *c = stream->session->c; + if (!stream->buffer) { + stream->buffer = apr_brigade_create(stream->pool, c->bucket_alloc); + } +} + +static void prepend_response(h2_stream *stream, h2_headers *response) +{ + conn_rec *c = stream->session->c; + apr_bucket *b; + + prep_output(stream); + b = h2_bucket_headers_create(c->bucket_alloc, response); + APR_BRIGADE_INSERT_HEAD(stream->buffer, b); +} + static apr_status_t stream_pool_cleanup(void *ctx) { h2_stream *stream = ctx; @@ -252,21 +270,6 @@ void h2_stream_rst(h2_stream *stream, in stream->session->id, stream->id, error_code); } -struct h2_response *h2_stream_get_response(h2_stream *stream) -{ - return stream->response; -} - -struct h2_response *h2_stream_get_unsent_response(h2_stream *stream) -{ - h2_response *unsent = (stream->last_sent? - stream->last_sent->next : stream->response); - if (unsent) { - stream->last_sent = unsent; - } - return unsent; -} - apr_status_t h2_stream_set_request_rec(h2_stream *stream, request_rec *r) { h2_request *req; @@ -277,8 +280,7 @@ apr_status_t h2_stream_set_request_rec(h if (stream->rst_error) { return APR_ECONNRESET; } - status = h2_request_rcreate(&req, stream->pool, stream->id, - stream->initiated_on, r); + status = h2_request_rcreate(&req, stream->pool, r); ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, r, APLOGNO(03058) "h2_request(%d): set_request_rec %s host=%s://%s%s", stream->id, req->method, req->scheme, req->authority, @@ -295,13 +297,40 @@ apr_status_t h2_stream_set_request(h2_st return APR_SUCCESS; } +static apr_status_t add_trailer(h2_stream *stream, + const char *name, size_t nlen, + const char *value, size_t vlen) +{ + conn_rec *c = stream->session->c; + char *hname, *hvalue; + + if (nlen == 0 || name[0] == ':') { + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, APR_EINVAL, c, APLOGNO(03060) + "h2_request(%ld-%d): pseudo header in trailer", + c->id, stream->id); + return APR_EINVAL; + } + if (h2_req_ignore_trailer(name, nlen)) { + return APR_SUCCESS; + } + if (!stream->trailers) { + stream->trailers = apr_table_make(stream->pool, 5); + } + hname = apr_pstrndup(stream->pool, name, nlen); + hvalue = apr_pstrndup(stream->pool, value, vlen); + h2_util_camel_case_header(hname, nlen); + apr_table_mergen(stream->trailers, hname, hvalue); + + return APR_SUCCESS; +} + apr_status_t h2_stream_add_header(h2_stream *stream, const char *name, size_t nlen, const char *value, size_t vlen) { AP_DEBUG_ASSERT(stream); - if (!stream->response) { + if (!stream->has_response) { if (name[0] == ':') { if ((vlen) > stream->session->s->limit_req_line) { /* pseudo header: approximation of request line size check */ @@ -336,10 +365,7 @@ apr_status_t h2_stream_add_header(h2_str } if (h2_stream_is_scheduled(stream)) { - /* FIXME: this is not clean. we modify a struct that is being processed - * by another thread potentially. */ - return h2_request_add_trailer((h2_request*)stream->request, stream->pool, - name, nlen, value, vlen); + return add_trailer(stream, name, nlen, value, vlen); } else { if (!stream->rtmp) { @@ -366,36 +392,38 @@ apr_status_t h2_stream_schedule(h2_strea if (eos) { close_input(stream); } + + if (!stream->input) { + h2_beam_create(&stream->input, stream->pool, stream->id, "input", 0); + } - if (stream->response) { + if (h2_stream_is_ready(stream)) { /* already have a resonse, probably a HTTP error code */ return h2_mplx_process(stream->session->mplx, stream, cmp, ctx); } else if (!stream->request && stream->rtmp) { /* This is the common case: a h2_request was being assembled, now * it gets finalized and checked for completness */ - status = h2_request_end_headers(stream->rtmp, stream->pool, - eos, push_enabled); + status = h2_request_end_headers(stream->rtmp, stream->pool, eos); if (status == APR_SUCCESS) { - stream->rtmp->id = stream->id; - stream->rtmp->initiated_on = stream->initiated_on; stream->rtmp->serialize = h2_config_geti(stream->session->config, H2_CONF_SER_HEADERS); stream->request = stream->rtmp; stream->rtmp = NULL; stream->scheduled = 1; - stream->input_remaining = stream->request->content_length; + stream->push_policy = h2_push_policy_determine(stream->request->headers, + stream->pool, push_enabled); + status = h2_mplx_process(stream->session->mplx, stream, cmp, ctx); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c, "h2_stream(%ld-%d): scheduled %s %s://%s%s " - "clen=%ld, body=%d, chunked=%d", + "chunked=%d", stream->session->id, stream->id, stream->request->method, stream->request->scheme, stream->request->authority, stream->request->path, - (long)stream->request->content_length, - stream->request->body, stream->request->chunked); + stream->request->chunked); return status; } } @@ -420,21 +448,36 @@ int h2_stream_is_scheduled(const h2_stre apr_status_t h2_stream_close_input(h2_stream *stream) { + conn_rec *c = stream->session->c; apr_status_t status = APR_SUCCESS; - - AP_DEBUG_ASSERT(stream); + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c, "h2_stream(%ld-%d): closing input", stream->session->id, stream->id); - if (stream->rst_error) { return APR_ECONNRESET; } - if (close_input(stream) && stream->input) { - status = h2_beam_close(stream->input); + if (!stream->input) { + h2_beam_create(&stream->input, stream->pool, stream->id, "input", 0); } - return status; + + if (stream->trailers && !apr_is_empty_table(stream->trailers)) { + h2_headers *r = h2_headers_create(HTTP_OK, stream->trailers, + NULL, stream->pool); + apr_bucket *b = h2_bucket_headers_create(c->bucket_alloc, r); + apr_bucket_brigade *tmp; + + tmp = apr_brigade_create(stream->pool, c->bucket_alloc); + APR_BRIGADE_INSERT_TAIL(tmp, b); + status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ); + apr_brigade_destroy(tmp); + + stream->trailers = NULL; + } + + close_input(stream); + return h2_beam_close(stream->input); } apr_status_t h2_stream_write_data(h2_stream *stream, @@ -459,52 +502,22 @@ apr_status_t h2_stream_write_data(h2_str ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, "h2_stream(%ld-%d): add %ld input bytes", stream->session->id, stream->id, (long)len); - - if (!stream->request->chunked) { - stream->input_remaining -= len; - if (stream->input_remaining < 0) { - ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, c, - APLOGNO(02961) - "h2_stream(%ld-%d): got %ld more content bytes than announced " - "in content-length header: %ld", - stream->session->id, stream->id, - (long)stream->request->content_length, - -(long)stream->input_remaining); - h2_stream_rst(stream, H2_ERR_PROTOCOL_ERROR); - return APR_ECONNABORTED; - } - } tmp = apr_brigade_create(stream->pool, c->bucket_alloc); apr_brigade_write(tmp, NULL, NULL, data, len); - if (eos) { - APR_BRIGADE_INSERT_TAIL(tmp, apr_bucket_eos_create(c->bucket_alloc)); - close_input(stream); - } status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ); apr_brigade_destroy(tmp); stream->in_data_frames++; stream->in_data_octets += len; + if (eos) { + return h2_stream_close_input(stream); + } + return status; } -void h2_stream_set_suspended(h2_stream *stream, int suspended) -{ - AP_DEBUG_ASSERT(stream); - stream->suspended = !!suspended; - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c, - "h2_stream(%ld-%d): suspended=%d", - stream->session->id, stream->id, stream->suspended); -} - -int h2_stream_is_suspended(const h2_stream *stream) -{ - AP_DEBUG_ASSERT(stream); - return stream->suspended; -} - static apr_status_t fill_buffer(h2_stream *stream, apr_size_t amount) { conn_rec *c = stream->session->c; @@ -549,89 +562,64 @@ static apr_status_t fill_buffer(h2_strea return status; } -apr_status_t h2_stream_add_response(h2_stream *stream, h2_response *response, - h2_bucket_beam *output) -{ - apr_status_t status = APR_SUCCESS; - conn_rec *c = stream->session->c; - h2_response **pr = &stream->response; - - if (!output_open(stream)) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, - "h2_stream(%ld-%d): output closed", - stream->session->id, stream->id); - return APR_ECONNRESET; - } - if (stream->submitted) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, - "h2_stream(%ld-%d): already submitted final response", - stream->session->id, stream->id); - return APR_ECONNRESET; - } - - /* append */ - while (*pr) { - pr = &((*pr)->next); - } - *pr = response; - - if (h2_response_is_final(response)) { - stream->output = output; - stream->buffer = apr_brigade_create(stream->pool, c->bucket_alloc); - - h2_stream_filter(stream); - if (stream->output) { - status = fill_buffer(stream, 0); - } - } - - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c, - "h2_stream(%ld-%d): set_response(%d)", - stream->session->id, stream->id, - stream->response->http_status); - return status; -} - apr_status_t h2_stream_set_error(h2_stream *stream, int http_status) { - h2_response *response; + h2_headers *response; - if (stream->submitted) { + if (h2_stream_is_ready(stream)) { return APR_EINVAL; } if (stream->rtmp) { stream->request = stream->rtmp; stream->rtmp = NULL; } - response = h2_response_die(stream->id, http_status, - stream->request, stream->pool); - return h2_stream_add_response(stream, response, NULL); + response = h2_headers_die(http_status, stream->request, stream->pool); + prepend_response(stream, response); + return APR_SUCCESS; } -static const apr_size_t DATA_CHUNK_SIZE = ((16*1024) - 100 - 9); +static apr_bucket *get_first_headers_bucket(apr_bucket_brigade *bb) +{ + if (bb) { + apr_bucket *b = APR_BRIGADE_FIRST(bb); + while (b != APR_BRIGADE_SENTINEL(bb)) { + if (H2_BUCKET_IS_HEADERS(b)) { + return b; + } + b = APR_BUCKET_NEXT(b); + } + } + return NULL; +} -apr_status_t h2_stream_out_prepare(h2_stream *stream, - apr_off_t *plen, int *peos) +apr_status_t h2_stream_out_prepare(h2_stream *stream, apr_off_t *plen, + int *peos, h2_headers **presponse) { conn_rec *c = stream->session->c; apr_status_t status = APR_SUCCESS; apr_off_t requested; + apr_bucket *b, *e; + if (presponse) { + *presponse = NULL; + } + if (stream->rst_error) { *plen = 0; *peos = 1; return APR_ECONNRESET; } - - if (!stream->buffer) { - return APR_EAGAIN; - } + if (!output_open(stream)) { + return APR_ECONNRESET; + } + prep_output(stream); + if (*plen > 0) { - requested = H2MIN(*plen, DATA_CHUNK_SIZE); + requested = H2MIN(*plen, H2_DATA_CHUNK_SIZE); } else { - requested = DATA_CHUNK_SIZE; + requested = H2_DATA_CHUNK_SIZE; } *plen = requested; @@ -639,7 +627,7 @@ apr_status_t h2_stream_out_prepare(h2_st h2_util_bb_avail(stream->buffer, plen, peos); if (!*peos && *plen < requested) { /* try to get more data */ - status = fill_buffer(stream, (requested - *plen) + DATA_CHUNK_SIZE); + status = fill_buffer(stream, (requested - *plen) + H2_DATA_CHUNK_SIZE); if (APR_STATUS_IS_EOF(status)) { apr_bucket *eos = apr_bucket_eos_create(c->bucket_alloc); APR_BRIGADE_INSERT_TAIL(stream->buffer, eos); @@ -653,17 +641,66 @@ apr_status_t h2_stream_out_prepare(h2_st h2_util_bb_avail(stream->buffer, plen, peos); } H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "h2_stream_out_prepare_post"); - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c, - "h2_stream(%ld-%d): prepare, len=%ld eos=%d, trailers=%s", - c->id, stream->id, (long)*plen, *peos, - (stream->response && stream->response->trailers)? - "yes" : "no"); - if (!*peos && !*plen && status == APR_SUCCESS) { - return APR_EAGAIN; + + b = APR_BRIGADE_FIRST(stream->buffer); + while (b != APR_BRIGADE_SENTINEL(stream->buffer)) { + e = APR_BUCKET_NEXT(b); + if (APR_BUCKET_IS_FLUSH(b)) { + APR_BUCKET_REMOVE(b); + apr_bucket_destroy(b); + } + else { + break; + } + b = e; } + + b = get_first_headers_bucket(stream->buffer); + if (b) { + /* there are HEADERS to submit */ + *peos = 0; + *plen = 0; + if (b == APR_BRIGADE_FIRST(stream->buffer)) { + if (presponse) { + *presponse = h2_bucket_headers_get(b); + APR_BUCKET_REMOVE(b); + apr_bucket_destroy(b); + status = APR_SUCCESS; + } + else { + /* someone needs to retrieve the response first */ + h2_mplx_keep_active(stream->session->mplx, stream->id); + status = APR_EAGAIN; + } + } + else { + apr_bucket *e = APR_BRIGADE_FIRST(stream->buffer); + while (e != APR_BRIGADE_SENTINEL(stream->buffer)) { + if (e == b) { + break; + } + else if (e->length != (apr_size_t)-1) { + *plen += e->length; + } + e = APR_BUCKET_NEXT(e); + } + } + } + + if (!*peos && !*plen && status == APR_SUCCESS + && (!presponse || !*presponse)) { + status = APR_EAGAIN; + } + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c, + "h2_stream(%ld-%d): prepare, len=%ld eos=%d", + c->id, stream->id, (long)*plen, *peos); return status; } +static int is_not_headers(apr_bucket *b) +{ + return !H2_BUCKET_IS_HEADERS(b); +} apr_status_t h2_stream_read_to(h2_stream *stream, apr_bucket_brigade *bb, apr_off_t *plen, int *peos) @@ -674,7 +711,7 @@ apr_status_t h2_stream_read_to(h2_stream if (stream->rst_error) { return APR_ECONNRESET; } - status = h2_append_brigade(bb, stream->buffer, plen, peos); + status = h2_append_brigade(bb, stream->buffer, plen, peos, is_not_headers); if (status == APR_SUCCESS && !*peos && !*plen) { status = APR_EAGAIN; } @@ -690,27 +727,13 @@ int h2_stream_input_is_open(const h2_str return input_open(stream); } -int h2_stream_needs_submit(const h2_stream *stream) -{ - switch (stream->state) { - case H2_STREAM_ST_OPEN: - case H2_STREAM_ST_CLOSED_INPUT: - case H2_STREAM_ST_CLOSED_OUTPUT: - case H2_STREAM_ST_CLOSED: - return !stream->submitted; - default: - return 0; - } -} - -apr_status_t h2_stream_submit_pushes(h2_stream *stream) +apr_status_t h2_stream_submit_pushes(h2_stream *stream, h2_headers *response) { apr_status_t status = APR_SUCCESS; apr_array_header_t *pushes; int i; - pushes = h2_push_collect_update(stream, stream->request, - stream->response); + pushes = h2_push_collect_update(stream, stream->request, response); if (pushes && !apr_is_empty_array(pushes)) { ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c, "h2_stream(%ld-%d): found %d push candidates", @@ -729,13 +752,14 @@ apr_status_t h2_stream_submit_pushes(h2_ apr_table_t *h2_stream_get_trailers(h2_stream *stream) { - return stream->response? stream->response->trailers : NULL; + return NULL; } -const h2_priority *h2_stream_get_priority(h2_stream *stream) +const h2_priority *h2_stream_get_priority(h2_stream *stream, + h2_headers *response) { - if (stream->response && stream->initiated_on) { - const char *ctype = apr_table_get(stream->response->headers, "content-type"); + if (response && stream->initiated_on) { + const char *ctype = apr_table_get(response->headers, "content-type"); if (ctype) { /* FIXME: Not good enough, config needs to come from request->server */ return h2_config_get_priority(stream->session->config, ctype); @@ -767,3 +791,15 @@ const char *h2_stream_state_str(h2_strea } } +int h2_stream_is_ready(h2_stream *stream) +{ + if (stream->has_response) { + return 1; + } + else if (stream->buffer && get_first_headers_bucket(stream->buffer)) { + return 1; + } + return 0; +} + + Modified: httpd/httpd/trunk/modules/http2/h2_stream.h URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_stream.h?rev=1763158&r1=1763157&r2=1763158&view=diff ============================================================================== --- httpd/httpd/trunk/modules/http2/h2_stream.h (original) +++ httpd/httpd/trunk/modules/http2/h2_stream.h Mon Oct 3 11:47:45 2016 @@ -25,16 +25,16 @@ * connection to the client. The h2_session writes to the h2_stream, * adding HEADERS and DATA and finally an EOS. When headers are done, * h2_stream is scheduled for handling, which is expected to produce - * a h2_response. + * a h2_headers. * - * The h2_response gives the HEADER frames to sent to the client, followed + * The h2_headers gives the HEADER frames to sent to the client, followed * by DATA frames read from the h2_stream until EOS is reached. */ struct h2_mplx; struct h2_priority; struct h2_request; -struct h2_response; +struct h2_headers; struct h2_session; struct h2_sos; struct h2_bucket_beam; @@ -51,27 +51,27 @@ struct h2_stream { apr_pool_t *pool; /* the memory pool for this stream */ const struct h2_request *request; /* the request made in this stream */ struct h2_request *rtmp; /* request being assembled */ + apr_table_t *trailers; /* optional incoming trailers */ struct h2_bucket_beam *input; int request_headers_added; /* number of request headers added */ + unsigned int push_policy; /* which push policy to use for this request */ - struct h2_response *response; - struct h2_response *last_sent; struct h2_bucket_beam *output; apr_bucket_brigade *buffer; apr_array_header_t *files; /* apr_file_t* we collected during I/O */ int rst_error; /* stream error for RST_STREAM */ unsigned int aborted : 1; /* was aborted */ - unsigned int suspended : 1; /* DATA sending has been suspended */ unsigned int scheduled : 1; /* stream has been scheduled */ unsigned int started : 1; /* stream has started processing */ - unsigned int submitted : 1; /* response HEADER has been sent */ + unsigned int has_response : 1; /* response headers are known */ - apr_off_t input_remaining; /* remaining bytes on input as advertised via content-length */ apr_off_t out_data_frames; /* # of DATA frames sent */ apr_off_t out_data_octets; /* # of DATA octets (payload) sent */ apr_off_t in_data_frames; /* # of DATA frames received */ apr_off_t in_data_octets; /* # of DATA octets (payload) received */ + + const char *sos_filter; }; @@ -188,22 +188,6 @@ apr_status_t h2_stream_schedule(h2_strea */ int h2_stream_is_scheduled(const h2_stream *stream); -struct h2_response *h2_stream_get_response(h2_stream *stream); -struct h2_response *h2_stream_get_unsent_response(h2_stream *stream); - -/** - * Set the response for this stream. Invoked when all meta data for - * the stream response has been collected. - * - * @param stream the stream to set the response for - * @param response the response data for the stream - * @param bb bucket brigade with output data for the stream. Optional, - * may be incomplete. - */ -apr_status_t h2_stream_add_response(h2_stream *stream, - struct h2_response *response, - struct h2_bucket_beam *output); - /** * Set the HTTP error status as response. */ @@ -218,12 +202,13 @@ apr_status_t h2_stream_set_error(h2_stre * may be read without blocking * @param peos (out) != 0 iff end of stream will be reached when reading plen * bytes (out value). + * @param presponse (out) the response of one became available * @return APR_SUCCESS if out information was computed successfully. * APR_EAGAIN if not data is available and end of stream has not been * reached yet. */ -apr_status_t h2_stream_out_prepare(h2_stream *stream, - apr_off_t *plen, int *peos); +apr_status_t h2_stream_out_prepare(h2_stream *stream, apr_off_t *plen, + int *peos, h2_headers **presponse); /** * Read a maximum number of bytes into the bucket brigade. @@ -251,20 +236,6 @@ apr_status_t h2_stream_read_to(h2_stream apr_table_t *h2_stream_get_trailers(h2_stream *stream); /** - * Set the suspended state of the stream. - * @param stream the stream to change state on - * @param suspended boolean value if stream is suspended - */ -void h2_stream_set_suspended(h2_stream *stream, int suspended); - -/** - * Check if the stream has been suspended. - * @param stream the stream to check - * @return != 0 iff stream is suspended. - */ -int h2_stream_is_suspended(const h2_stream *stream); - -/** * Check if the stream has open input. * @param stream the stream to check * @return != 0 iff stream has open input. @@ -272,24 +243,18 @@ int h2_stream_is_suspended(const h2_stre int h2_stream_input_is_open(const h2_stream *stream); /** - * Check if the stream has not submitted a response or RST yet. - * @param stream the stream to check - * @return != 0 iff stream has not submitted a response or RST. - */ -int h2_stream_needs_submit(const h2_stream *stream); - -/** * Submit any server push promises on this stream and schedule * the tasks connection with these. * * @param stream the stream for which to submit */ -apr_status_t h2_stream_submit_pushes(h2_stream *stream); +apr_status_t h2_stream_submit_pushes(h2_stream *stream, h2_headers *response); /** * Get priority information set for this stream. */ -const struct h2_priority *h2_stream_get_priority(h2_stream *stream); +const struct h2_priority *h2_stream_get_priority(h2_stream *stream, + h2_headers *response); /** * Return a textual representation of the stream state as in RFC 7540 @@ -297,4 +262,10 @@ const struct h2_priority *h2_stream_get_ */ const char *h2_stream_state_str(h2_stream *stream); +/** + * Determine if stream is ready for submitting a response or a RST + * @param stream the stream to check + */ +int h2_stream_is_ready(h2_stream *stream); + #endif /* defined(__mod_h2__h2_stream__) */