Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 80188200C3A for ; Fri, 31 Mar 2017 21:41:06 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 7ED39160B80; Fri, 31 Mar 2017 19:41:06 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 055F2160B8C for ; Fri, 31 Mar 2017 21:41:03 +0200 (CEST) Received: (qmail 80534 invoked by uid 500); 31 Mar 2017 19:41:03 -0000 Mailing-List: contact cvs-help@httpd.apache.org; run by ezmlm Precedence: bulk Reply-To: dev@httpd.apache.org list-help: list-unsubscribe: List-Post: List-Id: Delivered-To: mailing list cvs@httpd.apache.org Received: (qmail 80190 invoked by uid 99); 31 Mar 2017 19:41:03 -0000 Received: from Unknown (HELO svn01-us-west.apache.org) (209.188.14.144) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 31 Mar 2017 19:41:03 +0000 Received: from svn01-us-west.apache.org (localhost [127.0.0.1]) by svn01-us-west.apache.org (ASF Mail Server at svn01-us-west.apache.org) with ESMTP id 2B5323A0B27 for ; Fri, 31 Mar 2017 19:41:02 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1789739 [3/4] - in /httpd/httpd/branches/2.4.x: ./ docs/manual/ docs/manual/mod/ modules/http2/ Date: Fri, 31 Mar 2017 19:41:01 -0000 To: cvs@httpd.apache.org From: icing@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20170331194102.2B5323A0B27@svn01-us-west.apache.org> archived-at: Fri, 31 Mar 2017 19:41:06 -0000 Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_session.c URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_session.c?rev=1789739&r1=1789738&r2=1789739&view=diff ============================================================================== --- httpd/httpd/branches/2.4.x/modules/http2/h2_session.c (original) +++ httpd/httpd/branches/2.4.x/modules/http2/h2_session.c Fri Mar 31 19:41:01 2017 @@ -56,6 +56,7 @@ static void transit(h2_session *session, static void on_stream_state_enter(void *ctx, h2_stream *stream); static void on_stream_state_event(void *ctx, h2_stream *stream, h2_stream_event_t ev); +static void on_stream_event(void *ctx, h2_stream *stream, h2_stream_event_t ev); static int h2_session_status_from_apr_status(apr_status_t rv) { @@ -71,26 +72,20 @@ static int h2_session_status_from_apr_st return NGHTTP2_ERR_PROTO; } -static void update_window(void *ctx, int stream_id, apr_off_t bytes_read) +static h2_stream *get_stream(h2_session *session, int stream_id) { - h2_session *session = (h2_session*)ctx; - while (bytes_read > 0) { - int len = (bytes_read > INT_MAX)? INT_MAX : bytes_read; - nghttp2_session_consume(session->ngh2, stream_id, (int)bytes_read); - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, - "h2_stream(%ld-%d): consumed %d bytes", - session->id, stream_id, len); - bytes_read -= len; - } + return nghttp2_session_get_stream_user_data(session->ngh2, stream_id); } -static apr_status_t h2_session_receive(void *ctx, - const char *data, apr_size_t len, - apr_size_t *readlen); - static void dispatch_event(h2_session *session, h2_session_event_t ev, int err, const char *msg); +void h2_session_event(h2_session *session, h2_session_event_t ev, + int err, const char *msg) +{ + dispatch_event(session, ev, err, msg); +} + static int rst_unprocessed_stream(h2_stream *stream, void *ctx) { int unprocessed = (!h2_stream_was_closed(stream) @@ -227,11 +222,6 @@ static int on_invalid_frame_recv_cb(nght return 0; } -static h2_stream *get_stream(h2_session *session, int stream_id) -{ - return nghttp2_session_get_stream_user_data(session->ngh2, stream_id); -} - static int on_data_chunk_recv_cb(nghttp2_session *ngh2, uint8_t flags, int32_t stream_id, const uint8_t *data, size_t len, void *userp) @@ -703,7 +693,6 @@ static apr_status_t session_cleanup(h2_s } transit(session, trigger, H2_SESSION_ST_CLEANUP); - h2_mplx_set_consumed_cb(session->mplx, NULL, NULL); h2_mplx_release_and_join(session->mplx, session->iowait); session->mplx = NULL; @@ -803,23 +792,33 @@ static apr_status_t h2_session_create_in return status; } + session->in_pending = h2_iq_create(session->pool, session->max_stream_count); + if (session->in_pending == NULL) { + apr_pool_destroy(pool); + return APR_ENOMEM; + } + + session->in_process = h2_iq_create(session->pool, session->max_stream_count); + if (session->in_process == NULL) { + apr_pool_destroy(pool); + return APR_ENOMEM; + } + session->monitor = apr_pcalloc(pool, sizeof(h2_stream_monitor)); if (session->monitor == NULL) { apr_pool_destroy(pool); - return status; + return APR_ENOMEM; } session->monitor->ctx = session; session->monitor->on_state_enter = on_stream_state_enter; session->monitor->on_state_event = on_stream_state_event; + session->monitor->on_event = on_stream_event; session->mplx = h2_mplx_create(c, session->pool, session->config, workers); - h2_mplx_set_consumed_cb(session->mplx, update_window, session); - - /* Install the connection input filter that feeds the session */ - session->cin = h2_filter_cin_create(session->pool, - h2_session_receive, session); + /* connection input filter that feeds the session */ + session->cin = h2_filter_cin_create(session); ap_add_input_filter("H2_IN", session->cin, r, c); h2_conn_io_init(&session->io, c, session->config); @@ -871,8 +870,8 @@ static apr_status_t h2_session_create_in "push_diary(type=%d,N=%d)"), (int)session->max_stream_count, (int)session->max_stream_mem, - session->mplx->workers_limit, - session->mplx->workers_max, + session->mplx->limit_active, + session->mplx->max_active, session->push_diary->dtype, (int)session->push_diary->N); } @@ -1431,7 +1430,8 @@ send_headers: if (!stream->has_response) { /* but no response */ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, - H2_STRM_LOG(APLOGNO(03466), stream, "no response, RST_STREAM")); + H2_STRM_LOG(APLOGNO(03466), stream, + "no response, RST_STREAM")); h2_stream_rst(stream, H2_ERR_PROTOCOL_ERROR); return APR_SUCCESS; } @@ -1444,32 +1444,32 @@ send_headers: return status; } -static apr_status_t h2_session_receive(void *ctx, const char *data, - apr_size_t len, apr_size_t *readlen) +static void h2_session_in_flush(h2_session *session) { - h2_session *session = ctx; - ssize_t n; + int id; - if (len > 0) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, - H2_SSSN_MSG(session, "feeding %ld bytes to nghttp2"), - (long)len); - n = nghttp2_session_mem_recv(session->ngh2, (const uint8_t *)data, len); - if (n < 0) { - if (nghttp2_is_fatal((int)n)) { - dispatch_event(session, H2_SESSION_EV_PROTO_ERROR, (int)n, nghttp2_strerror((int)n)); - return APR_EGENERAL; + while ((id = h2_iq_shift(session->in_process)) > 0) { + h2_stream *stream = get_stream(session, id); + if (stream) { + ap_assert(!stream->scheduled); + if (h2_stream_prep_processing(stream) == APR_SUCCESS) { + h2_mplx_process(session->mplx, stream, stream_pri_cmp, session); + } + else { + h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR); } } - else { - *readlen = n; - session->io.bytes_read += n; + } + + while ((id = h2_iq_shift(session->in_pending)) > 0) { + h2_stream *stream = get_stream(session, id); + if (stream) { + h2_stream_flush_input(stream); } } - return APR_SUCCESS; } -static apr_status_t h2_session_read(h2_session *session, int block) +static apr_status_t session_read(h2_session *session, apr_size_t readlen, int block) { apr_status_t status, rstatus = APR_EAGAIN; conn_rec *c = session->c; @@ -1481,7 +1481,7 @@ static apr_status_t h2_session_read(h2_s status = ap_get_brigade(c->input_filters, session->bbtmp, AP_MODE_READBYTES, block? APR_BLOCK_READ : APR_NONBLOCK_READ, - APR_BUCKET_BUFF_SIZE); + H2MAX(APR_BUCKET_BUFF_SIZE, readlen)); /* get rid of any possible data we do not expect to get */ apr_brigade_cleanup(session->bbtmp); @@ -1523,16 +1523,25 @@ static apr_status_t h2_session_read(h2_s * status. */ return rstatus; } - if ((session->io.bytes_read - read_start) > (64*1024)) { + if ((session->io.bytes_read - read_start) > readlen) { /* read enough in one go, give write a chance */ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, c, - H2_SSSN_MSG(session, "read 64k, returning")); + H2_SSSN_MSG(session, "read enough, returning")); break; } } return rstatus; } +static apr_status_t h2_session_read(h2_session *session, int block) +{ + apr_status_t status = session_read(session, session->max_stream_mem + * H2MAX(2, session->open_streams), + block); + h2_session_in_flush(session); + return status; +} + static const char *StateNames[] = { "INIT", /* H2_SESSION_ST_INIT */ "DONE", /* H2_SESSION_ST_DONE */ @@ -1769,24 +1778,17 @@ static void h2_session_ev_pre_close(h2_s static void ev_stream_open(h2_session *session, h2_stream *stream) { + h2_iq_append(session->in_process, stream->id); switch (session->state) { case H2_SESSION_ST_IDLE: if (session->open_streams == 1) { - /* enter tiomeout, since we have a stream again */ + /* enter timeout, since we have a stream again */ session->idle_until = (session->s->timeout + apr_time_now()); } break; default: break; } - - ap_assert(!stream->scheduled); - if (h2_stream_prep_processing(stream) == APR_SUCCESS) { - h2_mplx_process(session->mplx, stream, stream_pri_cmp, session); - } - else { - h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR); - } } static void ev_stream_closed(h2_session *session, h2_stream *stream) @@ -1862,6 +1864,20 @@ static void on_stream_state_enter(void * break; } } + +static void on_stream_event(void *ctx, h2_stream *stream, + h2_stream_event_t ev) +{ + h2_session *session = ctx; + switch (ev) { + case H2_SEV_IN_DATA_PENDING: + h2_iq_append(session->in_pending, stream->id); + break; + default: + /* NOP */ + break; + } +} static void on_stream_state_event(void *ctx, h2_stream *stream, h2_stream_event_t ev) Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_session.h URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_session.h?rev=1789739&r1=1789738&r2=1789739&view=diff ============================================================================== --- httpd/httpd/branches/2.4.x/modules/http2/h2_session.h (original) +++ httpd/httpd/branches/2.4.x/modules/http2/h2_session.h Fri Mar 31 19:41:01 2017 @@ -125,6 +125,10 @@ typedef struct h2_session { char status[64]; /* status message for scoreboard */ int last_status_code; /* the one already reported */ const char *last_status_msg; /* the one already reported */ + + struct h2_iqueue *in_pending; /* all streams with input pending */ + struct h2_iqueue *in_process; /* all streams ready for processing on slave */ + } h2_session; const char *h2_session_state_str(h2_session_state state); @@ -155,6 +159,9 @@ apr_status_t h2_session_rcreate(h2_sessi request_rec *r, struct h2_ctx *ctx, struct h2_workers *workers); +void h2_session_event(h2_session *session, h2_session_event_t ev, + int err, const char *msg); + /** * Process the given HTTP/2 session until it is ended or a fatal * error occurred. Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_stream.c URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_stream.c?rev=1789739&r1=1789738&r2=1789739&view=diff ============================================================================== --- httpd/httpd/branches/2.4.x/modules/http2/h2_stream.c (original) +++ httpd/httpd/branches/2.4.x/modules/http2/h2_stream.c Fri Mar 31 19:41:01 2017 @@ -43,18 +43,6 @@ #include "h2_util.h" -#define S_XXX (-2) -#define S_ERR (-1) -#define S_NOP (0) -#define S_IDL (H2_SS_IDL + 1) -#define S_RS_L (H2_SS_RSVD_L + 1) -#define S_RS_R (H2_SS_RSVD_R + 1) -#define S_OPEN (H2_SS_OPEN + 1) -#define S_CL_L (H2_SS_CLOSED_L + 1) -#define S_CL_R (H2_SS_CLOSED_R + 1) -#define S_CLS (H2_SS_CLOSED + 1) -#define S_CLN (H2_SS_CLEANUP + 1) - static const char *h2_ss_str(h2_stream_state_t state) { switch (state) { @@ -84,37 +72,54 @@ const char *h2_stream_state_str(h2_strea return h2_ss_str(stream->state); } +/* Abbreviations for stream transit tables */ +#define S_XXX (-2) /* Programming Error */ +#define S_ERR (-1) /* Protocol Error */ +#define S_NOP (0) /* No Change */ +#define S_IDL (H2_SS_IDL + 1) +#define S_RS_L (H2_SS_RSVD_L + 1) +#define S_RS_R (H2_SS_RSVD_R + 1) +#define S_OPEN (H2_SS_OPEN + 1) +#define S_CL_L (H2_SS_CLOSED_L + 1) +#define S_CL_R (H2_SS_CLOSED_R + 1) +#define S_CLS (H2_SS_CLOSED + 1) +#define S_CLN (H2_SS_CLEANUP + 1) + +/* state transisitions when certain frame types are sent */ static int trans_on_send[][H2_SS_MAX] = { -/* S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS, S_CLN, */ -/* DATA, */ { S_ERR, S_ERR, S_ERR, S_NOP, S_NOP, S_ERR, S_NOP, S_NOP, }, -/* HEADERS, */ { S_ERR, S_ERR, S_CL_R, S_NOP, S_NOP, S_ERR, S_NOP, S_NOP, }, -/* PRIORITY, */ { S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, }, -/* RST_STREAM, */ { S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_NOP, S_NOP, }, -/* SETTINGS, */ { S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, }, -/* PUSH_PROMISE, */ { S_RS_L,S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, }, -/* PING, */ { S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, }, -/* GOAWAY, */ { S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, }, -/* WINDOW_UPDATE,*/ { S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, }, -/* CONT */ { S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, }, +/*S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS, S_CLN, */ +{ S_ERR, S_ERR, S_ERR, S_NOP, S_NOP, S_ERR, S_NOP, S_NOP, },/* DATA */ +{ S_ERR, S_ERR, S_CL_R, S_NOP, S_NOP, S_ERR, S_NOP, S_NOP, },/* HEADERS */ +{ S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* PRIORITY */ +{ S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_NOP, S_NOP, },/* RST_STREAM */ +{ S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* SETTINGS */ +{ S_RS_L,S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* PUSH_PROMISE */ +{ S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* PING */ +{ S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* GOAWAY */ +{ S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* WINDOW_UPDATE */ +{ S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* CONT */ }; +/* state transisitions when certain frame types are received */ static int trans_on_recv[][H2_SS_MAX] = { -/* S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS, S_CLN, */ -/* DATA, */ { S_ERR, S_ERR, S_ERR, S_NOP, S_ERR, S_NOP, S_NOP, S_NOP, }, -/* HEADERS, */ { S_OPEN,S_CL_L, S_ERR, S_NOP, S_ERR, S_NOP, S_NOP, S_NOP, }, -/* PRIORITY, */ { S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, }, -/* RST_STREAM, */ { S_ERR, S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_NOP, S_NOP, }, -/* SETTINGS, */ { S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, }, -/* PUSH_PROMISE, */ { S_RS_R,S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, }, -/* PING, */ { S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, }, -/* GOAWAY, */ { S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, }, -/* WINDOW_UPDATE,*/ { S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, }, -/* CONT */ { S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, }, +/*S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS, S_CLN, */ +{ S_ERR, S_ERR, S_ERR, S_NOP, S_ERR, S_NOP, S_NOP, S_NOP, },/* DATA */ +{ S_OPEN,S_CL_L, S_ERR, S_NOP, S_ERR, S_NOP, S_NOP, S_NOP, },/* HEADERS */ +{ S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* PRIORITY */ +{ S_ERR, S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_NOP, S_NOP, },/* RST_STREAM */ +{ S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* SETTINGS */ +{ S_RS_R,S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* PUSH_PROMISE */ +{ S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* PING */ +{ S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* GOAWAY */ +{ S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* WINDOW_UPDATE */ +{ S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* CONT */ }; +/* state transisitions when certain events happen */ static int trans_on_event[][H2_SS_MAX] = { -/* H2_SEV_CLOSED_L*/{ S_XXX, S_ERR, S_ERR, S_CL_L, S_CLS, S_XXX, S_XXX, S_XXX, }, -/* H2_SEV_CLOSED_R*/{ S_ERR, S_ERR, S_ERR, S_CL_R, S_ERR, S_CLS, S_NOP, S_NOP, }, -/* H2_SEV_CANCELLED*/{S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_NOP, S_NOP, }, -/* H2_SEV_EOS_SENT*/{ S_NOP, S_XXX, S_XXX, S_XXX, S_XXX, S_CLS, S_CLN, S_XXX, }, +/*S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS, S_CLN, */ +{ S_XXX, S_ERR, S_ERR, S_CL_L, S_CLS, S_XXX, S_XXX, S_XXX, },/* EV_CLOSED_L*/ +{ S_ERR, S_ERR, S_ERR, S_CL_R, S_ERR, S_CLS, S_NOP, S_NOP, },/* EV_CLOSED_R*/ +{ S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_NOP, S_NOP, },/* EV_CANCELLED*/ +{ S_NOP, S_XXX, S_XXX, S_XXX, S_XXX, S_CLS, S_CLN, S_XXX, },/* EV_EOS_SENT*/ }; static int on_map(h2_stream_state_t state, int map[H2_SS_MAX]) @@ -137,7 +142,7 @@ static int on_frame(h2_stream_state_t st ap_assert(frame_type >= 0); ap_assert(state >= 0); if (frame_type >= maxlen) { - return state; /* NOP */ + return state; /* NOP, ignore unknown frame types */ } return on_map(state, frame_map[frame_type]); } @@ -152,9 +157,15 @@ static int on_frame_recv(h2_stream_state return on_frame(state, frame_type, trans_on_recv, H2_ALEN(trans_on_recv)); } -static int on_event(h2_stream_state_t state, h2_stream_event_t ev) +static int on_event(h2_stream* stream, h2_stream_event_t ev) { - return on_map(state, trans_on_event[ev]); + if (stream->monitor && stream->monitor->on_event) { + stream->monitor->on_event(stream->monitor->ctx, stream, ev); + } + if (ev < H2_ALEN(trans_on_event)) { + return on_map(stream->state, trans_on_event[ev]); + } + return stream->state; } static void H2_STREAM_OUT_LOG(int lvl, h2_stream *s, const char *tag) @@ -171,11 +182,16 @@ static void H2_STREAM_OUT_LOG(int lvl, h } static apr_status_t setup_input(h2_stream *stream) { - if (stream->input == NULL && !stream->input_eof) { - h2_beam_create(&stream->input, stream->pool, stream->id, - "input", H2_BEAM_OWNER_SEND, 0, - stream->session->s->timeout); - h2_beam_send_from(stream->input, stream->pool); + if (stream->input == NULL) { + int empty = (stream->input_eof + && (!stream->in_buffer + || APR_BRIGADE_EMPTY(stream->in_buffer))); + if (!empty) { + h2_beam_create(&stream->input, stream->pool, stream->id, + "input", H2_BEAM_OWNER_SEND, 0, + stream->session->s->timeout); + h2_beam_send_from(stream->input, stream->pool); + } } return APR_SUCCESS; } @@ -197,27 +213,27 @@ static apr_status_t close_input(h2_strea } if (stream->trailers && !apr_is_empty_table(stream->trailers)) { - apr_bucket_brigade *tmp; apr_bucket *b; h2_headers *r; - tmp = apr_brigade_create(stream->pool, c->bucket_alloc); + if (!stream->in_buffer) { + stream->in_buffer = apr_brigade_create(stream->pool, c->bucket_alloc); + } r = h2_headers_create(HTTP_OK, stream->trailers, NULL, stream->pool); stream->trailers = NULL; b = h2_bucket_headers_create(c->bucket_alloc, r); - APR_BRIGADE_INSERT_TAIL(tmp, b); + APR_BRIGADE_INSERT_TAIL(stream->in_buffer, b); b = apr_bucket_eos_create(c->bucket_alloc); - APR_BRIGADE_INSERT_TAIL(tmp, b); + APR_BRIGADE_INSERT_TAIL(stream->in_buffer, b); ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c, H2_STRM_MSG(stream, "added trailers")); - setup_input(stream); - status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ); - apr_brigade_destroy(tmp); + h2_stream_dispatch(stream, H2_SEV_IN_DATA_PENDING); } if (stream->input) { + h2_stream_flush_input(stream); return h2_beam_close(stream->input); } return status; @@ -225,7 +241,7 @@ static apr_status_t close_input(h2_strea static apr_status_t close_output(h2_stream *stream) { - if (h2_beam_is_closed(stream->output)) { + if (!stream->output || h2_beam_is_closed(stream->output)) { return APR_SUCCESS; } ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c, @@ -324,7 +340,7 @@ void h2_stream_dispatch(h2_stream *strea ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c, H2_STRM_MSG(stream, "dispatch event %d"), ev); - new_state = on_event(stream->state, ev); + new_state = on_event(stream, ev); if (new_state < 0) { ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, stream->session->c, H2_STRM_LOG(APLOGNO(10002), stream, "invalid event %d"), ev); @@ -335,7 +351,7 @@ void h2_stream_dispatch(h2_stream *strea else if (new_state == stream->state) { /* nop */ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c, - H2_STRM_MSG(stream, "ignored event %d"), ev); + H2_STRM_MSG(stream, "non-state event %d"), ev); return; } else { @@ -394,7 +410,7 @@ apr_status_t h2_stream_send_frame(h2_str H2_STRM_MSG(stream, "send frame %d, eos=%d"), ftype, eos); status = transit(stream, new_state); if (status == APR_SUCCESS && eos) { - status = transit(stream, on_event(stream->state, H2_SEV_CLOSED_L)); + status = transit(stream, on_event(stream, H2_SEV_CLOSED_L)); } return status; } @@ -444,7 +460,23 @@ apr_status_t h2_stream_recv_frame(h2_str } status = transit(stream, new_state); if (status == APR_SUCCESS && eos) { - status = transit(stream, on_event(stream->state, H2_SEV_CLOSED_R)); + status = transit(stream, on_event(stream, H2_SEV_CLOSED_R)); + } + return status; +} + +apr_status_t h2_stream_flush_input(h2_stream *stream) +{ + apr_status_t status = APR_SUCCESS; + + if (stream->in_buffer && !APR_BRIGADE_EMPTY(stream->in_buffer)) { + setup_input(stream); + status = h2_beam_send(stream->input, stream->in_buffer, APR_BLOCK_READ); + stream->in_last_write = apr_time_now(); + } + if (stream->input_eof + && stream->input && !h2_beam_is_closed(stream->input)) { + status = h2_beam_close(stream->input); } return status; } @@ -454,21 +486,27 @@ apr_status_t h2_stream_recv_DATA(h2_stre { h2_session *session = stream->session; apr_status_t status = APR_SUCCESS; - apr_bucket_brigade *tmp; - ap_assert(stream); + stream->in_data_frames++; if (len > 0) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c, - H2_STRM_MSG(stream, "recv DATA, len=%d"), (int)len); - - tmp = apr_brigade_create(stream->pool, session->c->bucket_alloc); - apr_brigade_write(tmp, NULL, NULL, (const char *)data, len); - setup_input(stream); - status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ); - apr_brigade_destroy(tmp); + if (APLOGctrace3(session->c)) { + const char *load = apr_pstrndup(stream->pool, (const char *)data, len); + ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, session->c, + H2_STRM_MSG(stream, "recv DATA, len=%d: -->%s<--"), + (int)len, load); + } + else { + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c, + H2_STRM_MSG(stream, "recv DATA, len=%d"), (int)len); + } + stream->in_data_octets += len; + if (!stream->in_buffer) { + stream->in_buffer = apr_brigade_create(stream->pool, + session->c->bucket_alloc); + } + apr_brigade_write(stream->in_buffer, NULL, NULL, (const char *)data, len); + h2_stream_dispatch(stream, H2_SEV_IN_DATA_PENDING); } - stream->in_data_frames++; - stream->in_data_octets += len; return status; } @@ -493,9 +531,12 @@ h2_stream *h2_stream_create(int id, apr_ stream->monitor = monitor; stream->max_mem = session->max_stream_mem; - h2_beam_create(&stream->output, pool, id, "output", H2_BEAM_OWNER_RECV, 0, - session->s->timeout); - +#ifdef H2_NG2_LOCAL_WIN_SIZE + stream->in_window_size = + nghttp2_session_get_stream_local_window_size( + stream->session->ngh2, stream->id); +#endif + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, H2_STRM_LOG(APLOGNO(03082), stream, "created")); on_state_enter(stream); @@ -563,7 +604,9 @@ void h2_stream_rst(h2_stream *stream, in if (stream->input) { h2_beam_abort(stream->input); } - h2_beam_leave(stream->output); + if (stream->output) { + h2_beam_leave(stream->output); + } ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c, H2_STRM_MSG(stream, "reset, error=%d"), error_code); h2_stream_dispatch(stream, H2_SEV_CANCELLED); @@ -733,6 +776,8 @@ apr_status_t h2_stream_out_prepare(h2_st *presponse = NULL; } + ap_assert(stream); + if (stream->rst_error) { *plen = 0; *peos = 1; @@ -741,7 +786,7 @@ apr_status_t h2_stream_out_prepare(h2_st c = stream->session->c; prep_output(stream); - + /* determine how much we'd like to send. We cannot send more than * is requested. But we can reduce the size in case the master * connection operates in smaller chunks. (TSL warmup) */ @@ -753,8 +798,15 @@ apr_status_t h2_stream_out_prepare(h2_st h2_util_bb_avail(stream->out_buffer, plen, peos); if (!*peos && *plen < requested && *plen < stream->max_mem) { H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "pre"); - status = h2_beam_receive(stream->output, stream->out_buffer, - APR_NONBLOCK_READ, stream->max_mem - *plen); + if (stream->output) { + status = h2_beam_receive(stream->output, stream->out_buffer, + APR_NONBLOCK_READ, + stream->max_mem - *plen); + } + else { + status = APR_EOF; + } + if (APR_STATUS_IS_EOF(status)) { apr_bucket *eos = apr_bucket_eos_create(c->bucket_alloc); APR_BRIGADE_INSERT_TAIL(stream->out_buffer, eos); @@ -925,4 +977,64 @@ int h2_stream_was_closed(const h2_stream } } +apr_status_t h2_stream_in_consumed(h2_stream *stream, apr_off_t amount) +{ + h2_session *session = stream->session; + + if (amount > 0) { + apr_off_t consumed = amount; + + while (consumed > 0) { + int len = (consumed > INT_MAX)? INT_MAX : consumed; + nghttp2_session_consume(session->ngh2, stream->id, len); + consumed -= len; + } + +#ifdef H2_NG2_LOCAL_WIN_SIZE + if (1) { + int cur_size = nghttp2_session_get_stream_local_window_size( + session->ngh2, stream->id); + int win = stream->in_window_size; + int thigh = win * 8/10; + int tlow = win * 2/10; + const int win_max = 2*1024*1024; + const int win_min = 32*1024; + + /* Work in progress, probably should add directives for these + * values once this stabilizes somewhat. The general idea is + * to adapt stream window sizes if the input window changes + * a) very quickly (< good RTT) from full to empty + * b) only a little bit (> bad RTT) + * where in a) it grows and in b) it shrinks again. + */ + if (cur_size > thigh && amount > thigh && win < win_max) { + /* almost empty again with one reported consumption, how + * long did this take? */ + long ms = apr_time_msec(apr_time_now() - stream->in_last_write); + if (ms < 40) { + win = H2MIN(win_max, win + (64*1024)); + } + } + else if (cur_size < tlow && amount < tlow && win > win_min) { + /* staying full, for how long already? */ + long ms = apr_time_msec(apr_time_now() - stream->in_last_write); + if (ms > 700) { + win = H2MAX(win_min, win - (32*1024)); + } + } + + if (win != stream->in_window_size) { + stream->in_window_size = win; + nghttp2_session_set_local_window_size(session->ngh2, + NGHTTP2_FLAG_NONE, stream->id, win); + } + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, + "h2_stream(%ld-%d): consumed %ld bytes, window now %d/%d", + session->id, stream->id, (long)amount, + cur_size, stream->in_window_size); + } +#endif + } + return APR_SUCCESS; +} Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_stream.h URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_stream.h?rev=1789739&r1=1789738&r2=1789739&view=diff ============================================================================== --- httpd/httpd/branches/2.4.x/modules/http2/h2_stream.h (original) +++ httpd/httpd/branches/2.4.x/modules/http2/h2_stream.h Fri Mar 31 19:41:01 2017 @@ -25,10 +25,12 @@ * connection to the client. The h2_session writes to the h2_stream, * adding HEADERS and DATA and finally an EOS. When headers are done, * h2_stream is scheduled for handling, which is expected to produce - * a h2_headers. + * a response h2_headers at least. * - * The h2_headers gives the HEADER frames to sent to the client, followed - * by DATA frames read from the h2_stream until EOS is reached. + * The h2_headers may be followed by more h2_headers (interim responses) and + * by DATA frames read from the h2_stream until EOS is reached. Trailers + * are send when a last h2_headers is received. This always closes the stream + * output. */ struct h2_mplx; @@ -45,6 +47,9 @@ typedef void h2_stream_state_cb(void *ct typedef void h2_stream_event_cb(void *ctx, h2_stream *stream, h2_stream_event_t ev); +/** + * Callback structure for events and stream state transisitions + */ typedef struct h2_stream_monitor { void *ctx; h2_stream_state_cb *on_state_enter; /* called when a state is entered */ @@ -52,6 +57,8 @@ typedef struct h2_stream_monitor { was detected */ h2_stream_event_cb *on_state_event; /* called right before the given event result in a new stream state */ + h2_stream_event_cb *on_event; /* called for events that do not + trigger a state change */ } h2_stream_monitor; struct h2_stream { @@ -69,9 +76,13 @@ struct h2_stream { int request_headers_added; /* number of request headers added */ struct h2_bucket_beam *input; + apr_bucket_brigade *in_buffer; + int in_window_size; + apr_time_t in_last_write; + struct h2_bucket_beam *output; - apr_size_t max_mem; /* maximum amount of data buffered */ apr_bucket_brigade *out_buffer; + apr_size_t max_mem; /* maximum amount of data buffered */ int rst_error; /* stream error for RST_STREAM */ unsigned int aborted : 1; /* was aborted */ @@ -99,6 +110,10 @@ struct h2_stream { * @param id the stream identifier * @param pool the memory pool to use for this stream * @param session the session this stream belongs to + * @param monitor an optional monitor to be called for events and + * state transisitions + * @param initiated_on the id of the stream this one was initiated on (PUSH) + * * @return the newly opened stream */ h2_stream *h2_stream_create(int id, apr_pool_t *pool, @@ -111,6 +126,13 @@ h2_stream *h2_stream_create(int id, apr_ */ void h2_stream_destroy(h2_stream *stream); +/** + * Prepare the stream so that processing may start. + * + * This is the time to allocated resources not needed before. + * + * @param stream the stream to prep + */ apr_status_t h2_stream_prep_processing(h2_stream *stream); /* @@ -143,6 +165,12 @@ void h2_stream_cleanup(h2_stream *stream apr_pool_t *h2_stream_detach_pool(h2_stream *stream); /** + * Notify the stream that amount bytes have been consumed of its input + * since the last invocation of this method (delta amount). + */ +apr_status_t h2_stream_in_consumed(h2_stream *stream, apr_off_t amount); + +/** * Set complete stream headers from given h2_request. * * @param stream stream to write request to @@ -189,6 +217,8 @@ apr_status_t h2_stream_recv_frame(h2_str apr_status_t h2_stream_recv_DATA(h2_stream *stream, uint8_t flags, const uint8_t *data, size_t len); +apr_status_t h2_stream_flush_input(h2_stream *stream); + /** * Reset the stream. Stream write/reads will return errors afterwards. * @@ -275,7 +305,6 @@ const char *h2_stream_state_str(h2_strea */ int h2_stream_is_ready(h2_stream *stream); - #define H2_STRM_MSG(s, msg) \ "h2_stream(%ld-%d,%s): "msg, s->session->id, s->id, h2_stream_state_str(s) Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_task.c URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_task.c?rev=1789739&r1=1789738&r2=1789739&view=diff ============================================================================== --- httpd/httpd/branches/2.4.x/modules/http2/h2_task.c (original) +++ httpd/httpd/branches/2.4.x/modules/http2/h2_task.c Fri Mar 31 19:41:01 2017 @@ -45,7 +45,6 @@ #include "h2_session.h" #include "h2_stream.h" #include "h2_task.h" -#include "h2_worker.h" #include "h2_util.h" static void H2_TASK_OUT_LOG(int lvl, h2_task *task, apr_bucket_brigade *bb, @@ -217,14 +216,18 @@ static apr_status_t h2_filter_slave_in(a apr_status_t status = APR_SUCCESS; apr_bucket *b, *next; apr_off_t bblen; - apr_size_t rmax; + const int trace1 = APLOGctrace1(f->c); + apr_size_t rmax = ((readbytes <= APR_SIZE_MAX)? + (apr_size_t)readbytes : APR_SIZE_MAX); task = h2_ctx_cget_task(f->c); ap_assert(task); - rmax = ((readbytes <= APR_SIZE_MAX)? (apr_size_t)readbytes : APR_SIZE_MAX); - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, - "h2_slave_in(%s): read, mode=%d, block=%d, readbytes=%ld", - task->id, mode, block, (long)readbytes); + + if (trace1) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, + "h2_slave_in(%s): read, mode=%d, block=%d, readbytes=%ld", + task->id, mode, block, (long)readbytes); + } if (mode == AP_MODE_INIT) { return ap_get_brigade(f->c->input_filters, bb, mode, block, readbytes); @@ -250,19 +253,23 @@ static apr_status_t h2_filter_slave_in(a while (APR_BRIGADE_EMPTY(task->input.bb)) { /* Get more input data for our request. */ - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, - "h2_slave_in(%s): get more data from mplx, block=%d, " - "readbytes=%ld", task->id, block, (long)readbytes); + if (trace1) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, + "h2_slave_in(%s): get more data from mplx, block=%d, " + "readbytes=%ld", task->id, block, (long)readbytes); + } if (task->input.beam) { status = h2_beam_receive(task->input.beam, task->input.bb, block, - H2MIN(readbytes, 32*1024)); + 128*1024); } else { status = APR_EOF; } - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, f->c, - "h2_slave_in(%s): read returned", task->id); + if (trace1) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, f->c, + "h2_slave_in(%s): read returned", task->id); + } if (APR_STATUS_IS_EAGAIN(status) && (mode == AP_MODE_GETLINE || block == APR_BLOCK_READ)) { /* chunked input handling does not seem to like it if we @@ -276,9 +283,11 @@ static apr_status_t h2_filter_slave_in(a else if (status != APR_SUCCESS) { return status; } - - h2_util_bb_log(f->c, task->stream_id, APLOG_TRACE2, - "input.beam recv raw", task->input.bb); + + if (trace1) { + h2_util_bb_log(f->c, task->stream_id, APLOG_TRACE2, + "input.beam recv raw", task->input.bb); + } if (h2_task_logio_add_bytes_in) { apr_brigade_length(bb, 0, &bblen); h2_task_logio_add_bytes_in(f->c, bblen); @@ -292,12 +301,16 @@ static apr_status_t h2_filter_slave_in(a return (mode == AP_MODE_SPECULATIVE)? APR_EAGAIN : APR_EOF; } - h2_util_bb_log(f->c, task->stream_id, APLOG_TRACE2, - "task_input.bb", task->input.bb); + if (trace1) { + h2_util_bb_log(f->c, task->stream_id, APLOG_TRACE2, + "task_input.bb", task->input.bb); + } if (APR_BRIGADE_EMPTY(task->input.bb)) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, - "h2_slave_in(%s): no data", task->id); + if (trace1) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, + "h2_slave_in(%s): no data", task->id); + } return (block == APR_NONBLOCK_READ)? APR_EAGAIN : APR_EOF; } @@ -322,9 +335,11 @@ static apr_status_t h2_filter_slave_in(a apr_size_t len = sizeof(buffer)-1; apr_brigade_flatten(bb, buffer, &len); buffer[len] = 0; - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, - "h2_slave_in(%s): getline: %s", - task->id, buffer); + if (trace1) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, + "h2_slave_in(%s): getline: %s", + task->id, buffer); + } } } else { @@ -337,7 +352,7 @@ static apr_status_t h2_filter_slave_in(a status = APR_ENOTIMPL; } - if (APLOGctrace1(f->c)) { + if (trace1) { apr_brigade_length(bb, 0, &bblen); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, "h2_slave_in(%s): %ld data bytes", task->id, (long)bblen); @@ -481,42 +496,44 @@ static int h2_task_pre_conn(conn_rec* c, return OK; } -h2_task *h2_task_create(h2_stream *stream, conn_rec *slave) +h2_task *h2_task_create(conn_rec *slave, int stream_id, + const h2_request *req, h2_mplx *m, + h2_bucket_beam *input, + apr_interval_time_t timeout, + apr_size_t output_max_mem) { apr_pool_t *pool; h2_task *task; ap_assert(slave); - ap_assert(stream); - ap_assert(stream->request); + ap_assert(req); apr_pool_create(&pool, slave->pool); task = apr_pcalloc(pool, sizeof(h2_task)); if (task == NULL) { - ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_ENOMEM, slave, - H2_STRM_LOG(APLOGNO(02941), stream, "create task")); return NULL; } - task->id = apr_psprintf(pool, "%ld-%d", - stream->session->id, stream->id); - task->stream_id = stream->id; + task->id = "000"; + task->stream_id = stream_id; task->c = slave; - task->mplx = stream->session->mplx; - task->c->keepalives = slave->master->keepalives; + task->mplx = m; task->pool = pool; - task->request = stream->request; - task->input.beam = stream->input; - task->output.beam = stream->output; - task->timeout = stream->session->s->timeout; - - h2_beam_send_from(stream->output, task->pool); - h2_ctx_create_for(slave, task); - + task->request = req; + task->timeout = timeout; + task->input.beam = input; + task->output.max_buffer = output_max_mem; + return task; } void h2_task_destroy(h2_task *task) { + if (task->output.beam) { + h2_beam_log(task->output.beam, task->c, APLOG_TRACE2, "task_destroy"); + h2_beam_destroy(task->output.beam); + task->output.beam = NULL; + } + if (task->eor) { apr_bucket_destroy(task->eor); } @@ -527,9 +544,14 @@ void h2_task_destroy(h2_task *task) apr_status_t h2_task_do(h2_task *task, apr_thread_t *thread, int worker_id) { + conn_rec *c; + ap_assert(task); - - if (task->c->master) { + c = task->c; + task->worker_started = 1; + task->started_at = apr_time_now(); + + if (c->master) { /* Each conn_rec->id is supposed to be unique at a point in time. Since * some modules (and maybe external code) uses this id as an identifier * for the request_rec they handle, it needs to be unique for slave @@ -547,6 +569,8 @@ apr_status_t h2_task_do(h2_task *task, a */ int slave_id, free_bits; + task->id = apr_psprintf(task->pool, "%ld-%d", c->master->id, + task->stream_id); if (sizeof(unsigned long) >= 8) { free_bits = 32; slave_id = task->stream_id; @@ -558,12 +582,31 @@ apr_status_t h2_task_do(h2_task *task, a free_bits = 8; slave_id = worker_id; } - task->c->id = (task->c->master->id << free_bits)^slave_id; + task->c->id = (c->master->id << free_bits)^slave_id; + c->keepalive = AP_CONN_KEEPALIVE; + } + + h2_beam_create(&task->output.beam, c->pool, task->stream_id, "output", + H2_BEAM_OWNER_SEND, 0, task->timeout); + if (!task->output.beam) { + return APR_ENOMEM; + } + + h2_beam_buffer_size_set(task->output.beam, task->output.max_buffer); + h2_beam_send_from(task->output.beam, task->pool); + + h2_ctx_create_for(c, task); + apr_table_setn(c->notes, H2_TASK_ID_NOTE, task->id); + + if (task->input.beam) { + h2_beam_mutex_enable(task->input.beam); } - task->input.bb = apr_brigade_create(task->pool, task->c->bucket_alloc); + h2_slave_run_pre_connection(c, ap_get_conn_socket(c)); + + task->input.bb = apr_brigade_create(task->pool, c->bucket_alloc); if (task->request->serialize) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c, + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, "h2_task(%s): serialize request %s %s", task->id, task->request->method, task->request->path); apr_brigade_printf(task->input.bb, NULL, @@ -573,20 +616,21 @@ apr_status_t h2_task_do(h2_task *task, a apr_brigade_puts(task->input.bb, NULL, NULL, "\r\n"); } - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c, + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, "h2_task(%s): process connection", task->id); + task->c->current_thread = thread; - ap_run_process_connection(task->c); + ap_run_process_connection(c); if (task->frozen) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c, + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, "h2_task(%s): process_conn returned frozen task", task->id); /* cleanup delayed */ return APR_EAGAIN; } else { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c, + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, "h2_task(%s): processing done", task->id); return output_finish(task); } Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_task.h URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_task.h?rev=1789739&r1=1789738&r2=1789739&view=diff ============================================================================== --- httpd/httpd/branches/2.4.x/modules/http2/h2_task.h (original) +++ httpd/httpd/branches/2.4.x/modules/http2/h2_task.h Fri Mar 31 19:41:01 2017 @@ -73,6 +73,7 @@ struct h2_task { unsigned int copy_files : 1; struct h2_response_parser *rparser; apr_bucket_brigade *bb; + apr_size_t max_buffer; } output; struct h2_mplx *mplx; @@ -91,7 +92,11 @@ struct h2_task { struct h2_req_engine *assigned; /* engine that task has been assigned to */ }; -h2_task *h2_task_create(struct h2_stream *stream, conn_rec *slave); +h2_task *h2_task_create(conn_rec *slave, int stream_id, + const h2_request *req, struct h2_mplx *m, + struct h2_bucket_beam *input, + apr_interval_time_t timeout, + apr_size_t output_max_mem); void h2_task_destroy(h2_task *task); Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_util.c URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_util.c?rev=1789739&r1=1789738&r2=1789739&view=diff ============================================================================== --- httpd/httpd/branches/2.4.x/modules/http2/h2_util.c (original) +++ httpd/httpd/branches/2.4.x/modules/http2/h2_util.c Fri Mar 31 19:41:01 2017 @@ -15,6 +15,8 @@ #include #include +#include +#include #include #include @@ -604,6 +606,294 @@ int h2_iq_contains(h2_iqueue *q, int sid } /******************************************************************************* + * FIFO queue + ******************************************************************************/ + +struct h2_fifo { + void **elems; + int nelems; + int head; + int count; + int aborted; + apr_thread_mutex_t *lock; + apr_thread_cond_t *not_empty; + apr_thread_cond_t *not_full; +}; + +static int nth_index(h2_fifo *fifo, int n) +{ + return (fifo->head + n) % fifo->nelems; +} + +static apr_status_t fifo_destroy(void *data) +{ + h2_fifo *fifo = data; + + apr_thread_cond_destroy(fifo->not_empty); + apr_thread_cond_destroy(fifo->not_full); + apr_thread_mutex_destroy(fifo->lock); + + return APR_SUCCESS; +} + +apr_status_t h2_fifo_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity) +{ + apr_status_t rv; + h2_fifo *fifo; + + fifo = apr_pcalloc(pool, sizeof(*fifo)); + if (fifo == NULL) { + return APR_ENOMEM; + } + + rv = apr_thread_mutex_create(&fifo->lock, + APR_THREAD_MUTEX_UNNESTED, pool); + if (rv != APR_SUCCESS) { + return rv; + } + + rv = apr_thread_cond_create(&fifo->not_empty, pool); + if (rv != APR_SUCCESS) { + return rv; + } + + rv = apr_thread_cond_create(&fifo->not_full, pool); + if (rv != APR_SUCCESS) { + return rv; + } + + fifo->elems = apr_pcalloc(pool, capacity * sizeof(void*)); + if (fifo->elems == NULL) { + return APR_ENOMEM; + } + fifo->nelems = capacity; + + *pfifo = fifo; + apr_pool_cleanup_register(pool, fifo, fifo_destroy, apr_pool_cleanup_null); + + return APR_SUCCESS; +} + +apr_status_t h2_fifo_term(h2_fifo *fifo) +{ + apr_status_t rv; + if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) { + fifo->aborted = 1; + apr_thread_mutex_unlock(fifo->lock); + } + return rv; +} + +apr_status_t h2_fifo_interrupt(h2_fifo *fifo) +{ + apr_status_t rv; + if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) { + apr_thread_cond_broadcast(fifo->not_empty); + apr_thread_cond_broadcast(fifo->not_full); + apr_thread_mutex_unlock(fifo->lock); + } + return rv; +} + +int h2_fifo_count(h2_fifo *fifo) +{ + return fifo->count; +} + +static apr_status_t check_not_empty(h2_fifo *fifo, int block) +{ + if (fifo->count == 0) { + if (!block) { + return APR_EAGAIN; + } + while (fifo->count == 0) { + if (fifo->aborted) { + return APR_EOF; + } + apr_thread_cond_wait(fifo->not_empty, fifo->lock); + } + } + return APR_SUCCESS; +} + +static apr_status_t fifo_push(h2_fifo *fifo, void *elem, int block) +{ + apr_status_t rv; + + if (fifo->aborted) { + return APR_EOF; + } + + if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) { + if (fifo->count == fifo->nelems) { + if (block) { + while (fifo->count == fifo->nelems) { + if (fifo->aborted) { + apr_thread_mutex_unlock(fifo->lock); + return APR_EOF; + } + apr_thread_cond_wait(fifo->not_full, fifo->lock); + } + } + else { + apr_thread_mutex_unlock(fifo->lock); + return APR_EAGAIN; + } + } + + ap_assert(fifo->count < fifo->nelems); + fifo->elems[nth_index(fifo, fifo->count)] = elem; + ++fifo->count; + if (fifo->count == 1) { + apr_thread_cond_broadcast(fifo->not_empty); + } + apr_thread_mutex_unlock(fifo->lock); + } + return rv; +} + +apr_status_t h2_fifo_push(h2_fifo *fifo, void *elem) +{ + return fifo_push(fifo, elem, 1); +} + +apr_status_t h2_fifo_try_push(h2_fifo *fifo, void *elem) +{ + return fifo_push(fifo, elem, 0); +} + +static apr_status_t fifo_pull(h2_fifo *fifo, void **pelem, int block) +{ + apr_status_t rv; + + if (fifo->aborted) { + return APR_EOF; + } + + if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) { + if ((rv = check_not_empty(fifo, block)) != APR_SUCCESS) { + apr_thread_mutex_unlock(fifo->lock); + *pelem = NULL; + return rv; + } + + ap_assert(fifo->count > 0); + *pelem = fifo->elems[fifo->head]; + --fifo->count; + if (fifo->count > 0) { + fifo->head = nth_index(fifo, 1); + if (fifo->count+1 == fifo->nelems) { + apr_thread_cond_broadcast(fifo->not_full); + } + } + apr_thread_mutex_unlock(fifo->lock); + } + return rv; +} + +apr_status_t h2_fifo_pull(h2_fifo *fifo, void **pelem) +{ + return fifo_pull(fifo, pelem, 1); +} + +apr_status_t h2_fifo_try_pull(h2_fifo *fifo, void **pelem) +{ + return fifo_pull(fifo, pelem, 0); +} + +static apr_status_t fifo_peek(h2_fifo *fifo, h2_fifo_peek_fn *fn, void *ctx, int block) +{ + apr_status_t rv; + void *elem; + + if (fifo->aborted) { + return APR_EOF; + } + + if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) { + if ((rv = check_not_empty(fifo, block)) != APR_SUCCESS) { + apr_thread_mutex_unlock(fifo->lock); + return rv; + } + + ap_assert(fifo->count > 0); + elem = fifo->elems[fifo->head]; + + switch (fn(elem, ctx)) { + case H2_FIFO_OP_PULL: + --fifo->count; + if (fifo->count > 0) { + fifo->head = nth_index(fifo, 1); + if (fifo->count+1 == fifo->nelems) { + apr_thread_cond_broadcast(fifo->not_full); + } + } + break; + case H2_FIFO_OP_REPUSH: + if (fifo->count > 1) { + fifo->head = nth_index(fifo, 1); + if (fifo->count < fifo->nelems) { + fifo->elems[nth_index(fifo, fifo->count-1)] = elem; + } + } + break; + } + + apr_thread_mutex_unlock(fifo->lock); + } + return rv; +} + +apr_status_t h2_fifo_peek(h2_fifo *fifo, h2_fifo_peek_fn *fn, void *ctx) +{ + return fifo_peek(fifo, fn, ctx, 1); +} + +apr_status_t h2_fifo_try_peek(h2_fifo *fifo, h2_fifo_peek_fn *fn, void *ctx) +{ + return fifo_peek(fifo, fn, ctx, 0); +} + +apr_status_t h2_fifo_remove(h2_fifo *fifo, void *elem) +{ + apr_status_t rv; + + if (fifo->aborted) { + return APR_EOF; + } + + if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) { + int i, rc; + void *e; + + rc = 0; + for (i = 0; i < fifo->count; ++i) { + e = fifo->elems[nth_index(fifo, i)]; + if (e == elem) { + ++rc; + } + else if (rc) { + fifo->elems[nth_index(fifo, i-rc)] = e; + } + } + if (rc) { + fifo->count -= rc; + if (fifo->count + rc == fifo->nelems) { + apr_thread_cond_broadcast(fifo->not_full); + } + rv = APR_SUCCESS; + } + else { + rv = APR_EAGAIN; + } + + apr_thread_mutex_unlock(fifo->lock); + } + return rv; +} + + +/******************************************************************************* * h2_util for apt_table_t ******************************************************************************/ @@ -701,17 +991,16 @@ apr_status_t h2_brigade_concat_length(ap apr_bucket_brigade *src, apr_off_t length) { - apr_bucket *b, *next; + apr_bucket *b; apr_off_t remain = length; apr_status_t status = APR_SUCCESS; - for (b = APR_BRIGADE_FIRST(src); - b != APR_BRIGADE_SENTINEL(src); - b = next) { - next = APR_BUCKET_NEXT(b); + while (!APR_BRIGADE_EMPTY(src)) { + b = APR_BRIGADE_FIRST(src); if (APR_BUCKET_IS_METADATA(b)) { - /* fall through */ + APR_BUCKET_REMOVE(b); + APR_BRIGADE_INSERT_TAIL(dest, b); } else { if (remain == b->length) { @@ -734,10 +1023,10 @@ apr_status_t h2_brigade_concat_length(ap apr_bucket_split(b, remain); } } + APR_BUCKET_REMOVE(b); + APR_BRIGADE_INSERT_TAIL(dest, b); + remain -= b->length; } - APR_BUCKET_REMOVE(b); - APR_BRIGADE_INSERT_TAIL(dest, b); - remain -= b->length; } return status; } @@ -925,55 +1214,14 @@ apr_size_t h2_util_bucket_print(char *bu if (bmax <= off) { return off; } - if (APR_BUCKET_IS_METADATA(b)) { - if (APR_BUCKET_IS_EOS(b)) { - off += apr_snprintf(buffer+off, bmax-off, "eos"); - } - else if (APR_BUCKET_IS_FLUSH(b)) { - off += apr_snprintf(buffer+off, bmax-off, "flush"); - } - else if (AP_BUCKET_IS_EOR(b)) { - off += apr_snprintf(buffer+off, bmax-off, "eor"); - } - else { - off += apr_snprintf(buffer+off, bmax-off, "%s", b->type->name); - } + else if (APR_BUCKET_IS_METADATA(b)) { + off += apr_snprintf(buffer+off, bmax-off, "%s", b->type->name); } - else { - const char *btype = b->type->name; - if (APR_BUCKET_IS_FILE(b)) { - btype = "file"; - } - else if (APR_BUCKET_IS_PIPE(b)) { - btype = "pipe"; - } - else if (APR_BUCKET_IS_SOCKET(b)) { - btype = "socket"; - } - else if (APR_BUCKET_IS_HEAP(b)) { - btype = "heap"; - } - else if (APR_BUCKET_IS_TRANSIENT(b)) { - btype = "transient"; - } - else if (APR_BUCKET_IS_IMMORTAL(b)) { - btype = "immortal"; - } -#if APR_HAS_MMAP - else if (APR_BUCKET_IS_MMAP(b)) { - btype = "mmap"; - } -#endif - else if (APR_BUCKET_IS_POOL(b)) { - btype = "pool"; - } - - if (bmax > off) { - off += apr_snprintf(buffer+off, bmax-off, "%s[%ld]", - btype, - (long)(b->length == ((apr_size_t)-1)? - -1 : b->length)); - } + else if (bmax > off) { + off += apr_snprintf(buffer+off, bmax-off, "%s[%ld]", + b->type->name, + (long)(b->length == ((apr_size_t)-1)? + -1 : b->length)); } return off; } Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_util.h URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_util.h?rev=1789739&r1=1789738&r2=1789739&view=diff ============================================================================== --- httpd/httpd/branches/2.4.x/modules/http2/h2_util.h (original) +++ httpd/httpd/branches/2.4.x/modules/http2/h2_util.h Fri Mar 31 19:41:01 2017 @@ -184,6 +184,57 @@ size_t h2_iq_mshift(h2_iqueue *q, int *p int h2_iq_contains(h2_iqueue *q, int sid); /******************************************************************************* + * FIFO queue + ******************************************************************************/ + +/** + * A thread-safe FIFO queue with some extra bells and whistles, if you + * do not need anything special, better use 'apr_queue'. + */ +typedef struct h2_fifo h2_fifo; + +apr_status_t h2_fifo_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity); +apr_status_t h2_fifo_term(h2_fifo *fifo); +apr_status_t h2_fifo_interrupt(h2_fifo *fifo); + +int h2_fifo_count(h2_fifo *fifo); + +apr_status_t h2_fifo_push(h2_fifo *fifo, void *elem); +apr_status_t h2_fifo_try_push(h2_fifo *fifo, void *elem); + +apr_status_t h2_fifo_pull(h2_fifo *fifo, void **pelem); +apr_status_t h2_fifo_try_pull(h2_fifo *fifo, void **pelem); + +typedef enum { + H2_FIFO_OP_PULL, /* pull the element from the queue, ie discard it */ + H2_FIFO_OP_REPUSH, /* pull and immediatley re-push it */ +} h2_fifo_op_t; + +typedef h2_fifo_op_t h2_fifo_peek_fn(void *head, void *ctx); + +/** + * Call given function on the head of the queue, once it exists, and + * perform the returned operation on it. The queue will hold its lock during + * this time, so no other operations on the queue are possible. + * @param fifo the queue to peek at + * @param fn the function to call on the head, once available + * @param ctx context to pass in call to function + */ +apr_status_t h2_fifo_peek(h2_fifo *fifo, h2_fifo_peek_fn *fn, void *ctx); + +/** + * Non-blocking version of h2_fifo_peek. + */ +apr_status_t h2_fifo_try_peek(h2_fifo *fifo, h2_fifo_peek_fn *fn, void *ctx); + +/** + * Remove the elem from the queue, will remove multiple appearances. + * @param elem the element to remove + * @return APR_SUCCESS iff > 0 elems were removed, APR_EAGAIN otherwise. + */ +apr_status_t h2_fifo_remove(h2_fifo *fifo, void *elem); + +/******************************************************************************* * common helpers ******************************************************************************/ /* h2_log2(n) iff n is a power of 2 */ @@ -379,8 +430,8 @@ do { \ const char *line = "(null)"; \ apr_size_t len, bmax = sizeof(buffer)/sizeof(buffer[0]); \ len = h2_util_bb_print(buffer, bmax, (tag), "", (bb)); \ - ap_log_cerror(APLOG_MARK, level, 0, (c), "bb_dump(%s): %s", \ - (c)->log_id, (len? buffer : line)); \ + ap_log_cerror(APLOG_MARK, level, 0, (c), "bb_dump(%ld): %s", \ + ((c)->master? (c)->master->id : (c)->id), (len? buffer : line)); \ } while(0) Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_version.h URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_version.h?rev=1789739&r1=1789738&r2=1789739&view=diff ============================================================================== --- httpd/httpd/branches/2.4.x/modules/http2/h2_version.h (original) +++ httpd/httpd/branches/2.4.x/modules/http2/h2_version.h Fri Mar 31 19:41:01 2017 @@ -26,7 +26,7 @@ * @macro * Version number of the http2 module as c string */ -#define MOD_HTTP2_VERSION "1.9.3" +#define MOD_HTTP2_VERSION "1.10.0" /** * @macro @@ -34,7 +34,7 @@ * release. This is a 24 bit number with 8 bits for major number, 8 bits * for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203. */ -#define MOD_HTTP2_VERSION_NUM 0x010903 +#define MOD_HTTP2_VERSION_NUM 0x010a00 #endif /* mod_h2_h2_version_h */ Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_workers.c URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_workers.c?rev=1789739&r1=1789738&r2=1789739&view=diff ============================================================================== --- httpd/httpd/branches/2.4.x/modules/http2/h2_workers.c (original) +++ httpd/httpd/branches/2.4.x/modules/http2/h2_workers.c Fri Mar 31 19:41:01 2017 @@ -27,221 +27,248 @@ #include "h2_private.h" #include "h2_mplx.h" #include "h2_task.h" -#include "h2_worker.h" #include "h2_workers.h" +#include "h2_util.h" +typedef struct h2_slot h2_slot; +struct h2_slot { + int id; + h2_slot *next; + h2_workers *workers; + int aborted; + int sticks; + h2_task *task; + apr_thread_t *thread; + apr_thread_cond_t *not_idle; +}; -static int in_list(h2_workers *workers, h2_mplx *m) +static h2_slot *pop_slot(h2_slot **phead) { - h2_mplx *e; - for (e = H2_MPLX_LIST_FIRST(&workers->mplxs); - e != H2_MPLX_LIST_SENTINEL(&workers->mplxs); - e = H2_MPLX_NEXT(e)) { - if (e == m) { - return 1; + /* Atomically pop a slot from the list */ + for (;;) { + h2_slot *first = *phead; + if (first == NULL) { + return NULL; + } + if (apr_atomic_casptr((void*)phead, first->next, first) == first) { + first->next = NULL; + return first; } } - return 0; } -static void cleanup_zombies(h2_workers *workers, int lock) +static void push_slot(h2_slot **phead, h2_slot *slot) { - if (lock) { - apr_thread_mutex_lock(workers->lock); + /* Atomically push a slot to the list */ + ap_assert(!slot->next); + for (;;) { + h2_slot *next = slot->next = *phead; + if (apr_atomic_casptr((void*)phead, slot, next) == next) { + return; + } + } +} + +static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx); + +static apr_status_t activate_slot(h2_workers *workers, h2_slot *slot) +{ + apr_status_t status; + + slot->workers = workers; + slot->aborted = 0; + slot->task = NULL; + if (!slot->not_idle) { + status = apr_thread_cond_create(&slot->not_idle, workers->pool); + if (status != APR_SUCCESS) { + push_slot(&workers->free, slot); + return status; + } } - while (!H2_WORKER_LIST_EMPTY(&workers->zombies)) { - h2_worker *zombie = H2_WORKER_LIST_FIRST(&workers->zombies); - H2_WORKER_REMOVE(zombie); - ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s, - "h2_workers: cleanup zombie %d", zombie->id); - h2_worker_destroy(zombie); + + /* thread will either immediately start work or add itself + * to the idle queue */ + apr_thread_create(&slot->thread, workers->thread_attr, slot_run, slot, + workers->pool); + if (!slot->thread) { + push_slot(&workers->free, slot); + return APR_ENOMEM; } - if (lock) { + + ++workers->worker_count; + return APR_SUCCESS; +} + +static apr_status_t add_worker(h2_workers *workers) +{ + h2_slot *slot = pop_slot(&workers->free); + if (slot) { + return activate_slot(workers, slot); + } + return APR_EAGAIN; +} + +static void wake_idle_worker(h2_workers *workers) +{ + h2_slot *slot = pop_slot(&workers->idle); + if (slot) { + apr_thread_mutex_lock(workers->lock); + apr_thread_cond_signal(slot->not_idle); apr_thread_mutex_unlock(workers->lock); } + else if (workers->dynamic) { + add_worker(workers); + } } -static h2_task *next_task(h2_workers *workers) +static void cleanup_zombies(h2_workers *workers) { - h2_task *task = NULL; - h2_mplx *last = NULL; - int has_more; - - /* Get the next h2_mplx to process that has a task to hand out. - * If it does, place it at the end of the queu and return the - * task to the worker. - * If it (currently) has no tasks, remove it so that it needs - * to register again for scheduling. - * If we run out of h2_mplx in the queue, we need to wait for - * new mplx to arrive. Depending on how many workers do exist, - * we do a timed wait or block indefinitely. - */ - while (!task && !H2_MPLX_LIST_EMPTY(&workers->mplxs)) { - h2_mplx *m = H2_MPLX_LIST_FIRST(&workers->mplxs); - - if (last == m) { - break; - } - H2_MPLX_REMOVE(m); - --workers->mplx_count; - - task = h2_mplx_pop_task(m, &has_more); - if (has_more) { - H2_MPLX_LIST_INSERT_TAIL(&workers->mplxs, m); - ++workers->mplx_count; - if (!last) { - last = m; - } + h2_slot *slot; + while ((slot = pop_slot(&workers->zombies))) { + if (slot->thread) { + apr_status_t status; + apr_thread_join(&status, slot->thread); + slot->thread = NULL; } + --workers->worker_count; + push_slot(&workers->free, slot); } - return task; +} + +static apr_status_t slot_pull_task(h2_slot *slot, h2_mplx *m) +{ + int has_more; + slot->task = h2_mplx_pop_task(m, &has_more); + if (slot->task) { + /* Ok, we got something to give back to the worker for execution. + * If we still have idle workers, we let the worker be sticky, + * e.g. making it poll the task's h2_mplx instance for more work + * before asking back here. */ + slot->sticks = slot->workers->max_workers; + return has_more? APR_EAGAIN : APR_SUCCESS; + } + slot->sticks = 0; + return APR_EOF; +} + +static h2_fifo_op_t mplx_peek(void *head, void *ctx) +{ + h2_mplx *m = head; + h2_slot *slot = ctx; + + if (slot_pull_task(slot, m) == APR_EAGAIN) { + wake_idle_worker(slot->workers); + return H2_FIFO_OP_REPUSH; + } + return H2_FIFO_OP_PULL; } /** * Get the next task for the given worker. Will block until a task arrives * or the max_wait timer expires and more than min workers exist. */ -static apr_status_t get_mplx_next(h2_worker *worker, void *ctx, - h2_task **ptask, int *psticky) +static apr_status_t get_next(h2_slot *slot) { + h2_workers *workers = slot->workers; apr_status_t status; - apr_time_t wait_until = 0, now; - h2_workers *workers = ctx; - h2_task *task = NULL; - - *ptask = NULL; - *psticky = 0; - status = apr_thread_mutex_lock(workers->lock); - if (status == APR_SUCCESS) { - ++workers->idle_workers; - ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s, - "h2_worker(%d): looking for work", worker->id); - - while (!h2_worker_is_aborted(worker) && !workers->aborted - && !(task = next_task(workers))) { - - /* Need to wait for a new tasks to arrive. If we are above - * minimum workers, we do a timed wait. When timeout occurs - * and we have still more workers, we shut down one after - * the other. */ - cleanup_zombies(workers, 0); - if (workers->worker_count > workers->min_workers) { - now = apr_time_now(); - if (now >= wait_until) { - wait_until = now + apr_time_from_sec(workers->max_idle_secs); - } - - ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s, - "h2_worker(%d): waiting signal, " - "workers=%d, idle=%d", worker->id, - (int)workers->worker_count, - workers->idle_workers); - status = apr_thread_cond_timedwait(workers->mplx_added, - workers->lock, - wait_until - now); - if (status == APR_TIMEUP - && workers->worker_count > workers->min_workers) { - /* waited long enough without getting a task and - * we are above min workers, abort this one. */ - ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, - workers->s, - "h2_workers: aborting idle worker"); - h2_worker_abort(worker); - break; - } - } - else { - ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s, - "h2_worker(%d): waiting signal (eternal), " - "worker_count=%d, idle=%d", worker->id, - (int)workers->worker_count, - workers->idle_workers); - apr_thread_cond_wait(workers->mplx_added, workers->lock); + slot->task = NULL; + while (!slot->aborted) { + if (!slot->task) { + status = h2_fifo_try_peek(workers->mplxs, mplx_peek, slot); + if (status == APR_EOF) { + return status; } } - /* Here, we either have gotten task or decided to shut down - * the calling worker. - */ - if (task) { - /* Ok, we got something to give back to the worker for execution. - * If we have more idle workers than h2_mplx in our queue, then - * we let the worker be sticky, e.g. making it poll the task's - * h2_mplx instance for more work before asking back here. - * This avoids entering our global lock as long as enough idle - * workers remain. Stickiness of a worker ends when the connection - * has no new tasks to process, so the worker will get back here - * eventually. - */ - *ptask = task; - *psticky = (workers->max_workers >= workers->mplx_count); - - if (workers->mplx_count && workers->idle_workers > 1) { - apr_thread_cond_signal(workers->mplx_added); - } + if (slot->task) { + return APR_SUCCESS; } + apr_thread_mutex_lock(workers->lock); + cleanup_zombies(workers); + + ++workers->idle_workers; + push_slot(&workers->idle, slot); + apr_thread_cond_wait(slot->not_idle, workers->lock); --workers->idle_workers; + apr_thread_mutex_unlock(workers->lock); } - - return *ptask? APR_SUCCESS : APR_EOF; + return APR_EOF; } -static void worker_done(h2_worker *worker, void *ctx) +static void slot_done(h2_slot *slot) { - h2_workers *workers = ctx; - apr_status_t status = apr_thread_mutex_lock(workers->lock); - if (status == APR_SUCCESS) { - ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s, - "h2_worker(%d): done", worker->id); - H2_WORKER_REMOVE(worker); - --workers->worker_count; - H2_WORKER_LIST_INSERT_TAIL(&workers->zombies, worker); - - apr_thread_mutex_unlock(workers->lock); - } + push_slot(&(slot->workers->zombies), slot); } -static apr_status_t add_worker(h2_workers *workers) + +static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx) { - h2_worker *w = h2_worker_create(workers->next_worker_id++, - workers->pool, workers->thread_attr, - get_mplx_next, worker_done, workers); - if (!w) { - return APR_ENOMEM; + h2_slot *slot = wctx; + + while (!slot->aborted) { + + /* Get a h2_task from the mplxs queue. */ + get_next(slot); + while (slot->task) { + + h2_task_do(slot->task, thread, slot->id); + + /* Report the task as done. If stickyness is left, offer the + * mplx the opportunity to give us back a new task right away. + */ + if (!slot->aborted && (--slot->sticks > 0)) { + h2_mplx_task_done(slot->task->mplx, slot->task, &slot->task); + } + else { + h2_mplx_task_done(slot->task->mplx, slot->task, NULL); + slot->task = NULL; + } + } } - ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s, - "h2_workers: adding worker(%d)", w->id); - ++workers->worker_count; - H2_WORKER_LIST_INSERT_TAIL(&workers->workers, w); - return APR_SUCCESS; + + slot_done(slot); + return NULL; } -static apr_status_t h2_workers_start(h2_workers *workers) +static apr_status_t workers_pool_cleanup(void *data) { - apr_status_t status = apr_thread_mutex_lock(workers->lock); - if (status == APR_SUCCESS) { - ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s, - "h2_workers: starting"); - - while (workers->worker_count < workers->min_workers - && status == APR_SUCCESS) { - status = add_worker(workers); + h2_workers *workers = data; + h2_slot *slot; + + if (!workers->aborted) { + apr_thread_mutex_lock(workers->lock); + workers->aborted = 1; + /* before we go, cleanup any zombies and abort the rest */ + cleanup_zombies(workers); + for (;;) { + slot = pop_slot(&workers->idle); + if (slot) { + slot->aborted = 1; + apr_thread_cond_signal(slot->not_idle); + } + else { + break; + } } apr_thread_mutex_unlock(workers->lock); + + h2_fifo_term(workers->mplxs); + h2_fifo_interrupt(workers->mplxs); } - return status; + return APR_SUCCESS; } h2_workers *h2_workers_create(server_rec *s, apr_pool_t *server_pool, int min_workers, int max_workers, - apr_size_t max_tx_handles) + int idle_secs) { apr_status_t status; h2_workers *workers; apr_pool_t *pool; + int i, n; ap_assert(s); ap_assert(server_pool); @@ -254,163 +281,77 @@ h2_workers *h2_workers_create(server_rec apr_pool_create(&pool, server_pool); apr_pool_tag(pool, "h2_workers"); workers = apr_pcalloc(pool, sizeof(h2_workers)); - if (workers) { - workers->s = s; - workers->pool = pool; - workers->min_workers = min_workers; - workers->max_workers = max_workers; - workers->max_idle_secs = 10; - - workers->max_tx_handles = max_tx_handles; - workers->spare_tx_handles = workers->max_tx_handles; - - apr_threadattr_create(&workers->thread_attr, workers->pool); - if (ap_thread_stacksize != 0) { - apr_threadattr_stacksize_set(workers->thread_attr, - ap_thread_stacksize); - ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, s, - "h2_workers: using stacksize=%ld", - (long)ap_thread_stacksize); - } - - APR_RING_INIT(&workers->workers, h2_worker, link); - APR_RING_INIT(&workers->zombies, h2_worker, link); - APR_RING_INIT(&workers->mplxs, h2_mplx, link); - - status = apr_thread_mutex_create(&workers->lock, - APR_THREAD_MUTEX_DEFAULT, - workers->pool); - if (status == APR_SUCCESS) { - status = apr_thread_cond_create(&workers->mplx_added, workers->pool); - } - - if (status == APR_SUCCESS) { - status = apr_thread_mutex_create(&workers->tx_lock, - APR_THREAD_MUTEX_DEFAULT, - workers->pool); - } - - if (status == APR_SUCCESS) { - status = h2_workers_start(workers); - } - - if (status != APR_SUCCESS) { - h2_workers_destroy(workers); - workers = NULL; - } + if (!workers) { + return NULL; } - return workers; -} - -void h2_workers_destroy(h2_workers *workers) -{ - /* before we go, cleanup any zombie workers that may have accumulated */ - cleanup_zombies(workers, 1); - if (workers->mplx_added) { - apr_thread_cond_destroy(workers->mplx_added); - workers->mplx_added = NULL; - } - if (workers->lock) { - apr_thread_mutex_destroy(workers->lock); - workers->lock = NULL; + workers->s = s; + workers->pool = pool; + workers->min_workers = min_workers; + workers->max_workers = max_workers; + workers->max_idle_secs = (idle_secs > 0)? idle_secs : 10; + + status = h2_fifo_create(&workers->mplxs, pool, 2 * workers->max_workers); + if (status != APR_SUCCESS) { + return NULL; } - while (!H2_MPLX_LIST_EMPTY(&workers->mplxs)) { - h2_mplx *m = H2_MPLX_LIST_FIRST(&workers->mplxs); - H2_MPLX_REMOVE(m); - } - while (!H2_WORKER_LIST_EMPTY(&workers->workers)) { - h2_worker *w = H2_WORKER_LIST_FIRST(&workers->workers); - H2_WORKER_REMOVE(w); + + status = apr_threadattr_create(&workers->thread_attr, workers->pool); + if (status != APR_SUCCESS) { + return NULL; } - if (workers->pool) { - apr_pool_destroy(workers->pool); - /* workers is gone */ + + if (ap_thread_stacksize != 0) { + apr_threadattr_stacksize_set(workers->thread_attr, + ap_thread_stacksize); + ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, s, + "h2_workers: using stacksize=%ld", + (long)ap_thread_stacksize); } -} - -apr_status_t h2_workers_register(h2_workers *workers, struct h2_mplx *m) -{ - apr_status_t status = apr_thread_mutex_lock(workers->lock); - if (status == APR_SUCCESS) { - ap_log_error(APLOG_MARK, APLOG_TRACE3, status, workers->s, - "h2_workers: register mplx(%ld), idle=%d", - m->id, workers->idle_workers); - if (in_list(workers, m)) { - status = APR_EAGAIN; - } - else { - H2_MPLX_LIST_INSERT_TAIL(&workers->mplxs, m); - ++workers->mplx_count; - status = APR_SUCCESS; - } - - if (workers->idle_workers > 0) { - apr_thread_cond_signal(workers->mplx_added); + + status = apr_thread_mutex_create(&workers->lock, + APR_THREAD_MUTEX_DEFAULT, + workers->pool); + if (status == APR_SUCCESS) { + n = workers->nslots = workers->max_workers; + workers->slots = apr_pcalloc(workers->pool, n * sizeof(h2_slot)); + if (workers->slots == NULL) { + workers->nslots = 0; + status = APR_ENOMEM; } - else if (status == APR_SUCCESS - && workers->worker_count < workers->max_workers) { - ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s, - "h2_workers: got %d worker, adding 1", - workers->worker_count); - add_worker(workers); + for (i = 0; i < n; ++i) { + workers->slots[i].id = i; } - apr_thread_mutex_unlock(workers->lock); } - return status; -} - -apr_status_t h2_workers_unregister(h2_workers *workers, struct h2_mplx *m) -{ - apr_status_t status = apr_thread_mutex_lock(workers->lock); if (status == APR_SUCCESS) { - status = APR_EAGAIN; - if (in_list(workers, m)) { - H2_MPLX_REMOVE(m); - status = APR_SUCCESS; + /* we activate all for now, TODO: support min_workers again. + * do this in reverse for vanity reasons so slot 0 will most + * likely be at head of idle queue. */ + n = workers->max_workers; + for (i = n-1; i >= 0; --i) { + status = activate_slot(workers, &workers->slots[i]); + } + /* the rest of the slots go on the free list */ + for(i = n; i < workers->nslots; ++i) { + push_slot(&workers->free, &workers->slots[i]); } - apr_thread_mutex_unlock(workers->lock); + workers->dynamic = (workers->worker_count < workers->max_workers); } - return status; -} - -void h2_workers_set_max_idle_secs(h2_workers *workers, int idle_secs) -{ - if (idle_secs <= 0) { - ap_log_error(APLOG_MARK, APLOG_WARNING, 0, workers->s, - APLOGNO(02962) "h2_workers: max_worker_idle_sec value of %d" - " is not valid, ignored.", idle_secs); - return; + if (status == APR_SUCCESS) { + apr_pool_pre_cleanup_register(pool, workers, workers_pool_cleanup); + return workers; } - workers->max_idle_secs = idle_secs; + return NULL; } -apr_size_t h2_workers_tx_reserve(h2_workers *workers, apr_size_t count) +apr_status_t h2_workers_register(h2_workers *workers, struct h2_mplx *m) { - apr_status_t status = apr_thread_mutex_lock(workers->tx_lock); - if (status == APR_SUCCESS) { - count = H2MIN(workers->spare_tx_handles, count); - workers->spare_tx_handles -= count; - ap_log_error(APLOG_MARK, APLOG_TRACE2, 0, workers->s, - "h2_workers: reserved %d tx handles, %d/%d left", - (int)count, (int)workers->spare_tx_handles, - (int)workers->max_tx_handles); - apr_thread_mutex_unlock(workers->tx_lock); - return count; - } - return 0; + apr_status_t status = h2_fifo_push(workers->mplxs, m); + wake_idle_worker(workers); + return status; } -void h2_workers_tx_free(h2_workers *workers, apr_size_t count) +apr_status_t h2_workers_unregister(h2_workers *workers, struct h2_mplx *m) { - apr_status_t status = apr_thread_mutex_lock(workers->tx_lock); - if (status == APR_SUCCESS) { - workers->spare_tx_handles += count; - ap_log_error(APLOG_MARK, APLOG_TRACE2, 0, workers->s, - "h2_workers: freed %d tx handles, %d/%d left", - (int)count, (int)workers->spare_tx_handles, - (int)workers->max_tx_handles); - apr_thread_mutex_unlock(workers->tx_lock); - } + return h2_fifo_remove(workers->mplxs, m); } - Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_workers.h URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_workers.h?rev=1789739&r1=1789738&r2=1789739&view=diff ============================================================================== --- httpd/httpd/branches/2.4.x/modules/http2/h2_workers.h (original) +++ httpd/httpd/branches/2.4.x/modules/http2/h2_workers.h Fri Mar 31 19:41:01 2017 @@ -27,6 +27,9 @@ struct apr_thread_cond_t; struct h2_mplx; struct h2_request; struct h2_task; +struct h2_fifo; + +struct h2_slot; typedef struct h2_workers h2_workers; @@ -41,22 +44,20 @@ struct h2_workers { int idle_workers; int max_idle_secs; - apr_size_t max_tx_handles; - apr_size_t spare_tx_handles; - - unsigned int aborted : 1; + int aborted; + int dynamic; apr_threadattr_t *thread_attr; + int nslots; + struct h2_slot *slots; - APR_RING_HEAD(h2_worker_list, h2_worker) workers; - APR_RING_HEAD(h2_worker_zombies, h2_worker) zombies; - APR_RING_HEAD(h2_mplx_list, h2_mplx) mplxs; - int mplx_count; + struct h2_slot *free; + struct h2_slot *idle; + struct h2_slot *zombies; + + struct h2_fifo *mplxs; struct apr_thread_mutex_t *lock; - struct apr_thread_cond_t *mplx_added; - - struct apr_thread_mutex_t *tx_lock; }; @@ -64,12 +65,7 @@ struct h2_workers { * threads. */ h2_workers *h2_workers_create(server_rec *s, apr_pool_t *pool, - int min_size, int max_size, - apr_size_t max_tx_handles); - -/* Destroy the worker pool and all its threads. - */ -void h2_workers_destroy(h2_workers *workers); + int min_size, int max_size, int idle_secs); /** * Registers a h2_mplx for task scheduling. If this h2_mplx runs @@ -83,38 +79,4 @@ apr_status_t h2_workers_register(h2_work */ apr_status_t h2_workers_unregister(h2_workers *workers, struct h2_mplx *m); -/** - * Set the amount of seconds a h2_worker should wait for new tasks - * before shutting down (if there are more than the minimum number of - * workers). - */ -void h2_workers_set_max_idle_secs(h2_workers *workers, int idle_secs); - -/** - * Reservation of file handles available for transfer between workers - * and master connections. - * - * When handling output from request processing, file handles are often - * encountered when static files are served. The most efficient way is then - * to forward the handle itself to the master connection where it can be - * read or sendfile'd to the client. But file handles are a scarce resource, - * so there needs to be a limit on how many handles are transferred this way. - * - * h2_workers keeps track of the number of reserved handles and observes a - * configurable maximum value. - * - * @param workers the workers instance - * @param count how many handles the caller wishes to reserve - * @return the number of reserved handles, may be 0. - */ -apr_size_t h2_workers_tx_reserve(h2_workers *workers, apr_size_t count); - -/** - * Return a number of reserved file handles back to the pool. The number - * overall may not exceed the numbers reserved. - * @param workers the workers instance - * @param count how many handles are returned to the pool - */ -void h2_workers_tx_free(h2_workers *workers, apr_size_t count); - #endif /* defined(__mod_h2__h2_workers__) */