httpd-cvs mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ic...@apache.org
Subject svn commit: r1744712 - in /httpd/httpd/trunk: ./ modules/http2/
Date Fri, 20 May 2016 11:18:38 GMT
Author: icing
Date: Fri May 20 11:18:37 2016
New Revision: 1744712

URL: http://svn.apache.org/viewvc?rev=1744712&view=rev
Log:
mod_http2: improved resume/response/window update handling on master connection

Modified:
    httpd/httpd/trunk/CHANGES
    httpd/httpd/trunk/modules/http2/h2_bucket_beam.c
    httpd/httpd/trunk/modules/http2/h2_bucket_beam.h
    httpd/httpd/trunk/modules/http2/h2_mplx.c
    httpd/httpd/trunk/modules/http2/h2_mplx.h
    httpd/httpd/trunk/modules/http2/h2_session.c
    httpd/httpd/trunk/modules/http2/h2_session.h
    httpd/httpd/trunk/modules/http2/h2_stream.c
    httpd/httpd/trunk/modules/http2/h2_stream.h
    httpd/httpd/trunk/modules/http2/h2_task.c
    httpd/httpd/trunk/modules/http2/h2_task.h
    httpd/httpd/trunk/modules/http2/h2_util.c
    httpd/httpd/trunk/modules/http2/h2_util.h

Modified: httpd/httpd/trunk/CHANGES
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/CHANGES?rev=1744712&r1=1744711&r2=1744712&view=diff
==============================================================================
--- httpd/httpd/trunk/CHANGES [utf-8] (original)
+++ httpd/httpd/trunk/CHANGES [utf-8] Fri May 20 11:18:37 2016
@@ -1,6 +1,9 @@
                                                          -*- coding: utf-8 -*-
 Changes with Apache 2.5.0
 
+  *) mod_http2: improved event handling for suspended streams, responses
+     and window updates. [Stefan Eissing] 
+     
   *) mod_proxy, mod_ssl: Handle SSLProxy* directives in <Proxy> sections,
      allowing per backend TLS configuration.  [Yann Ylavic]
 

Modified: httpd/httpd/trunk/modules/http2/h2_bucket_beam.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_bucket_beam.c?rev=1744712&r1=1744711&r2=1744712&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_bucket_beam.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_bucket_beam.c Fri May 20 11:18:37 2016
@@ -356,12 +356,25 @@ static void h2_beam_emitted(h2_bucket_be
     }
 }
 
-static void report_consumption(h2_bucket_beam *beam)
+static void report_consumption(h2_bucket_beam *beam, int force)
 {
-    if (beam->consumed_fn && (beam->received_bytes != beam->reported_bytes)) {
-        beam->consumed_fn(beam->consumed_ctx, beam, 
-                          beam->received_bytes - beam->reported_bytes);
-        beam->reported_bytes = beam->received_bytes;
+    if (force || beam->received_bytes != beam->reported_consumed_bytes) {
+        if (beam->consumed_fn) { 
+            beam->consumed_fn(beam->consumed_ctx, beam, beam->received_bytes
+                              - beam->reported_consumed_bytes);
+        }
+        beam->reported_consumed_bytes = beam->received_bytes;
+    }
+}
+
+static void report_production(h2_bucket_beam *beam, int force)
+{
+    if (force || beam->sent_bytes != beam->reported_produced_bytes) {
+        if (beam->produced_fn) { 
+            beam->produced_fn(beam->produced_ctx, beam, beam->sent_bytes
+                              - beam->reported_produced_bytes);
+        }
+        beam->reported_produced_bytes = beam->sent_bytes;
     }
 }
 
@@ -393,7 +406,7 @@ static apr_status_t beam_cleanup(void *d
     beam_close(beam);
     r_purge_reds(beam);
     h2_blist_cleanup(&beam->red);
-    report_consumption(beam);
+    report_consumption(beam, 0);
     h2_blist_cleanup(&beam->purge);
     h2_blist_cleanup(&beam->hold);
     
@@ -500,7 +513,7 @@ void h2_beam_abort(h2_bucket_beam *beam)
         r_purge_reds(beam);
         h2_blist_cleanup(&beam->red);
         beam->aborted = 1;
-        report_consumption(beam);
+        report_consumption(beam, 0);
         if (beam->m_cond) {
             apr_thread_cond_broadcast(beam->m_cond);
         }
@@ -515,7 +528,7 @@ apr_status_t h2_beam_close(h2_bucket_bea
     if (enter_yellow(beam, &bl) == APR_SUCCESS) {
         r_purge_reds(beam);
         beam_close(beam);
-        report_consumption(beam);
+        report_consumption(beam, 0);
         leave_yellow(beam, &bl);
     }
     return beam->aborted? APR_ECONNABORTED : APR_SUCCESS;
@@ -530,7 +543,7 @@ apr_status_t h2_beam_shutdown(h2_bucket_
         r_purge_reds(beam);
         h2_blist_cleanup(&beam->red);
         beam_close(beam);
-        report_consumption(beam);
+        report_consumption(beam, 0);
         
         while (status == APR_SUCCESS 
                && (!H2_BPROXY_LIST_EMPTY(&beam->proxies)
@@ -693,16 +706,18 @@ apr_status_t h2_beam_send(h2_bucket_beam
             status = APR_ECONNABORTED;
         }
         else if (red_brigade) {
+            int not_emtpy = APR_BRIGADE_EMPTY(red_brigade); 
             while (!APR_BRIGADE_EMPTY(red_brigade)
                    && status == APR_SUCCESS) {
                 bred = APR_BRIGADE_FIRST(red_brigade);
                 status = append_bucket(beam, bred, block, beam->red_pool, &bl);
             }
+            report_production(beam, not_emtpy);
             if (beam->m_cond) {
                 apr_thread_cond_broadcast(beam->m_cond);
             }
         }
-        report_consumption(beam);
+        report_consumption(beam, 0);
         leave_yellow(beam, &bl);
     }
     return status;
@@ -834,8 +849,17 @@ transfer:
                  }
             }
         }
-                
-        if (transferred) {
+
+        if ((!beam->green || APR_BRIGADE_EMPTY(beam->green))
+            && H2_BLIST_EMPTY(&beam->red) 
+            && beam->closed && !beam->close_sent) {
+            apr_bucket *b = apr_bucket_eos_create(bb->bucket_alloc);
+            APR_BRIGADE_INSERT_TAIL(bb, b);
+            beam->close_sent = 1;
+            ++transferred;
+            status = APR_SUCCESS;
+        }
+        else if (transferred) {
             status = APR_SUCCESS;
         }
         else if (beam->closed) {
@@ -866,7 +890,7 @@ leave:
 }
 
 void h2_beam_on_consumed(h2_bucket_beam *beam, 
-                         h2_beam_consumed_callback *cb, void *ctx)
+                         h2_beam_io_callback *cb, void *ctx)
 {
     h2_beam_lock bl;
     
@@ -876,6 +900,18 @@ void h2_beam_on_consumed(h2_bucket_beam
         leave_yellow(beam, &bl);
     }
 }
+
+void h2_beam_on_produced(h2_bucket_beam *beam, 
+                         h2_beam_io_callback *cb, void *ctx)
+{
+    h2_beam_lock bl;
+    
+    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
+        beam->produced_fn = cb;
+        beam->produced_ctx = ctx;
+        leave_yellow(beam, &bl);
+    }
+}
 
 void h2_beam_on_file_beam(h2_bucket_beam *beam, 
                           h2_beam_can_beam_callback *cb, void *ctx)

Modified: httpd/httpd/trunk/modules/http2/h2_bucket_beam.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_bucket_beam.h?rev=1744712&r1=1744711&r2=1744712&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_bucket_beam.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_bucket_beam.h Fri May 20 11:18:37 2016
@@ -152,8 +152,8 @@ typedef struct h2_bucket_beam h2_bucket_
 
 typedef apr_status_t h2_beam_mutex_enter(void *ctx, h2_beam_lock *pbl);
 
-typedef void h2_beam_consumed_callback(void *ctx, h2_bucket_beam *beam,
-                                       apr_off_t bytes);
+typedef void h2_beam_io_callback(void *ctx, h2_bucket_beam *beam,
+                                 apr_off_t bytes);
 
 typedef struct h2_beam_proxy h2_beam_proxy;
 typedef struct {
@@ -174,12 +174,14 @@ struct h2_bucket_beam {
     apr_pool_t *red_pool;
     
     apr_size_t max_buf_size;
-    apr_size_t files_beamed;  /* how many file handles have been set aside */
-    apr_file_t *last_beamed;  /* last file beamed */
+    apr_interval_time_t timeout;
+
     apr_off_t sent_bytes;     /* amount of bytes send */
     apr_off_t received_bytes; /* amount of bytes received */
-    apr_off_t reported_bytes; /* amount of bytes reported as consumed */
-    apr_size_t buckets_sent;
+
+    apr_size_t buckets_sent;  /* # of beam buckets sent */
+    apr_size_t files_beamed;  /* how many file handles have been set aside */
+    apr_file_t *last_beamed;  /* last file beamed */
     
     unsigned int aborted : 1;
     unsigned int closed : 1;
@@ -188,10 +190,13 @@ struct h2_bucket_beam {
     void *m_ctx;
     h2_beam_mutex_enter *m_enter;
     struct apr_thread_cond_t *m_cond;
-    apr_interval_time_t timeout;
     
-    h2_beam_consumed_callback *consumed_fn;
+    apr_off_t reported_consumed_bytes; /* amount of bytes reported as consumed */
+    h2_beam_io_callback *consumed_fn;
     void *consumed_ctx;
+    apr_off_t reported_produced_bytes; /* amount of bytes reported as produced */
+    h2_beam_io_callback *produced_fn;
+    void *produced_ctx;
     h2_beam_can_beam_callback *can_beam_fn;
     void *can_beam_ctx;
 };
@@ -319,7 +324,20 @@ apr_size_t h2_beam_buffer_size_get(h2_bu
  * Call from the red side, callbacks invoked on red side.
  */
 void h2_beam_on_consumed(h2_bucket_beam *beam, 
-                         h2_beam_consumed_callback *cb, void *ctx);
+                         h2_beam_io_callback *cb, void *ctx);
+
+/**
+ * Register a callback to be invoked on the red side with the
+ * amount of bytes that have been produced (sent into the beam) since the
+ * last callback invocation or reset.
+ * @param beam the beam to set the callback on
+ * @param cb   the callback or NULL
+ * @param ctx  the context to use in callback invocation
+ * 
+ * Call from the red side, callbacks invoked on red side.
+ */
+void h2_beam_on_produced(h2_bucket_beam *beam, 
+                         h2_beam_io_callback *cb, void *ctx);
 
 void h2_beam_on_file_beam(h2_bucket_beam *beam, 
                           h2_beam_can_beam_callback *cb, void *ctx);

Modified: httpd/httpd/trunk/modules/http2/h2_mplx.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_mplx.c?rev=1744712&r1=1744711&r2=1744712&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_mplx.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_mplx.c Fri May 20 11:18:37 2016
@@ -96,7 +96,8 @@ static apr_status_t enter_mutex(h2_mplx
         *pacquired = 0;
         return APR_SUCCESS;
     }
-        
+
+    AP_DEBUG_ASSERT(m->lock);
     status = apr_thread_mutex_lock(m->lock);
     *pacquired = (status == APR_SUCCESS);
     if (*pacquired) {
@@ -282,10 +283,11 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr
         m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM);
 
         m->streams = h2_ihash_create(m->pool, offsetof(h2_stream,id));
-        m->sready = h2_ihash_create(m->pool, offsetof(h2_stream,id));
         m->shold = h2_ihash_create(m->pool, offsetof(h2_stream,id));
         m->spurge = h2_ihash_create(m->pool, offsetof(h2_stream,id));
         m->q = h2_iq_create(m->pool, m->max_streams);
+        m->sready = h2_ihash_create(m->pool, offsetof(h2_stream,id));
+        m->sresume = h2_ihash_create(m->pool, offsetof(h2_stream,id));
         m->tasks = h2_ihash_create(m->pool, offsetof(h2_task,stream_id));
 
         m->stream_timeout = stream_timeout;
@@ -323,7 +325,7 @@ apr_uint32_t h2_mplx_shutdown(h2_mplx *m
 
 static void input_consumed_signal(h2_mplx *m, h2_stream *stream)
 {
-    if (stream->input) {
+    if (stream->input && stream->started) {
         h2_beam_send(stream->input, NULL, 0); /* trigger updates */
     }
 }
@@ -331,7 +333,8 @@ static void input_consumed_signal(h2_mpl
 static int output_consumed_signal(h2_mplx *m, h2_task *task)
 {
     if (task->output.beam && task->worker_started && task->assigned) {
-        h2_beam_send(task->output.beam, NULL, 0); /* trigger updates */
+        /* trigger updates */
+        h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ);
     }
     return 0;
 }
@@ -428,6 +431,7 @@ static void stream_done(h2_mplx *m, h2_s
      */
     h2_iq_remove(m->q, stream->id);
     h2_ihash_remove(m->sready, stream->id);
+    h2_ihash_remove(m->sresume, stream->id);
     h2_ihash_remove(m->streams, stream->id);
     if (stream->input) {
         m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input);
@@ -630,74 +634,6 @@ void h2_mplx_set_consumed_cb(h2_mplx *m,
     m->input_consumed_ctx = ctx;
 }
 
-static int update_window(void *ctx, void *val)
-{
-    input_consumed_signal(ctx, val);
-    return 1;
-}
-
-apr_status_t h2_mplx_in_update_windows(h2_mplx *m)
-{
-    apr_status_t status;
-    int acquired;
-    
-    AP_DEBUG_ASSERT(m);
-    if (m->aborted) {
-        return APR_ECONNABORTED;
-    }
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        h2_ihash_iter(m->streams, update_window, m);
-        
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, 
-                      "h2_session(%ld): windows updated", m->id);
-        status = APR_SUCCESS;
-        leave_mutex(m, acquired);
-    }
-    return status;
-}
-
-static int stream_iter_first(void *ctx, void *val)
-{
-    h2_stream **pstream = ctx;
-    *pstream = val;
-    return 0;
-}
-
-h2_stream *h2_mplx_next_submit(h2_mplx *m)
-{
-    apr_status_t status;
-    h2_stream *stream = NULL;
-    int acquired;
-
-    AP_DEBUG_ASSERT(m);
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        h2_ihash_iter(m->sready, stream_iter_first, &stream);
-        if (stream) {
-            h2_task *task = h2_ihash_get(m->tasks, stream->id);
-            h2_ihash_remove(m->sready, stream->id);
-            if (task) {
-                task->submitted = 1;
-                if (task->rst_error) {
-                    h2_stream_rst(stream, task->rst_error);
-                }
-                else {
-                    AP_DEBUG_ASSERT(task->response);
-                    h2_stream_set_response(stream, task->response, 
-                                           task->output.beam);
-                }
-            }
-            else {
-                /* We have the stream ready without a task. This happens
-                 * when we fail streams early. A response should already
-                 * be present.  */
-                AP_DEBUG_ASSERT(stream->response || stream->rst_error);
-            }
-        }
-        leave_mutex(m, acquired);
-    }
-    return stream;
-}
-
 static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response)
 {
     apr_status_t status = APR_SUCCESS;
@@ -798,6 +734,9 @@ apr_status_t h2_mplx_out_trywait(h2_mplx
         if (m->aborted) {
             status = APR_ECONNABORTED;
         }
+        else if (!h2_ihash_empty(m->sready) || !h2_ihash_empty(m->sresume)) {
+            status = APR_SUCCESS;
+        }
         else {
             m->added_output = iowait;
             status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout);
@@ -916,6 +855,7 @@ static h2_task *pop_task(h2_mplx *m)
             if (new_conn) {
                 h2_slave_run_pre_connection(slave, ap_get_conn_socket(slave));
             }
+            stream->started = 1;
             task->worker_started = 1;
             task->started_at = apr_time_now();
             if (sid > m->max_stream_started) {
@@ -1018,6 +958,7 @@ static void task_done(h2_mplx *m, h2_tas
         task->done_at = apr_time_now();
         if (task->output.beam) {
             h2_beam_on_consumed(task->output.beam, NULL, NULL);
+            h2_beam_on_produced(task->output.beam, NULL, NULL);
             h2_beam_mutex_set(task->output.beam, NULL, NULL, NULL);
         }
         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
@@ -1045,6 +986,11 @@ static void task_done(h2_mplx *m, h2_tas
             ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
                           "h2_mplx(%s): task_done, stream still open", 
                           task->id);
+            if (h2_stream_is_suspended(stream)) {
+                /* more data will not arrive, resume the stream */
+                h2_ihash_add(m->sresume, stream);
+                have_out_data_for(m, stream->id);
+            }
         }
         else {
             /* stream done, was it placed in hold? */
@@ -1353,4 +1299,146 @@ void h2_mplx_req_engine_done(h2_req_engi
         }
     }
 }
-                                
+
+/*******************************************************************************
+ * mplx master events dispatching
+ ******************************************************************************/
+
+typedef struct {
+    h2_mplx *m;
+    stream_ev_callback *on_resume; 
+    stream_ev_callback *on_response; 
+    void *on_ctx;
+    apr_status_t status;
+} dispatch_ctx;
+
+static int update_window(void *ctx, void *val)
+{
+    input_consumed_signal(ctx, val);
+    return 1;
+}
+
+static int stream_ready_iter(void *data, void *val)
+{
+    dispatch_ctx *ctx = data;
+    h2_stream *stream = val;
+    h2_task *task = h2_ihash_get(ctx->m->tasks, stream->id);
+    
+    if (task) {
+        task->submitted = 1;
+        if (task->rst_error) {
+            h2_stream_rst(stream, task->rst_error);
+        }
+        else {
+            AP_DEBUG_ASSERT(task->response);
+            h2_stream_set_response(stream, task->response, task->output.beam);
+        }
+    }
+    else {
+        /* We have the stream ready without a task. This happens
+         * when we fail streams early. A response should already
+         * be present.  */
+        AP_DEBUG_ASSERT(stream->response || stream->rst_error);
+    }
+    
+    ctx->status = ctx->on_response(ctx->on_ctx, stream->id);
+    return 1;
+}
+
+static int stream_resume_iter(void *data, void *val)
+{
+    dispatch_ctx *ctx = data;
+    h2_stream *stream = val;
+
+    h2_stream_set_suspended(stream, 0);
+    ctx->status = ctx->on_resume(ctx->on_ctx, stream->id);
+    return 1;
+}
+
+apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m, 
+                                            stream_ev_callback *on_resume, 
+                                            stream_ev_callback *on_response, 
+                                            void *on_ctx)
+{
+    apr_status_t status;
+    int acquired;
+    
+    AP_DEBUG_ASSERT(m);
+    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
+        dispatch_ctx ctx;
+        ctx.m = m;
+        ctx.on_resume = on_resume;
+        ctx.on_response = on_response;
+        ctx.on_ctx = on_ctx;
+        ctx.status = APR_SUCCESS;
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, 
+                      "h2_mplx(%ld): dispatch events", m->id);
+        /* update input windows for streams */
+        h2_ihash_iter(m->streams, update_window, m);
+
+        if (ctx.on_response) {
+            h2_ihash_iter(m->sready, stream_ready_iter, &ctx);
+            h2_ihash_clear(m->sready);
+        }
+
+        if (ctx.on_resume) {
+            h2_ihash_iter(m->sresume, stream_resume_iter, &ctx);
+            h2_ihash_clear(m->sresume);
+        }
+        
+        leave_mutex(m, acquired);
+        return ctx.status;
+    }
+    return status;
+}
+
+static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes)
+{
+    h2_mplx *m = ctx;
+    apr_status_t status;
+    h2_stream *stream;
+    h2_task *task;
+    int acquired;
+    
+    AP_DEBUG_ASSERT(m);
+    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
+        stream = h2_ihash_get(m->streams, beam->id);
+        if (stream && h2_stream_is_suspended(stream)) {
+            h2_ihash_add(m->sresume, stream);
+            task = h2_ihash_get(m->tasks, stream->id);
+            if (task && task->output.beam) {
+                h2_beam_on_produced(task->output.beam, NULL, NULL);
+            }
+            have_out_data_for(m, beam->id);
+        }
+        leave_mutex(m, acquired);
+    }
+}
+
+apr_status_t h2_mplx_suspend_stream(h2_mplx *m, int stream_id)
+{
+    apr_status_t status;
+    h2_stream *stream;
+    h2_task *task;
+    int acquired;
+    
+    AP_DEBUG_ASSERT(m);
+    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
+        stream = h2_ihash_get(m->streams, stream_id);
+        if (stream && !h2_stream_is_suspended(stream)) {
+            h2_stream_set_suspended(stream, 1);
+            task = h2_ihash_get(m->tasks, stream->id);
+            if (task && task->output.beam && h2_beam_empty(task->output.beam)) {
+                /* register callback so that we can resume on new output */
+                h2_beam_on_produced(task->output.beam, output_produced, m);
+            }
+            else {
+                /* if the beam got data in the meantime, add this to the to-be
+                 * resumed streams right away. */
+                h2_ihash_add(m->sresume, stream);
+            }
+        }
+        leave_mutex(m, acquired);
+    }
+    return status;
+}

Modified: httpd/httpd/trunk/modules/http2/h2_mplx.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_mplx.h?rev=1744712&r1=1744711&r2=1744712&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_mplx.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_mplx.h Fri May 20 11:18:37 2016
@@ -73,10 +73,12 @@ struct h2_mplx {
     unsigned int need_registration : 1;
 
     struct h2_ihash_t *streams;     /* all streams currently processing */
-    struct h2_ihash_t *sready;      /* all streams ready for response */
     struct h2_ihash_t *shold;       /* all streams done with task ongoing */
     struct h2_ihash_t *spurge;      /* all streams done, ready for destroy */
+
     struct h2_iqueue *q;            /* all stream ids that need to be started */
+    struct h2_ihash_t *sready;      /* all streams ready for response */
+    struct h2_ihash_t *sresume;     /* all streams that can be resumed */
     
     struct h2_ihash_t *tasks;       /* all tasks started and not destroyed */
     struct h2_ihash_t *redo_tasks;  /* all tasks that need to be redone */
@@ -214,32 +216,26 @@ apr_status_t h2_mplx_reprioritize(h2_mpl
  */
 void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx);
 
-/*******************************************************************************
- * Input handling of streams.
- ******************************************************************************/
+
+typedef apr_status_t stream_ev_callback(void *ctx, int stream_id);
 
 /**
- * Invoke the consumed callback for all streams that had bytes read since the 
- * last call to this function. If no stream had input data consumed, the 
- * callback is not invoked.
- * The consumed callback may also be invoked at other times whenever
- * the need arises.
+ * Dispatch events for the master connection, such as
+ * - resume: new output data has arrived for a suspended stream
+ * - response: the response for a stream is ready
  */
-apr_status_t h2_mplx_in_update_windows(h2_mplx *m);
+apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m, 
+                                            stream_ev_callback *on_resume, 
+                                            stream_ev_callback *on_response, 
+                                            void *ctx);
+
+apr_status_t h2_mplx_suspend_stream(h2_mplx *m, int stream_id);
 
 /*******************************************************************************
  * Output handling of streams.
  ******************************************************************************/
 
 /**
- * Get a stream whose response is ready for submit. Will set response and
- * any out data available in stream. 
- * @param m the mplxer to get a response from
- * @param bb the brigade to place any existing repsonse body data into
- */
-struct h2_stream *h2_mplx_next_submit(h2_mplx *m);
-
-/**
  * Opens the output for the given stream with the specified response.
  */
 apr_status_t h2_mplx_out_open(h2_mplx *mplx, int stream_id,

Modified: httpd/httpd/trunk/modules/http2/h2_session.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_session.c?rev=1744712&r1=1744711&r2=1744712&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_session.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_session.c Fri May 20 11:18:37 2016
@@ -79,6 +79,18 @@ static int is_accepting_streams(h2_sessi
 static void dispatch_event(h2_session *session, h2_session_event_t ev, 
                              int err, const char *msg);
 
+apr_status_t h2_session_stream_done(h2_session *session, h2_stream *stream)
+{
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+                  "h2_stream(%ld-%d): EOS bucket cleanup -> done", 
+                  session->id, stream->id);
+    h2_ihash_remove(session->streams, stream->id);
+    h2_mplx_stream_done(session->mplx, stream);
+    
+    dispatch_event(session, H2_SESSION_EV_STREAM_DONE, 0, NULL);
+    return APR_SUCCESS;
+}
+
 typedef struct stream_sel_ctx {
     h2_session *session;
     h2_stream *candidate;
@@ -133,7 +145,6 @@ h2_stream *h2_session_open_stream(h2_ses
     
     stream = h2_stream_open(stream_id, stream_pool, session, 
                             initiated_on, req);
-    ++session->unanswered_streams;
     nghttp2_session_set_stream_user_data(session->ngh2, stream_id, stream);
     h2_ihash_add(session->streams, stream);
     
@@ -1064,58 +1075,6 @@ static apr_status_t h2_session_start(h2_
     return status;
 }
 
-typedef struct {
-    h2_session *session;
-    int resume_count;
-} resume_ctx;
-
-static int resume_on_data(void *ctx, void *val)
-{
-    h2_stream *stream = val;
-    resume_ctx *rctx = (resume_ctx*)ctx;
-    h2_session *session = rctx->session;
-    AP_DEBUG_ASSERT(session);
-    AP_DEBUG_ASSERT(stream);
-    
-    if (h2_stream_is_suspended(stream)) {
-        apr_status_t status;
-        apr_off_t len = -1;
-        int eos;
-        
-        status = h2_stream_out_prepare(stream, &len, &eos);
-        if (status == APR_SUCCESS) {
-            int rv;
-            h2_stream_set_suspended(stream, 0);
-            ++rctx->resume_count;
-            
-            rv = nghttp2_session_resume_data(session->ngh2, stream->id);
-            ap_log_cerror(APLOG_MARK, nghttp2_is_fatal(rv)?
-                          APLOG_ERR : APLOG_DEBUG, 0, session->c,
-                          APLOGNO(02936) 
-                          "h2_stream(%ld-%d): resuming %s, len=%ld, eos=%d",
-                          session->id, stream->id, 
-                          rv? nghttp2_strerror(rv) : "", (long)len, eos);
-        }
-    }
-    return 1;
-}
-
-static int h2_session_resume_streams_with_data(h2_session *session)
-{
-    AP_DEBUG_ASSERT(session);
-    if (session->open_streams && !session->mplx->aborted) {
-        resume_ctx ctx;
-        ctx.session      = session;
-        ctx.resume_count = 0;
-
-        /* Resume all streams where we have data in the out queue and
-         * which had been suspended before. */
-        h2_ihash_iter(session->streams, resume_on_data, &ctx);
-        return ctx.resume_count;
-    }
-    return 0;
-}
-
 static ssize_t stream_data_cb(nghttp2_session *ng2s,
                               int32_t stream_id,
                               uint8_t *buf,
@@ -1171,7 +1130,7 @@ static ssize_t stream_data_cb(nghttp2_se
              * it. Remember at our h2_stream that we need to do this.
              */
             nread = 0;
-            h2_stream_set_suspended(stream, 1);
+            h2_mplx_suspend_stream(session->mplx, stream->id);
             ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03071)
                           "h2_stream(%ld-%d): suspending",
                           session->id, (int)stream_id);
@@ -1214,104 +1173,6 @@ typedef struct {
     size_t offset;
 } nvctx_t;
 
-/**
- * Start submitting the response to a stream request. This is possible
- * once we have all the response headers. The response body will be
- * read by the session using the callback we supply.
- */
-static apr_status_t submit_response(h2_session *session, h2_stream *stream)
-{
-    apr_status_t status = APR_SUCCESS;
-    h2_response *response = h2_stream_get_response(stream);
-    int rv = 0;
-    AP_DEBUG_ASSERT(session);
-    AP_DEBUG_ASSERT(stream);
-    AP_DEBUG_ASSERT(response || stream->rst_error);
-    
-    if (stream->submitted) {
-        rv = NGHTTP2_PROTOCOL_ERROR;
-    }
-    else if (response && response->headers) {
-        nghttp2_data_provider provider, *pprovider = NULL;
-        h2_ngheader *ngh;
-        const h2_priority *prio;
-        
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03073)
-                      "h2_stream(%ld-%d): submit response %d, REMOTE_WINDOW_SIZE=%u",
-                      session->id, stream->id, response->http_status,
-                      (unsigned int)nghttp2_session_get_stream_remote_window_size(session->ngh2, stream->id));
-        
-        if (response->content_length != 0) {
-            memset(&provider, 0, sizeof(provider));
-            provider.source.fd = stream->id;
-            provider.read_callback = stream_data_cb;
-            pprovider = &provider;
-        }
-        
-        /* If this stream is not a pushed one itself,
-         * and HTTP/2 server push is enabled here,
-         * and the response is in the range 200-299 *),
-         * and the remote side has pushing enabled,
-         * -> find and perform any pushes on this stream
-         *    *before* we submit the stream response itself.
-         *    This helps clients avoid opening new streams on Link
-         *    headers that get pushed right afterwards.
-         * 
-         * *) the response code is relevant, as we do not want to 
-         *    make pushes on 401 or 403 codes, neiterh on 301/302
-         *    and friends. And if we see a 304, we do not push either
-         *    as the client, having this resource in its cache, might
-         *    also have the pushed ones as well.
-         */
-        if (stream->request && !stream->request->initiated_on
-            && H2_HTTP_2XX(response->http_status)
-            && h2_session_push_enabled(session)) {
-            
-            h2_stream_submit_pushes(stream);
-        }
-        
-        prio = h2_stream_get_priority(stream);
-        if (prio) {
-            h2_session_set_prio(session, stream, prio);
-            /* no showstopper if that fails for some reason */
-        }
-        
-        ngh = h2_util_ngheader_make_res(stream->pool, response->http_status, 
-                                        response->headers);
-        rv = nghttp2_submit_response(session->ngh2, response->stream_id,
-                                     ngh->nv, ngh->nvlen, pprovider);
-    }
-    else {
-        int err = H2_STREAM_RST(stream, H2_ERR_PROTOCOL_ERROR);
-        
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03074)
-                      "h2_stream(%ld-%d): RST_STREAM, err=%d",
-                      session->id, stream->id, err);
-
-        rv = nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE,
-                                       stream->id, err);
-    }
-    
-    stream->submitted = 1;
-    --session->unanswered_streams;
-    if (stream->request && stream->request->initiated_on) {
-        ++session->pushes_submitted;
-    }
-    else {
-        ++session->responses_submitted;
-    }
-    
-    if (nghttp2_is_fatal(rv)) {
-        status = APR_EGENERAL;
-        dispatch_event(session, H2_SESSION_EV_PROTO_ERROR, rv, nghttp2_strerror(rv));
-        ap_log_cerror(APLOG_MARK, APLOG_ERR, status, session->c,
-                      APLOGNO(02940) "submit_response: %s", 
-                      nghttp2_strerror(rv));
-    }
-    
-    return status;
-}
-
 struct h2_stream *h2_session_push(h2_session *session, h2_stream *is,
                                   h2_push *push)
 {
@@ -1465,19 +1326,6 @@ apr_status_t h2_session_set_prio(h2_sess
     return status;
 }
 
-apr_status_t h2_session_stream_done(h2_session *session, h2_stream *stream)
-{
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
-                  "h2_stream(%ld-%d): EOS bucket cleanup -> done", 
-                  session->id, stream->id);
-    h2_ihash_remove(session->streams, stream->id);
-    --session->unanswered_streams;
-    h2_mplx_stream_done(session->mplx, stream);
-    
-    dispatch_event(session, H2_SESSION_EV_STREAM_DONE, 0, NULL);
-    return APR_SUCCESS;
-}
-
 int h2_session_push_enabled(h2_session *session)
 {
     /* iff we can and they can and want */
@@ -1504,6 +1352,7 @@ static apr_status_t h2_session_send(h2_s
     if (socket) {
         apr_socket_timeout_set(socket, saved_timeout);
     }
+    session->have_written = 1;
     if (rv != 0) {
         if (nghttp2_is_fatal(rv)) {
             dispatch_event(session, H2_SESSION_EV_PROTO_ERROR, rv, nghttp2_strerror(rv));
@@ -1517,6 +1366,148 @@ static apr_status_t h2_session_send(h2_s
     return APR_SUCCESS;
 }
 
+/**
+ * A stream was resumed as new output data arrived.
+ */
+static apr_status_t on_stream_resume(void *ctx, int stream_id)
+{
+    h2_session *session = ctx;
+    h2_stream *stream = get_stream(session, stream_id);
+    apr_status_t status = APR_SUCCESS;
+    
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, 
+                  "h2_stream(%ld-%d): on_resume", session->id, stream_id);
+    if (stream) {
+        int rv = nghttp2_session_resume_data(session->ngh2, stream_id);
+        session->have_written = 1;
+        ap_log_cerror(APLOG_MARK, nghttp2_is_fatal(rv)?
+                      APLOG_ERR : APLOG_DEBUG, 0, session->c,
+                      APLOGNO(02936) 
+                      "h2_stream(%ld-%d): resuming %s",
+                      session->id, stream->id, rv? nghttp2_strerror(rv) : "");
+    }
+    return status;
+}
+
+/**
+ * A response for the stream is ready.
+ */
+static apr_status_t on_stream_response(void *ctx, int stream_id)
+{
+    h2_session *session = ctx;
+    h2_stream *stream = get_stream(session, stream_id);
+    apr_status_t status = APR_SUCCESS;
+    h2_response *response;
+    int rv = 0;
+
+    AP_DEBUG_ASSERT(session);
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, 
+                  "h2_stream(%ld-%d): on_response", session->id, stream_id);
+    if (!stream) {
+        return APR_NOTFOUND;
+    }
+    
+    response = h2_stream_get_response(stream);
+    AP_DEBUG_ASSERT(response || stream->rst_error);
+    
+    if (stream->submitted) {
+        rv = NGHTTP2_PROTOCOL_ERROR;
+    }
+    else if (response && response->headers) {
+        nghttp2_data_provider provider, *pprovider = NULL;
+        h2_ngheader *ngh;
+        const h2_priority *prio;
+        
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03073)
+                      "h2_stream(%ld-%d): submit response %d, REMOTE_WINDOW_SIZE=%u",
+                      session->id, stream->id, response->http_status,
+                      (unsigned int)nghttp2_session_get_stream_remote_window_size(session->ngh2, stream->id));
+        
+        if (response->content_length != 0) {
+            memset(&provider, 0, sizeof(provider));
+            provider.source.fd = stream->id;
+            provider.read_callback = stream_data_cb;
+            pprovider = &provider;
+        }
+        
+        /* If this stream is not a pushed one itself,
+         * and HTTP/2 server push is enabled here,
+         * and the response is in the range 200-299 *),
+         * and the remote side has pushing enabled,
+         * -> find and perform any pushes on this stream
+         *    *before* we submit the stream response itself.
+         *    This helps clients avoid opening new streams on Link
+         *    headers that get pushed right afterwards.
+         * 
+         * *) the response code is relevant, as we do not want to 
+         *    make pushes on 401 or 403 codes, neither on 301/302
+         *    and friends. And if we see a 304, we do not push either
+         *    as the client, having this resource in its cache, might
+         *    also have the pushed ones as well.
+         */
+        if (stream->request && !stream->request->initiated_on
+            && H2_HTTP_2XX(response->http_status)
+            && h2_session_push_enabled(session)) {
+            
+            h2_stream_submit_pushes(stream);
+        }
+        
+        prio = h2_stream_get_priority(stream);
+        if (prio) {
+            h2_session_set_prio(session, stream, prio);
+            /* no showstopper if that fails for some reason */
+        }
+        
+        ngh = h2_util_ngheader_make_res(stream->pool, response->http_status, 
+                                        response->headers);
+        rv = nghttp2_submit_response(session->ngh2, response->stream_id,
+                                     ngh->nv, ngh->nvlen, pprovider);
+    }
+    else {
+        int err = H2_STREAM_RST(stream, H2_ERR_PROTOCOL_ERROR);
+        
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03074)
+                      "h2_stream(%ld-%d): RST_STREAM, err=%d",
+                      session->id, stream->id, err);
+
+        rv = nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE,
+                                       stream->id, err);
+    }
+    
+    stream->submitted = 1;
+    session->have_written = 1;
+    
+    if (stream->request && stream->request->initiated_on) {
+        ++session->pushes_submitted;
+    }
+    else {
+        ++session->responses_submitted;
+    }
+    
+    if (nghttp2_is_fatal(rv)) {
+        status = APR_EGENERAL;
+        dispatch_event(session, H2_SESSION_EV_PROTO_ERROR, rv, nghttp2_strerror(rv));
+        ap_log_cerror(APLOG_MARK, APLOG_ERR, status, session->c,
+                      APLOGNO(02940) "submit_response: %s", 
+                      nghttp2_strerror(rv));
+    }
+    
+    ++session->unsent_submits;
+    
+    /* Unsent push promises are written immediately, as nghttp2
+     * 1.5.0 realizes internal stream data structures only on 
+     * send and we might need them for other submits. 
+     * Also, to conserve memory, we send at least every 10 submits
+     * so that nghttp2 does not buffer all outbound items too 
+     * long.
+     */
+    if (status == APR_SUCCESS 
+        && (session->unsent_promises || session->unsent_submits > 10)) {
+        status = h2_session_send(session);
+    }
+    return status;
+}
+
 static apr_status_t h2_session_receive(void *ctx, const char *data, 
                                        apr_size_t len, apr_size_t *readlen)
 {
@@ -1644,36 +1635,6 @@ static int has_suspended_streams(h2_sess
     return has_suspended;
 }
 
-static apr_status_t h2_session_submit(h2_session *session)
-{
-    apr_status_t status = APR_EAGAIN;
-    h2_stream *stream;
-    
-    if (has_unsubmitted_streams(session)) {
-        /* If we have responses ready, submit them now. */
-        while ((stream = h2_mplx_next_submit(session->mplx))) {
-            status = submit_response(session, stream);
-            ++session->unsent_submits;
-            
-            /* Unsent push promises are written immediately, as nghttp2
-             * 1.5.0 realizes internal stream data structures only on 
-             * send and we might need them for other submits. 
-             * Also, to conserve memory, we send at least every 10 submits
-             * so that nghttp2 does not buffer all outbound items too 
-             * long.
-             */
-            if (status == APR_SUCCESS 
-                && (session->unsent_promises || session->unsent_submits > 10)) {
-                status = h2_session_send(session);
-                if (status != APR_SUCCESS) {
-                    break;
-                }
-            }
-        }
-    }
-    return status;
-}
-
 static const char *StateNames[] = {
     "INIT",      /* H2_SESSION_ST_INIT */
     "DONE",      /* H2_SESSION_ST_DONE */
@@ -1855,51 +1816,52 @@ static void h2_session_ev_no_io(h2_sessi
         case H2_SESSION_ST_BUSY:
         case H2_SESSION_ST_LOCAL_SHUTDOWN:
         case H2_SESSION_ST_REMOTE_SHUTDOWN:
-            /* nothing for input and output to do. If we remain
-             * in this state, we go into a tight loop and suck up
-             * CPU cycles. Ideally, we'd like to do a blocking read, but that
-             * is not possible if we have scheduled tasks and wait
-             * for them to produce something. */
+            /* Nothing to READ, nothing to WRITE on the master connection.
+             * Possible causes:
+             * - we wait for the client to send us something
+             * - we wait for started tasks to produce output
+             * - we have finished all streams and the client has sent GO_AWAY
+             */
             ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
                           "h2_session(%ld): NO_IO event, %d streams open", 
                           session->id, session->open_streams);
-            if (!session->open_streams) {
-                if (!is_accepting_streams(session)) {
-                    /* We are no longer accepting new streams and have
-                     * finished processing existing ones. Time to leave. */
-                    h2_session_shutdown(session, arg, msg, 0);
-                    transit(session, "no io", H2_SESSION_ST_DONE);
+            if (session->open_streams > 0) {
+                if (has_unsubmitted_streams(session) 
+                    || has_suspended_streams(session)) {
+                    /* waiting for at least one stream to produce data */
+                    transit(session, "no io", H2_SESSION_ST_WAIT);
                 }
                 else {
-                    apr_time_t now = apr_time_now();
-                    /* When we have no streams, no task event are possible,
-                     * switch to blocking reads */
-                    transit(session, "no io (keepalive)", H2_SESSION_ST_IDLE);
-                    session->idle_until = (session->remote.emitted_count? 
-                                           session->s->keep_alive_timeout : 
-                                           session->s->timeout) + now;
-                    session->keep_sync_until = now + apr_time_from_sec(1);
+                    /* we have streams open, and all are submitted and none
+                     * is suspended. The only thing keeping us from WRITEing
+                     * more must be the flow control.
+                     * This means we only wait for WINDOW_UPDATE from the 
+                     * client and can block on READ. */
+                    transit(session, "no io (flow wait)", H2_SESSION_ST_IDLE);
+                    session->idle_until = apr_time_now() + session->s->timeout;
+                    session->keep_sync_until = session->idle_until;
+                    /* Make sure we have flushed all previously written output
+                     * so that the client will react. */
+                    if (h2_conn_io_flush(&session->io) != APR_SUCCESS) {
+                        dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
+                        return;
+                    }
                 }
             }
-            else if (!has_unsubmitted_streams(session)
-                     && !has_suspended_streams(session)) {
-                transit(session, "no io (flow wait)", H2_SESSION_ST_IDLE);
-                session->idle_until = apr_time_now() + session->s->timeout;
-                session->keep_sync_until = session->idle_until;
-                /* none of our streams is waiting for a response or
-                 * new output data from task processing, 
-                 * switch to blocking reads. We are probably waiting on
-                 * window updates. */
-                if (h2_conn_io_flush(&session->io) != APR_SUCCESS) {
-                    dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
-                    return;
-                }
+            else if (is_accepting_streams(session)) {
+                /* When we have no streams, but accept new, switch to idle */
+                apr_time_t now = apr_time_now();
+                transit(session, "no io (keepalive)", H2_SESSION_ST_IDLE);
+                session->idle_until = (session->remote.emitted_count? 
+                                       session->s->keep_alive_timeout : 
+                                       session->s->timeout) + now;
+                session->keep_sync_until = now + apr_time_from_sec(1);
             }
             else {
-                /* Unable to do blocking reads, as we wait on events from
-                 * task processing in other threads. Do a busy wait with
-                 * backoff timer. */
-                transit(session, "no io", H2_SESSION_ST_WAIT);
+                /* We are no longer accepting new streams and there are
+                 * none left. Time to leave. */
+                h2_session_shutdown(session, arg, msg, 0);
+                transit(session, "no io", H2_SESSION_ST_DONE);
             }
             break;
         default:
@@ -1989,8 +1951,6 @@ static void h2_session_ev_stream_open(h2
 static void h2_session_ev_stream_done(h2_session *session, int arg, const char *msg)
 {
     --session->open_streams;
-    if (session->open_streams <= 0) {
-    }
     switch (session->state) {
         case H2_SESSION_ST_IDLE:
             if (session->open_streams == 0) {
@@ -2068,17 +2028,21 @@ apr_status_t h2_session_process(h2_sessi
 {
     apr_status_t status = APR_SUCCESS;
     conn_rec *c = session->c;
-    int rv, have_written, have_read, mpm_state;
+    int rv, mpm_state, trace = APLOGctrace3(c);
 
-    ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c,
-                  "h2_session(%ld): process start, async=%d", session->id, async);
+    if (trace) {
+        ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
+                      "h2_session(%ld): process start, async=%d", 
+                      session->id, async);
+    }
                   
     if (c->cs) {
         c->cs->state = CONN_STATE_WRITE_COMPLETION;
     }
     
     while (1) {
-        have_read = have_written = 0;
+        trace = APLOGctrace3(c);
+        session->have_read = session->have_written = 0;
 
         if (!ap_mpm_query(AP_MPMQ_MPM_STATE, &mpm_state)) {
             if (mpm_state == AP_MPMQ_STOPPING) {
@@ -2114,10 +2078,12 @@ apr_status_t h2_session_process(h2_sessi
                 /* make certain, we send everything before we idle */
                 if (!session->keep_sync_until && async && !session->open_streams
                     && !session->r && session->remote.emitted_count) {
-                    ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
-                                  "h2_session(%ld): async idle, nonblock read, "
-                                  "%d streams open", session->id, 
-                                  session->open_streams);
+                    if (trace) {
+                        ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
+                                      "h2_session(%ld): async idle, nonblock read, "
+                                      "%d streams open", session->id, 
+                                      session->open_streams);
+                    }
                     /* We do not return to the async mpm immediately, since under
                      * load, mpms show the tendency to throw keep_alive connections
                      * away very rapidly.
@@ -2130,7 +2096,7 @@ apr_status_t h2_session_process(h2_sessi
                     status = h2_session_read(session, 0);
                     
                     if (status == APR_SUCCESS) {
-                        have_read = 1;
+                        session->have_read = 1;
                         dispatch_event(session, H2_SESSION_EV_DATA_READ, 0, NULL);
                     }
                     else if (APR_STATUS_IS_EAGAIN(status) || APR_STATUS_IS_TIMEUP(status)) {
@@ -2150,10 +2116,12 @@ apr_status_t h2_session_process(h2_sessi
                     }
                 }
                 else {
-                    ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
-                                  "h2_session(%ld): sync idle, stutter 1-sec, "
-                                  "%d streams open", session->id,
-                                  session->open_streams);
+                    if (trace) {
+                        ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
+                                      "h2_session(%ld): sync idle, stutter 1-sec, "
+                                      "%d streams open", session->id,
+                                      session->open_streams);
+                    }
                     /* We wait in smaller increments, using a 1 second timeout.
                      * That gives us the chance to check for MPMQ_STOPPING often. 
                      */
@@ -2165,7 +2133,7 @@ apr_status_t h2_session_process(h2_sessi
                     h2_filter_cin_timeout_set(session->cin, apr_time_from_sec(1));
                     status = h2_session_read(session, 1);
                     if (status == APR_SUCCESS) {
-                        have_read = 1;
+                        session->have_read = 1;
                         dispatch_event(session, H2_SESSION_EV_DATA_READ, 0, NULL);
                     }
                     else if (status == APR_EAGAIN) {
@@ -2179,12 +2147,14 @@ apr_status_t h2_session_process(h2_sessi
                             session->keep_sync_until = 0;
                         }
                         if (now > session->idle_until) {
-                            ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
-                                          "h2_session(%ld): keepalive timeout",
-                                          session->id);
+                            if (trace) {
+                                ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
+                                              "h2_session(%ld): keepalive timeout",
+                                              session->id);
+                            }
                             dispatch_event(session, H2_SESSION_EV_CONN_TIMEOUT, 0, "timeout");
                         }
-                        else {
+                        else if (trace) {                        
                             ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
                                           "h2_session(%ld): keepalive, %f sec left",
                                           session->id, (session->idle_until - now) / 1000000.0f);
@@ -2192,9 +2162,11 @@ apr_status_t h2_session_process(h2_sessi
                         /* continue reading handling */
                     }
                     else {
-                        ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c,
-                                      "h2_session(%ld): idle(1 sec timeout) "
-                                      "read failed", session->id);
+                        if (trace) {
+                            ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
+                                          "h2_session(%ld): idle(1 sec timeout) "
+                                          "read failed", session->id);
+                        }
                         dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, "error");
                     }
                 }
@@ -2209,7 +2181,7 @@ apr_status_t h2_session_process(h2_sessi
                     h2_filter_cin_timeout_set(session->cin, session->s->timeout);
                     status = h2_session_read(session, 0);
                     if (status == APR_SUCCESS) {
-                        have_read = 1;
+                        session->have_read = 1;
                         dispatch_event(session, H2_SESSION_EV_DATA_READ, 0, NULL);
                     }
                     else if (status == APR_EAGAIN) {
@@ -2224,43 +2196,32 @@ apr_status_t h2_session_process(h2_sessi
                     }
                 }
                 
-                if (session->open_streams) {
-                    /* resume any streams with output data */
-                    h2_session_resume_streams_with_data(session);
-                    /* Submit any responses/push_promises that are ready */
-                    status = h2_session_submit(session);
-                    if (status == APR_SUCCESS) {
-                        have_written = 1;
-                    }
-                    else if (status != APR_EAGAIN) {
-                        dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 
-                                         H2_ERR_INTERNAL_ERROR, "submit error");
-                        break;
-                    }
-                    /* send out window updates for our inputs */
-                    status = h2_mplx_in_update_windows(session->mplx);
-                    if (status != APR_SUCCESS) {
-                        dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 
-                                       H2_ERR_INTERNAL_ERROR, 
-                                       "window update error");
-                        break;
-                    }
+                /* trigger window updates, stream resumes and submits */
+                status = h2_mplx_dispatch_master_events(session->mplx, 
+                                                        on_stream_resume,
+                                                        on_stream_response, 
+                                                        session);
+                if (status != APR_SUCCESS) {
+                    ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, c,
+                                  "h2_session(%ld): dispatch error", 
+                                  session->id);
+                    dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 
+                                   H2_ERR_INTERNAL_ERROR, 
+                                   "dispatch error");
+                    break;
                 }
                 
                 if (nghttp2_session_want_write(session->ngh2)) {
                     ap_update_child_status(session->c->sbh, SERVER_BUSY_WRITE, NULL);
                     status = h2_session_send(session);
-                    if (status == APR_SUCCESS) {
-                        have_written = 1;
-                    }
-                    else {
+                    if (status != APR_SUCCESS) {
                         dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 
-                                         H2_ERR_INTERNAL_ERROR, "writing");
+                                       H2_ERR_INTERNAL_ERROR, "writing");
                         break;
                     }
                 }
                 
-                if (have_read || have_written) {
+                if (session->have_read || session->have_written) {
                     if (session->wait_us) {
                         session->wait_us = 0;
                     }
@@ -2281,8 +2242,10 @@ apr_status_t h2_session_process(h2_sessi
                 }
                 else if ((apr_time_now() - session->start_wait) >= session->s->timeout) {
                     /* waited long enough */
-                    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, APR_TIMEUP, c,
-                                  "h2_session: wait for data");
+                    if (trace) {
+                        ap_log_cerror(APLOG_MARK, APLOG_TRACE3, APR_TIMEUP, c,
+                                      "h2_session: wait for data");
+                    }
                     dispatch_event(session, H2_SESSION_EV_CONN_TIMEOUT, 0, "timeout");
                     break;
                 }
@@ -2291,8 +2254,8 @@ apr_status_t h2_session_process(h2_sessi
                     session->wait_us = H2MIN(session->wait_us*2, MAX_WAIT_MICROS);
                 }
 
-                if (APLOGctrace1(c)) {
-                    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
+                if (trace) {
+                    ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, c,
                                   "h2_session: wait for data, %ld micros", 
                                   (long)session->wait_us);
                 }
@@ -2343,9 +2306,11 @@ apr_status_t h2_session_process(h2_sessi
     }
     
 out:
-    ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c,
-                  "h2_session(%ld): [%s] process returns", 
-                  session->id, state_name(session->state));
+    if (trace) {
+        ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
+                      "h2_session(%ld): [%s] process returns", 
+                      session->id, state_name(session->state));
+    }
     
     if ((session->state != H2_SESSION_ST_DONE)
         && (APR_STATUS_IS_EOF(status)

Modified: httpd/httpd/trunk/modules/http2/h2_session.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_session.h?rev=1744712&r1=1744711&r2=1744712&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_session.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_session.h Fri May 20 11:18:37 2016
@@ -98,12 +98,13 @@ typedef struct h2_session {
     unsigned int reprioritize  : 1; /* scheduled streams priority changed */
     unsigned int eoc_written   : 1; /* h2 eoc bucket written */
     unsigned int flush         : 1; /* flushing output necessary */
+    unsigned int have_read     : 1; /* session has read client data */
+    unsigned int have_written  : 1; /* session did write data to client */
     apr_interval_time_t  wait_us;   /* timout during BUSY_WAIT state, micro secs */
     
     struct h2_push_diary *push_diary; /* remember pushes, avoid duplicates */
     
     int open_streams;               /* number of streams open */
-    int unanswered_streams;         /* number of streams waiting for response */
     int unsent_submits;             /* number of submitted, but not yet written responses. */
     int unsent_promises;            /* number of submitted, but not yet written push promised */
                                          

Modified: httpd/httpd/trunk/modules/http2/h2_stream.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_stream.c?rev=1744712&r1=1744711&r2=1744712&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_stream.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_stream.c Fri May 20 11:18:37 2016
@@ -594,6 +594,11 @@ apr_status_t h2_stream_out_prepare(h2_st
             APR_BRIGADE_INSERT_TAIL(stream->buffer, eos);
             status = APR_SUCCESS;
         }
+        else if (status == APR_EAGAIN) {
+            /* did not receive more, it's ok */
+            status = APR_SUCCESS;
+        }
+        *plen = requested;
         h2_util_bb_avail(stream->buffer, plen, peos);
     }
     H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "h2_stream_out_prepare_post");

Modified: httpd/httpd/trunk/modules/http2/h2_stream.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_stream.h?rev=1744712&r1=1744711&r2=1744712&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_stream.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_stream.h Fri May 20 11:18:37 2016
@@ -61,6 +61,7 @@ struct h2_stream {
     unsigned int aborted   : 1; /* was aborted */
     unsigned int suspended : 1; /* DATA sending has been suspended */
     unsigned int scheduled : 1; /* stream has been scheduled */
+    unsigned int started   : 1; /* stream has started processing */
     unsigned int submitted : 1; /* response HEADER has been sent */
     
     apr_off_t input_remaining;  /* remaining bytes on input as advertised via content-length */

Modified: httpd/httpd/trunk/modules/http2/h2_task.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_task.c?rev=1744712&r1=1744711&r2=1744712&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_task.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_task.c Fri May 20 11:18:37 2016
@@ -59,13 +59,39 @@ static int input_ser_header(void *ctx, c
     return 1;
 }
 
-static apr_status_t input_append_eos(h2_task *task, request_rec *r)
+static void make_chunk(h2_task *task, apr_bucket_brigade *bb, 
+                       apr_bucket *first, apr_uint64_t chunk_len, 
+                       apr_bucket *tail)
+{
+    /* Surround the buckets [first, tail[ with new buckets carrying the
+     * HTTP/1.1 chunked encoding format. If tail is NULL, the chunk extends
+     * to the end of the brigade. */
+    char buffer[128];
+    apr_bucket *c;
+    int len;
+    
+    len = apr_snprintf(buffer, H2_ALEN(buffer), 
+                       "%"APR_UINT64_T_HEX_FMT"\r\n", chunk_len);
+    c = apr_bucket_heap_create(buffer, len, NULL, bb->bucket_alloc);
+    APR_BUCKET_INSERT_BEFORE(first, c);
+    c = apr_bucket_heap_create("\r\n", 2, NULL, bb->bucket_alloc);
+    if (tail) {
+        APR_BUCKET_INSERT_BEFORE(tail, c);
+    }
+    else {
+        APR_BRIGADE_INSERT_TAIL(bb, c);
+    }
+}
+
+static apr_status_t input_handle_eos(h2_task *task, request_rec *r, 
+                                     apr_bucket *b)
 {
     apr_status_t status = APR_SUCCESS;
     apr_bucket_brigade *bb = task->input.bb;
     apr_table_t *t = task->request->trailers;
 
     if (task->input.chunked) {
+        task->input.tmp = apr_brigade_split_ex(bb, b, task->input.tmp);
         if (t && !apr_is_empty_table(t)) {
             status = apr_brigade_puts(bb, NULL, NULL, "0\r\n");
             apr_table_do(input_ser_header, task, t, NULL);
@@ -74,13 +100,38 @@ static apr_status_t input_append_eos(h2_
         else {
             status = apr_brigade_puts(bb, NULL, NULL, "0\r\n\r\n");
         }
+        APR_BRIGADE_CONCAT(bb, task->input.tmp);
     }
     else if (r && t && !apr_is_empty_table(t)){
         /* trailers passed in directly. */
         apr_table_overlap(r->trailers_in, t, APR_OVERLAP_TABLES_SET);
     }
     task->input.eos_written = 1;
+    return status;
+}
+
+static apr_status_t input_append_eos(h2_task *task, request_rec *r)
+{
+    apr_status_t status = APR_SUCCESS;
+    apr_bucket_brigade *bb = task->input.bb;
+    apr_table_t *t = task->request->trailers;
+
+    if (task->input.chunked) {
+        if (t && !apr_is_empty_table(t)) {
+            status = apr_brigade_puts(bb, NULL, NULL, "0\r\n");
+            apr_table_do(input_ser_header, task, t, NULL);
+            status = apr_brigade_puts(bb, NULL, NULL, "\r\n");
+        }
+        else {
+            status = apr_brigade_puts(bb, NULL, NULL, "0\r\n\r\n");
+        }
+    }
+    else if (r && t && !apr_is_empty_table(t)){
+        /* trailers passed in directly. */
+        apr_table_overlap(r->trailers_in, t, APR_OVERLAP_TABLES_SET);
+    }
     APR_BRIGADE_INSERT_TAIL(bb, apr_bucket_eos_create(bb->bucket_alloc));
+    task->input.eos_written = 1;
     return status;
 }
 
@@ -89,7 +140,7 @@ static apr_status_t input_read(h2_task *
                                apr_read_type_e block, apr_off_t readbytes)
 {
     apr_status_t status = APR_SUCCESS;
-    apr_bucket *b, *next;
+    apr_bucket *b, *next, *first_data;
     apr_off_t bblen = 0;
     
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
@@ -104,31 +155,28 @@ static apr_status_t input_read(h2_task *
         return APR_ECONNABORTED;
     }
     
-    if (task->input.bb) {
-        /* Cleanup brigades from those nasty 0 length non-meta buckets
-         * that apr_brigade_split_line() sometimes produces. */
-        for (b = APR_BRIGADE_FIRST(task->input.bb); 
-             b != APR_BRIGADE_SENTINEL(task->input.bb); b = next) {
-            next = APR_BUCKET_NEXT(b);
-            if (b->length == 0 && !APR_BUCKET_IS_METADATA(b)) {
-                apr_bucket_delete(b);
-            } 
+    if (!task->input.bb) {
+        if (!task->input.eos_written) {
+            input_append_eos(task, f->r);
         }
-        apr_brigade_length(task->input.bb, 0, &bblen);
+        return APR_EOF;
     }
     
-    if (bblen == 0) {
-        if (task->input.eos_written) {
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, APR_EOF, f->c,
-                          "h2_task(%s): read no data", task->id); 
-            return APR_EOF;
-        }
-        else if (task->input.eos) {
-            input_append_eos(task, f->r);
-        }
+    /* Cleanup brigades from those nasty 0 length non-meta buckets
+     * that apr_brigade_split_line() sometimes produces. */
+    for (b = APR_BRIGADE_FIRST(task->input.bb);
+         b != APR_BRIGADE_SENTINEL(task->input.bb); b = next) {
+        next = APR_BUCKET_NEXT(b);
+        if (b->length == 0 && !APR_BUCKET_IS_METADATA(b)) {
+            apr_bucket_delete(b);
+        } 
     }
     
     while (APR_BRIGADE_EMPTY(task->input.bb)) {
+        if (task->input.eos_written) {
+            return APR_EOF;
+        }
+        
         /* Get more input data for our request. */
         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
                       "h2_task(%s): get more data from mplx, block=%d, "
@@ -161,34 +209,52 @@ static apr_status_t input_read(h2_task *
             return status;
         }
         
-        apr_brigade_length(task->input.bb, 0, &bblen);
-        if (bblen > 0 && task->input.chunked) {
-            /* need to add chunks since request processing expects it */
-            char buffer[128];
-            apr_bucket *b;
-            int len;
-            
-            len = apr_snprintf(buffer, H2_ALEN(buffer), "%lx\r\n", 
-                               (unsigned long)bblen);
-            b = apr_bucket_heap_create(buffer, len, NULL, 
-                                       task->input.bb->bucket_alloc);
-            APR_BRIGADE_INSERT_HEAD(task->input.bb, b);
-            status = apr_brigade_puts(task->input.bb, NULL, NULL, "\r\n");
-        }
-        
-        if (h2_util_has_eos(task->input.bb, -1)) {
-            task->input.eos = 1;
-        }
-        
-        if (task->input.eos && !task->input.eos_written) {
-            input_append_eos(task, f->r);
-        }
+        /* Inspect the buckets received, detect EOS and apply
+         * chunked encoding if necessary */
+        h2_util_bb_log(f->c, task->stream_id, APLOG_TRACE2, 
+                       "input.beam recv raw", task->input.bb);
+        first_data = NULL;
+        bblen = 0;
+        for (b = APR_BRIGADE_FIRST(task->input.bb); 
+             b != APR_BRIGADE_SENTINEL(task->input.bb); b = next) {
+            next = APR_BUCKET_NEXT(b);
+            if (APR_BUCKET_IS_METADATA(b)) {
+                if (first_data && task->input.chunked) {
+                    make_chunk(task, task->input.bb, first_data, bblen, b);
+                    first_data = NULL;
+                    bblen = 0;
+                }
+                if (APR_BUCKET_IS_EOS(b)) {
+                    task->input.eos = 1;
+                    input_handle_eos(task, f->r, b);
+                    h2_util_bb_log(f->c, task->stream_id, APLOG_TRACE2, 
+                                   "input.bb after handle eos", 
+                                   task->input.bb);
+                }
+            }
+            else if (b->length == 0) {
+                apr_bucket_delete(b);
+            } 
+            else {
+                if (!first_data) {
+                    first_data = b;
+                }
+                bblen += b->length;
+            }    
+        }
+        if (first_data && task->input.chunked) {
+            make_chunk(task, task->input.bb, first_data, bblen, NULL);
+        }            
         
         if (h2_task_logio_add_bytes_in) {
             h2_task_logio_add_bytes_in(f->c, bblen);
         }
     }
     
+    if (!task->input.eos_written && task->input.eos) {
+        input_append_eos(task, f->r);
+    }
+
     h2_util_bb_log(f->c, task->stream_id, APLOG_TRACE2, 
                    "task_input.bb", task->input.bb);
            

Modified: httpd/httpd/trunk/modules/http2/h2_task.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_task.h?rev=1744712&r1=1744711&r2=1744712&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_task.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_task.h Fri May 20 11:18:37 2016
@@ -61,6 +61,7 @@ struct h2_task {
     struct {
         struct h2_bucket_beam *beam;
         apr_bucket_brigade *bb;
+        apr_bucket_brigade *tmp;
         apr_read_type_e block;
         unsigned int chunked : 1;
         unsigned int eos : 1;

Modified: httpd/httpd/trunk/modules/http2/h2_util.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_util.c?rev=1744712&r1=1744711&r2=1744712&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_util.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_util.c Fri May 20 11:18:37 2016
@@ -682,11 +682,6 @@ static apr_status_t last_not_included(ap
                 /* included */
             }
             else {
-                if (maxlen == 0) {
-                    *pend = b;
-                    return status;
-                }
-                
                 if (b->length == ((apr_size_t)-1)) {
                     const char *ign;
                     apr_size_t ilen;
@@ -696,6 +691,11 @@ static apr_status_t last_not_included(ap
                     }
                 }
                 
+                if (maxlen == 0 && b->length > 0) {
+                    *pend = b;
+                    return status;
+                }
+                
                 if (same_alloc && APR_BUCKET_IS_FILE(b)) {
                     /* we like it move it, always */
                 }
@@ -829,20 +829,6 @@ int h2_util_has_eos(apr_bucket_brigade *
             return 1;
         }
     }
-    return 0;
-}
-
-int h2_util_bb_has_data(apr_bucket_brigade *bb)
-{
-    apr_bucket *b;
-    for (b = APR_BRIGADE_FIRST(bb);
-         b != APR_BRIGADE_SENTINEL(bb);
-         b = APR_BUCKET_NEXT(b))
-    {
-        if (!AP_BUCKET_IS_EOR(b)) {
-            return 1;
-        }
-    }
     return 0;
 }
 

Modified: httpd/httpd/trunk/modules/http2/h2_util.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_util.h?rev=1744712&r1=1744711&r2=1744712&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_util.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_util.h Fri May 20 11:18:37 2016
@@ -321,7 +321,6 @@ apr_status_t h2_brigade_copy_length(apr_
  * @return != 0 iff brigade holds FLUSH or EOS bucket (or both)
  */
 int h2_util_has_eos(apr_bucket_brigade *bb, apr_off_t len);
-int h2_util_bb_has_data(apr_bucket_brigade *bb);
 
 /**
  * Check how many bytes of the desired amount are available and if the



Mime
View raw message