httpd-cvs mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ic...@apache.org
Subject svn commit: r1735230 - in /httpd/httpd/trunk: ./ modules/http2/
Date Wed, 16 Mar 2016 14:01:53 GMT
Author: icing
Date: Wed Mar 16 14:01:53 2016
New Revision: 1735230

URL: http://svn.apache.org/viewvc?rev=1735230&view=rev
Log:
mod_http2: fix for bucket lifetime on master conn, mod_proxy_http2: flow control from front- to backend h2 connection

Modified:
    httpd/httpd/trunk/CHANGES
    httpd/httpd/trunk/modules/http2/h2_bucket_eoc.h
    httpd/httpd/trunk/modules/http2/h2_bucket_eos.h
    httpd/httpd/trunk/modules/http2/h2_conn_io.c
    httpd/httpd/trunk/modules/http2/h2_io.c
    httpd/httpd/trunk/modules/http2/h2_mplx.c
    httpd/httpd/trunk/modules/http2/h2_mplx.h
    httpd/httpd/trunk/modules/http2/h2_ngn_shed.c
    httpd/httpd/trunk/modules/http2/h2_ngn_shed.h
    httpd/httpd/trunk/modules/http2/h2_proxy_session.c
    httpd/httpd/trunk/modules/http2/h2_proxy_session.h
    httpd/httpd/trunk/modules/http2/h2_session.c
    httpd/httpd/trunk/modules/http2/h2_session.h
    httpd/httpd/trunk/modules/http2/h2_task.c
    httpd/httpd/trunk/modules/http2/h2_task.h
    httpd/httpd/trunk/modules/http2/h2_task_output.c
    httpd/httpd/trunk/modules/http2/h2_util.c
    httpd/httpd/trunk/modules/http2/h2_util.h
    httpd/httpd/trunk/modules/http2/mod_http2.c
    httpd/httpd/trunk/modules/http2/mod_http2.h
    httpd/httpd/trunk/modules/http2/mod_proxy_http2.c

Modified: httpd/httpd/trunk/CHANGES
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/CHANGES?rev=1735230&r1=1735229&r2=1735230&view=diff
==============================================================================
--- httpd/httpd/trunk/CHANGES [utf-8] (original)
+++ httpd/httpd/trunk/CHANGES [utf-8] Wed Mar 16 14:01:53 2016
@@ -1,6 +1,13 @@
                                                          -*- coding: utf-8 -*-
 Changes with Apache 2.5.0
 
+  *) mod_proxy_http2: using HTTP/2 flow control for backend streams by 
+     observing data actually sent out on the frontend h2 connection. 
+     [Stefan Eissing]
+     
+  *) mod_http2: fixes problem with wrong lifetime of file buckets on main
+     connection. [Stefan Eissing]
+     
   *) mpm: Generalise the ap_mpm_register_socket functions to accept pipes
      or sockets. [Graham Leggett]
 

Modified: httpd/httpd/trunk/modules/http2/h2_bucket_eoc.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_bucket_eoc.h?rev=1735230&r1=1735229&r2=1735230&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_bucket_eoc.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_bucket_eoc.h Wed Mar 16 14:01:53 2016
@@ -21,6 +21,7 @@ struct h2_session;
 /** End Of HTTP/2 SESSION (H2EOC) bucket */
 extern const apr_bucket_type_t h2_bucket_type_eoc;
 
+#define H2_BUCKET_IS_H2EOC(e)     (e->type == &h2_bucket_type_eoc)
 
 apr_bucket * h2_bucket_eoc_make(apr_bucket *b, 
                                 struct h2_session *session);

Modified: httpd/httpd/trunk/modules/http2/h2_bucket_eos.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_bucket_eos.h?rev=1735230&r1=1735229&r2=1735230&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_bucket_eos.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_bucket_eos.h Wed Mar 16 14:01:53 2016
@@ -21,6 +21,7 @@ struct h2_stream;
 /** End Of HTTP/2 STREAM (H2EOS) bucket */
 extern const apr_bucket_type_t h2_bucket_type_eos;
 
+#define H2_BUCKET_IS_H2EOS(e)     (e->type == &h2_bucket_type_eos)
 
 apr_bucket *h2_bucket_eos_make(apr_bucket *b, struct h2_stream *stream);
 

Modified: httpd/httpd/trunk/modules/http2/h2_conn_io.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_conn_io.c?rev=1735230&r1=1735229&r2=1735230&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_conn_io.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_conn_io.c Wed Mar 16 14:01:53 2016
@@ -14,16 +14,18 @@
  */
 
 #include <assert.h>
-
+#include <apr_strings.h>
 #include <ap_mpm.h>
 
 #include <httpd.h>
 #include <http_core.h>
 #include <http_log.h>
 #include <http_connection.h>
+#include <http_request.h>
 
 #include "h2_private.h"
 #include "h2_bucket_eoc.h"
+#include "h2_bucket_eos.h"
 #include "h2_config.h"
 #include "h2_conn_io.h"
 #include "h2_h2.h"
@@ -46,6 +48,84 @@
 #define WRITE_BUFFER_SIZE     (5*WRITE_SIZE_MAX)
 
 
+static void h2_conn_io_bb_log(conn_rec *c, int stream_id, int level, 
+                              const char *tag, apr_bucket_brigade *bb)
+{
+    char buffer[16 * 1024];
+    const char *line = "(null)";
+    apr_size_t bmax = sizeof(buffer)/sizeof(buffer[0]);
+    int off = 0;
+    apr_bucket *b;
+    
+    if (bb) {
+        memset(buffer, 0, bmax--);
+        for (b = APR_BRIGADE_FIRST(bb); 
+             bmax && (b != APR_BRIGADE_SENTINEL(bb));
+             b = APR_BUCKET_NEXT(b)) {
+            
+            if (APR_BUCKET_IS_METADATA(b)) {
+                if (APR_BUCKET_IS_EOS(b)) {
+                    off += apr_snprintf(buffer+off, bmax-off, "eos ");
+                }
+                else if (APR_BUCKET_IS_FLUSH(b)) {
+                    off += apr_snprintf(buffer+off, bmax-off, "flush ");
+                }
+                else if (AP_BUCKET_IS_EOR(b)) {
+                    off += apr_snprintf(buffer+off, bmax-off, "eor ");
+                }
+                else if (H2_BUCKET_IS_H2EOC(b)) {
+                    off += apr_snprintf(buffer+off, bmax-off, "h2eoc ");
+                }
+                else if (H2_BUCKET_IS_H2EOS(b)) {
+                    off += apr_snprintf(buffer+off, bmax-off, "h2eos ");
+                }
+                else {
+                    off += apr_snprintf(buffer+off, bmax-off, "meta(unknown) ");
+                }
+            }
+            else {
+                const char *btype = "data";
+                if (APR_BUCKET_IS_FILE(b)) {
+                    btype = "file";
+                }
+                else if (APR_BUCKET_IS_PIPE(b)) {
+                    btype = "pipe";
+                }
+                else if (APR_BUCKET_IS_SOCKET(b)) {
+                    btype = "socket";
+                }
+                else if (APR_BUCKET_IS_HEAP(b)) {
+                    btype = "heap";
+                }
+                else if (APR_BUCKET_IS_TRANSIENT(b)) {
+                    btype = "transient";
+                }
+                else if (APR_BUCKET_IS_IMMORTAL(b)) {
+                    btype = "immortal";
+                }
+#if APR_HAS_MMAP
+                else if (APR_BUCKET_IS_MMAP(b)) {
+                    btype = "mmap";
+                }
+#endif
+                else if (APR_BUCKET_IS_POOL(b)) {
+                    btype = "pool";
+                }
+                
+                off += apr_snprintf(buffer+off, bmax-off, "%s[%ld] ", 
+                                    btype, 
+                                    (long)(b->length == ((apr_size_t)-1)? 
+                                           -1 : b->length));
+            }
+        }
+        line = *buffer? buffer : "(empty)";
+    }
+    /* Intentional no APLOGNO */
+    ap_log_cerror(APLOG_MARK, level, 0, c, "bb_dump(%ld-%d)-%s: %s", 
+                  c->id, stream_id, tag, line);
+
+}
+
 apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c, 
                              const h2_config *cfg, 
                              apr_pool_t *pool)
@@ -112,16 +192,17 @@ static apr_status_t pass_out(apr_bucket_
     }
     
     ap_update_child_status_from_conn(c->sbh, SERVER_BUSY_WRITE, c);
-    status = apr_brigade_length(bb, 0, &bblen);
-    if (status == APR_SUCCESS) {
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, APLOGNO(03044)
+    apr_brigade_length(bb, 0, &bblen);
+    h2_conn_io_bb_log(c, 0, APLOG_TRACE2, "master conn pass", bb);
+    status = ap_pass_brigade(c->output_filters, bb);
+    if (status == APR_SUCCESS && pctx->io) {
+        pctx->io->bytes_written += (apr_size_t)bblen;
+        pctx->io->last_write = apr_time_now();
+    }
+    if (status != APR_SUCCESS) {
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, c, APLOGNO(03044)
                       "h2_conn_io(%ld): pass_out brigade %ld bytes",
                       c->id, (long)bblen);
-        status = ap_pass_brigade(c->output_filters, bb);
-        if (status == APR_SUCCESS && pctx->io) {
-            pctx->io->bytes_written += (apr_size_t)bblen;
-            pctx->io->last_write = apr_time_now();
-        }
     }
     apr_brigade_cleanup(bb);
     return status;
@@ -179,9 +260,10 @@ apr_status_t h2_conn_io_writeb(h2_conn_i
     return APR_SUCCESS;
 }
 
-static apr_status_t h2_conn_io_flush_int(h2_conn_io *io, int force, int eoc)
+static apr_status_t h2_conn_io_flush_int(h2_conn_io *io, int flush, int eoc)
 {
     pass_out_ctx ctx;
+    apr_bucket *b;
     
     if (io->buflen == 0 && APR_BRIGADE_EMPTY(io->output)) {
         return APR_SUCCESS;
@@ -195,13 +277,12 @@ static apr_status_t h2_conn_io_flush_int
         bucketeer_buffer(io);
     }
     
-    if (force) {
-        apr_bucket *b = apr_bucket_flush_create(io->c->bucket_alloc);
+    if (flush) {
+        b = apr_bucket_flush_create(io->c->bucket_alloc);
         APR_BRIGADE_INSERT_TAIL(io->output, b);
     }
     
     ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c, "h2_conn_io: flush");
-    /* Send it out */
     io->buflen = 0;
     ctx.c = io->c;
     ctx.io = eoc? NULL : io;
@@ -221,11 +302,11 @@ apr_status_t h2_conn_io_consider_pass(h2
     apr_off_t len = 0;
     
     if (!APR_BRIGADE_EMPTY(io->output)) {
-        apr_brigade_length(io->output, 0, &len);
+        len = h2_brigade_mem_size(io->output);
     }
     len += io->buflen;
     if (len >= WRITE_BUFFER_SIZE) {
-        return h2_conn_io_flush_int(io, 0, 0);
+        return h2_conn_io_flush_int(io, 1, 0);
     }
     return APR_SUCCESS;
 }
@@ -256,7 +337,7 @@ apr_status_t h2_conn_io_write(h2_conn_io
         while (length > 0 && (status == APR_SUCCESS)) {
             apr_size_t avail = io->bufsize - io->buflen;
             if (avail <= 0) {
-                h2_conn_io_flush_int(io, 0, 0);
+                status = h2_conn_io_flush_int(io, 0, 0);
             }
             else if (length > avail) {
                 memcpy(io->buffer + io->buflen, buf, avail);

Modified: httpd/httpd/trunk/modules/http2/h2_io.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_io.c?rev=1735230&r1=1735229&r2=1735230&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_io.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_io.c Wed Mar 16 14:01:53 2016
@@ -397,8 +397,10 @@ apr_status_t h2_io_out_read_to(h2_io *io
     if (!is_out_readable(io, plen, peos, &status)) {
         return status;
     }
-    io->eos_out_read = *peos = h2_util_has_eos(io->bbout, *plen);
     status = h2_util_move(bb, io->bbout, *plen, NULL, "h2_io_read_to");
+    if (status == APR_SUCCESS && io->eos_out && APR_BRIGADE_EMPTY(io->bbout)) {
+        io->eos_out_read = *peos = 1;
+    }
     io->output_consumed += *plen;
     return status;
 }
@@ -423,21 +425,6 @@ apr_status_t h2_io_out_write(h2_io *io,
         return APR_ECONNABORTED;
     }
 
-    if (!io->eor) {
-        /* Filter the EOR bucket and set it aside. We prefer to tear down
-         * the request when the whole h2 stream is done */
-        for (b = APR_BRIGADE_FIRST(bb);
-             b != APR_BRIGADE_SENTINEL(bb);
-             b = APR_BUCKET_NEXT(b))
-        {
-            if (AP_BUCKET_IS_EOR(b)) {
-                APR_BUCKET_REMOVE(b);
-                io->eor = b;
-                break;
-            }
-        }     
-    }
-    
     if (io->eos_out) {
         apr_off_t len = 0;
         /* We have already delivered an EOS bucket to a reader, no
@@ -448,6 +435,23 @@ apr_status_t h2_io_out_write(h2_io *io,
         return (len > 0)? APR_EOF : APR_SUCCESS;
     }
 
+    /* Filter the EOR bucket and set it aside. We prefer to tear down
+     * the request when the whole h2 stream is done */
+    for (b = APR_BRIGADE_FIRST(bb);
+         b != APR_BRIGADE_SENTINEL(bb);
+         b = APR_BUCKET_NEXT(b))
+    {
+        if (AP_BUCKET_IS_EOR(b)) {
+            APR_BUCKET_REMOVE(b);
+            io->eor = b;
+            break;
+        }
+        else if (APR_BUCKET_IS_EOS(b)) {
+            io->eos_out = 1;
+            break;
+        }
+    }     
+    
     process_trailers(io, trailers);
     
     /* Let's move the buckets from the request processing in here, so

Modified: httpd/httpd/trunk/modules/http2/h2_mplx.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_mplx.c?rev=1735230&r1=1735229&r2=1735230&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_mplx.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_mplx.c Wed Mar 16 14:01:53 2016
@@ -195,7 +195,7 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr
             return NULL;
         }
         
-        status = apr_thread_cond_create(&m->req_added, m->pool);
+        status = apr_thread_cond_create(&m->task_thawed, m->pool);
         if (status != APR_SUCCESS) {
             h2_mplx_destroy(m);
             return NULL;
@@ -254,7 +254,7 @@ static void workers_register(h2_mplx *m)
     h2_workers_register(m->workers, m);
 }
 
-static int io_process_events(h2_mplx *m, h2_io *io)
+static int io_in_consumed_signal(h2_mplx *m, h2_io *io)
 {
     if (io->input_consumed && m->input_consumed) {
         m->input_consumed(m->input_consumed_ctx, 
@@ -265,6 +265,17 @@ static int io_process_events(h2_mplx *m,
     return 0;
 }
 
+static int io_out_consumed_signal(h2_mplx *m, h2_io *io)
+{
+    if (io->output_consumed && io->task && io->task->assigned) {
+        h2_req_engine_out_consumed(io->task->assigned, io->task->c, 
+                                   io->output_consumed);
+        io->output_consumed = 0;
+        return 1;
+    }
+    return 0;
+}
+
 static void io_destroy(h2_mplx *m, h2_io *io, int events)
 {
     apr_pool_t *pool;
@@ -273,7 +284,7 @@ static void io_destroy(h2_mplx *m, h2_io
     h2_io_in_shutdown(io);
     if (events) {
         /* Process outstanding events before destruction */
-        io_process_events(m, io);
+        io_in_consumed_signal(m, io);
     }
     
     /* The pool is cleared/destroyed which also closes all
@@ -299,7 +310,7 @@ static void io_destroy(h2_mplx *m, h2_io
 
     pool = io->pool;
     io->pool = NULL;    
-    if (pool) {
+    if (0 && pool) {
         apr_pool_clear(pool);
         if (m->spare_pool) {
             apr_pool_destroy(m->spare_pool);
@@ -377,7 +388,7 @@ apr_status_t h2_mplx_release_and_join(h2
         h2_mplx_set_consumed_cb(m, NULL, NULL);
         
         h2_iq_clear(m->q);
-        apr_thread_cond_broadcast(m->req_added);
+        apr_thread_cond_broadcast(m->task_thawed);
         while (!h2_io_set_iter(m->stream_ios, stream_done_iter, m)) {
             /* iterate until all ios have been orphaned or destroyed */
         }
@@ -413,7 +424,7 @@ apr_status_t h2_mplx_release_and_join(h2
                     }
                 }
                 h2_mplx_abort(m);
-                apr_thread_cond_broadcast(m->req_added);
+                apr_thread_cond_broadcast(m->task_thawed);
             }
         }
         
@@ -460,7 +471,7 @@ apr_status_t h2_mplx_stream_done(h2_mplx
          * for processing, e.g. when we received all HEADERs. But when
          * a stream is cancelled very early, it will not exist. */
         if (io) {
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, 
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, 
                           "h2_mplx(%ld-%d): marking stream as done.", 
                           m->id, stream_id);
             io_stream_done(m, io, rst_error);
@@ -523,7 +534,7 @@ apr_status_t h2_mplx_in_write(h2_mplx *m
             status = h2_io_in_write(io, data, len, eos);
             H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_write_post");
             h2_io_signal(io, H2_IO_READ);
-            io_process_events(m, io);
+            io_in_consumed_signal(m, io);
         }
         else {
             status = APR_ECONNABORTED;
@@ -545,7 +556,7 @@ apr_status_t h2_mplx_in_close(h2_mplx *m
             status = h2_io_in_close(io);
             H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_close");
             h2_io_signal(io, H2_IO_READ);
-            io_process_events(m, io);
+            io_in_consumed_signal(m, io);
         }
         else {
             status = APR_ECONNABORTED;
@@ -555,6 +566,12 @@ apr_status_t h2_mplx_in_close(h2_mplx *m
     return status;
 }
 
+void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx)
+{
+    m->input_consumed = cb;
+    m->input_consumed_ctx = ctx;
+}
+
 typedef struct {
     h2_mplx * m;
     int streams_updated;
@@ -563,18 +580,12 @@ typedef struct {
 static int update_window(void *ctx, h2_io *io)
 {
     update_ctx *uctx = (update_ctx*)ctx;
-    if (io_process_events(uctx->m, io)) {
+    if (io_in_consumed_signal(uctx->m, io)) {
         ++uctx->streams_updated;
     }
     return 1;
 }
 
-void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx)
-{
-    m->input_consumed = cb;
-    m->input_consumed_ctx = ctx;
-}
-
 apr_status_t h2_mplx_in_update_windows(h2_mplx *m)
 {
     apr_status_t status;
@@ -702,7 +713,7 @@ h2_stream *h2_mplx_next_submit(h2_mplx *
                      * shutdown input and send out any events (e.g. window
                      * updates) asap. */
                     h2_io_in_shutdown(io);
-                    io_process_events(m, io);
+                    io_in_consumed_signal(m, io);
                 }
             }
             
@@ -729,8 +740,10 @@ static apr_status_t out_write(h2_mplx *m
            && !APR_BRIGADE_EMPTY(bb) 
            && !is_aborted(m, &status)) {
         
-        status = h2_io_out_write(io, bb, m->stream_max_mem, trailers,
-                                 &m->tx_handles_reserved);
+        status = h2_io_out_write(io, bb, blocking? m->stream_max_mem : INT_MAX, 
+                                 trailers, &m->tx_handles_reserved);
+        io_out_consumed_signal(m, io);
+        
         /* Wait for data to drain until there is room again or
          * stream timeout expires */
         h2_io_signal_init(io, H2_IO_WRITE, m->stream_timeout, iowait);
@@ -740,6 +753,9 @@ static apr_status_t out_write(h2_mplx *m
                && (m->stream_max_mem <= h2_io_out_length(io))
                && !is_aborted(m, &status)) {
             if (!blocking) {
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
+                              "h2_mplx(%ld-%d): incomplete write", 
+                              m->id, io->id);
                 return APR_INCOMPLETE;
             }
             trailers = NULL;
@@ -874,11 +890,8 @@ apr_status_t h2_mplx_out_close(h2_mplx *
                           trailers? "yes" : "no");
             status = h2_io_out_close(io, trailers);
             H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_close");
+            io_out_consumed_signal(m, io);
             
-            if (io->eor) {
-                apr_bucket_delete(io->eor);
-                io->eor = NULL;
-            }
             have_out_data_for(m, stream_id);
         }
         else {
@@ -1061,7 +1074,7 @@ static h2_task *pop_task(h2_mplx *m)
             }
         }
         else if (io) {
-            conn_rec *slave = h2_slave_create(m->c, io->pool, m->spare_allocator);
+            conn_rec *slave = h2_slave_create(m->c, m->pool, m->spare_allocator);
             m->spare_allocator = NULL;
             io->task = task = h2_task_create(m->id, io->request, slave, m);
             apr_table_setn(slave->notes, H2_TASK_ID_NOTE, task->id);
@@ -1100,7 +1113,7 @@ h2_task *h2_mplx_pop_task(h2_mplx *m, in
     return task;
 }
 
-static void task_done(h2_mplx *m, h2_task *task)
+static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
 {
     if (task) {
         if (task->frozen) {
@@ -1112,7 +1125,7 @@ static void task_done(h2_mplx *m, h2_tas
              * bodies into the mplx. */
             /* FIXME: this implementation is incomplete. */
             h2_task_set_io_blocking(task, 0);
-            apr_thread_cond_broadcast(m->req_added);
+            apr_thread_cond_broadcast(m->task_thawed);
         }
         else {
             h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id);
@@ -1126,6 +1139,18 @@ static void task_done(h2_mplx *m, h2_tas
              * other mplx's. Perhaps leave after n requests? */
             h2_mplx_out_close(m, task->stream_id, NULL);
             
+            if (ngn && io) {
+                apr_off_t bytes = io->output_consumed + h2_io_out_length(io);
+                if (bytes > 0) {
+                    /* we need to report consumed and current buffered output
+                     * to the engine. The request will be streamed out or cancelled,
+                     * no more data is coming from it and the engine should update
+                     * its calculations before we destroy this information. */
+                    h2_req_engine_out_consumed(ngn, task->c, bytes);
+                    io->output_consumed = 0;
+                }
+            }
+            
             if (task->engine) {
                 if (!h2_req_engine_is_shutdown(task->engine)) {
                     ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
@@ -1194,7 +1219,7 @@ void h2_mplx_task_done(h2_mplx *m, h2_ta
     int acquired;
     
     if (enter_mutex(m, &acquired) == APR_SUCCESS) {
-        task_done(m, task);
+        task_done(m, task, NULL);
         --m->workers_busy;
         if (ptask) {
             /* caller wants another task */
@@ -1347,8 +1372,37 @@ apr_status_t h2_mplx_idle(h2_mplx *m)
  * HTTP/2 request engines
  ******************************************************************************/
 
+typedef struct {
+    h2_mplx * m;
+    h2_req_engine *ngn;
+    int streams_updated;
+} ngn_update_ctx;
+
+static int ngn_update_window(void *ctx, h2_io *io)
+{
+    ngn_update_ctx *uctx = ctx;
+    if (io && io->task && io->task->assigned == uctx->ngn
+        && io_out_consumed_signal(uctx->m, io)) {
+        ++uctx->streams_updated;
+    }
+    return 1;
+}
+
+static apr_status_t ngn_out_update_windows(h2_mplx *m, h2_req_engine *ngn)
+{
+    ngn_update_ctx ctx;
+        
+    ctx.m = m;
+    ctx.ngn = ngn;
+    ctx.streams_updated = 0;
+    h2_io_set_iter(m->stream_ios, ngn_update_window, &ctx);
+    
+    return ctx.streams_updated? APR_SUCCESS : APR_EAGAIN;
+}
+
 apr_status_t h2_mplx_req_engine_push(const char *ngn_type, 
-                                     request_rec *r, h2_req_engine_init *einit)
+                                     request_rec *r,
+                                     http2_req_engine_init *einit)
 {
     apr_status_t status;
     h2_mplx *m;
@@ -1360,6 +1414,7 @@ apr_status_t h2_mplx_req_engine_push(con
         return APR_ECONNABORTED;
     }
     m = task->mplx;
+    task->r = r;
     
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
         h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id);
@@ -1367,8 +1422,7 @@ apr_status_t h2_mplx_req_engine_push(con
             status = APR_ECONNABORTED;
         }
         else {
-            status = h2_ngn_shed_push_req(m->ngn_shed, ngn_type, 
-                                          task, r, einit);
+            status = h2_ngn_shed_push_task(m->ngn_shed, ngn_type, task, einit);
         }
         leave_mutex(m, acquired);
     }
@@ -1383,30 +1437,37 @@ apr_status_t h2_mplx_req_engine_pull(h2_
     h2_ngn_shed *shed = h2_ngn_shed_get_shed(ngn);
     h2_mplx *m = h2_ngn_shed_get_ctx(shed);
     apr_status_t status;
+    h2_task *task = NULL;
     int acquired;
     
-    *pr = NULL;
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
         int want_shutdown = (block == APR_BLOCK_READ);
+
+        /* Take this opportunity to update output consumption 
+         * for this engine */
+        ngn_out_update_windows(m, ngn);
+        
         if (want_shutdown && !h2_iq_empty(m->q)) {
             /* For a blocking read, check first if requests are to be
              * had and, if not, wait a short while before doing the
              * blocking, and if unsuccessful, terminating read.
              */
-            status = h2_ngn_shed_pull_req(shed, ngn, capacity, 1, pr);
+            status = h2_ngn_shed_pull_task(shed, ngn, capacity, 1, &task);
             if (APR_STATUS_IS_EAGAIN(status)) {
                 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                               "h2_mplx(%ld): start block engine pull", m->id);
-                apr_thread_cond_timedwait(m->req_added, m->lock, 
+                apr_thread_cond_timedwait(m->task_thawed, m->lock, 
                                           apr_time_from_msec(20));
-                status = h2_ngn_shed_pull_req(shed, ngn, capacity, 1, pr);
+                status = h2_ngn_shed_pull_task(shed, ngn, capacity, 1, &task);
             }
         }
         else {
-            status = h2_ngn_shed_pull_req(shed, ngn, capacity, want_shutdown, pr);
+            status = h2_ngn_shed_pull_task(shed, ngn, capacity,
+                                           want_shutdown, &task);
         }
         leave_mutex(m, acquired);
     }
+    *pr = task? task->r : NULL;
     return status;
 }
  
@@ -1419,13 +1480,16 @@ void h2_mplx_req_engine_done(h2_req_engi
         int acquired;
 
         if (enter_mutex(m, &acquired) == APR_SUCCESS) {
+            ngn_out_update_windows(m, ngn);
             h2_ngn_shed_done_task(m->ngn_shed, ngn, task);
             if (task->engine) { 
                 /* cannot report that as done until engine returns */
             }
             else {
-                task_done(m, task);
+                task_done(m, task, ngn);
             }
+            /* Take this opportunity to update output consumption 
+             * for this engine */
             leave_mutex(m, acquired);
         }
     }

Modified: httpd/httpd/trunk/modules/http2/h2_mplx.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_mplx.h?rev=1735230&r1=1735229&r2=1735230&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_mplx.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_mplx.h Wed Mar 16 14:01:53 2016
@@ -90,7 +90,7 @@ struct h2_mplx {
 
     apr_thread_mutex_t *lock;
     struct apr_thread_cond_t *added_output;
-    struct apr_thread_cond_t *req_added;
+    struct apr_thread_cond_t *task_thawed;
     struct apr_thread_cond_t *join_wait;
     
     apr_size_t stream_max_mem;
@@ -405,12 +405,15 @@ apr_status_t h2_mplx_idle(h2_mplx *m);
  * h2_req_engine handling
  ******************************************************************************/
 
+typedef void h2_output_consumed(void *ctx, conn_rec *c, apr_off_t consumed);
 typedef apr_status_t h2_mplx_req_engine_init(struct h2_req_engine *engine, 
                                              const char *id, 
                                              const char *type,
                                              apr_pool_t *pool, 
                                              apr_uint32_t req_buffer_size,
-                                             request_rec *r);
+                                             request_rec *r,
+                                             h2_output_consumed **pconsumed,
+                                             void **pbaton);
 
 apr_status_t h2_mplx_req_engine_push(const char *ngn_type, 
                                      request_rec *r, 

Modified: httpd/httpd/trunk/modules/http2/h2_ngn_shed.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_ngn_shed.c?rev=1735230&r1=1735229&r2=1735230&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_ngn_shed.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_ngn_shed.c Wed Mar 16 14:01:53 2016
@@ -34,6 +34,7 @@
 #include "h2_ctx.h"
 #include "h2_h2.h"
 #include "h2_int_queue.h"
+#include "h2_mplx.h"
 #include "h2_response.h"
 #include "h2_request.h"
 #include "h2_task.h"
@@ -46,7 +47,6 @@ typedef struct h2_ngn_entry h2_ngn_entry
 struct h2_ngn_entry {
     APR_RING_ENTRY(h2_ngn_entry) link;
     h2_task *task;
-    request_rec *r;
 };
 
 #define H2_NGN_ENTRY_NEXT(e)	APR_RING_NEXT((e), link)
@@ -84,6 +84,9 @@ struct h2_req_engine {
     apr_uint32_t no_assigned;  /* # of assigned requests */
     apr_uint32_t no_live;      /* # of live */
     apr_uint32_t no_finished;  /* # of finished */
+    
+    h2_output_consumed *out_consumed;
+    void *out_consumed_ctx;
 };
 
 const char *h2_req_engine_get_id(h2_req_engine *engine)
@@ -96,6 +99,14 @@ int h2_req_engine_is_shutdown(h2_req_eng
     return engine->shutdown;
 }
 
+void h2_req_engine_out_consumed(h2_req_engine *engine, conn_rec *c, 
+                                apr_off_t bytes)
+{
+    if (engine->out_consumed) {
+        engine->out_consumed(engine->out_consumed_ctx, c, bytes);
+    }
+}
+
 h2_ngn_shed *h2_ngn_shed_create(apr_pool_t *pool, conn_rec *c,
                                 apr_uint32_t default_capacity, 
                                 apr_uint32_t req_buffer_size)
@@ -132,26 +143,25 @@ void h2_ngn_shed_abort(h2_ngn_shed *shed
     shed->aborted = 1;
 }
 
-static void ngn_add_req(h2_req_engine *ngn, h2_task *task, request_rec *r)
+static void ngn_add_task(h2_req_engine *ngn, h2_task *task)
 {
     h2_ngn_entry *entry = apr_pcalloc(task->c->pool, sizeof(*entry));
     APR_RING_ELEM_INIT(entry, link);
     entry->task = task;
-    entry->r = r;
     H2_REQ_ENTRIES_INSERT_TAIL(&ngn->entries, entry);
 }
 
 
-apr_status_t h2_ngn_shed_push_req(h2_ngn_shed *shed, const char *ngn_type, 
-                                  h2_task *task, request_rec *r, 
-                                  h2_req_engine_init *einit){
+apr_status_t h2_ngn_shed_push_task(h2_ngn_shed *shed, const char *ngn_type, 
+                                   h2_task *task, http2_req_engine_init *einit) 
+{
     h2_req_engine *ngn;
 
     AP_DEBUG_ASSERT(shed);
     
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, shed->c,
                   "h2_ngn_shed(%ld): PUSHing request (task=%s)", shed->c->id, 
-                  apr_table_get(r->connection->notes, H2_TASK_ID_NOTE));
+                  task->id);
     if (task->ser_headers) {
         /* Max compatibility, deny processing of this */
         return APR_EOF;
@@ -165,10 +175,10 @@ apr_status_t h2_ngn_shed_push_req(h2_ngn
                       "h2_ngn_shed(%ld): pushing request %s to %s", 
                       shed->c->id, task->id, ngn->id);
         if (!h2_task_is_detached(task)) {
-            h2_task_freeze(task, r);
+            h2_task_freeze(task);
         }
         /* FIXME: sometimes ngn is garbage, probly alread freed */
-        ngn_add_req(ngn, task, r);
+        ngn_add_task(ngn, task);
         ngn->no_assigned++;
         return APR_SUCCESS;
     }
@@ -191,7 +201,8 @@ apr_status_t h2_ngn_shed_push_req(h2_ngn
         APR_RING_INIT(&newngn->entries, h2_ngn_entry, link);
         
         status = einit(newngn, newngn->id, newngn->type, newngn->pool,
-                       shed->req_buffer_size, r);
+                       shed->req_buffer_size, task->r, 
+                       &newngn->out_consumed, &newngn->out_consumed_ctx);
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, task->c,
                       "h2_ngn_shed(%ld): create engine %s (%s)", 
                       shed->c->id, newngn->id, newngn->type);
@@ -199,6 +210,7 @@ apr_status_t h2_ngn_shed_push_req(h2_ngn
             AP_DEBUG_ASSERT(task->engine == NULL);
             newngn->task = task;
             task->engine = newngn;
+            task->assigned = newngn;
             apr_hash_set(shed->ngns, newngn->type, APR_HASH_KEY_STRING, newngn);
         }
         return status;
@@ -206,13 +218,17 @@ apr_status_t h2_ngn_shed_push_req(h2_ngn
     return APR_EOF;
 }
 
-static h2_ngn_entry *pop_non_frozen(h2_req_engine *ngn)
+static h2_ngn_entry *pop_detached(h2_req_engine *ngn)
 {
     h2_ngn_entry *entry;
     for (entry = H2_REQ_ENTRIES_FIRST(&ngn->entries);
          entry != H2_REQ_ENTRIES_SENTINEL(&ngn->entries);
          entry = H2_NGN_ENTRY_NEXT(entry)) {
-        if (!entry->task->frozen) {
+        if (h2_task_is_detached(entry->task) 
+            || (entry->task->engine == ngn)) {
+            /* The task hosting this engine can always be pulled by it.
+             * Other tasks need to become detached first, i.e. no longer
+             * assigned to another worker. */
             H2_NGN_ENTRY_REMOVE(entry);
             return entry;
         }
@@ -220,16 +236,19 @@ static h2_ngn_entry *pop_non_frozen(h2_r
     return NULL;
 }
 
-apr_status_t h2_ngn_shed_pull_req(h2_ngn_shed *shed, 
-                                  h2_req_engine *ngn, 
-                                  apr_uint32_t capacity, 
-                                  int want_shutdown,
-                                  request_rec **pr)
+apr_status_t h2_ngn_shed_pull_task(h2_ngn_shed *shed, 
+                                   h2_req_engine *ngn, 
+                                   apr_uint32_t capacity, 
+                                   int want_shutdown,
+                                   h2_task **ptask)
 {   
     h2_ngn_entry *entry;
     
     AP_DEBUG_ASSERT(ngn);
-    *pr = NULL;
+    *ptask = NULL;
+    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, shed->c,
+                  "h2_ngn_shed(%ld): pull task for engine %s, shutdown=%d", 
+                  shed->c->id, ngn->id, want_shutdown);
     if (shed->aborted) {
         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, shed->c,
                       "h2_ngn_shed(%ld): abort while pulling requests %s", 
@@ -249,14 +268,22 @@ apr_status_t h2_ngn_shed_pull_req(h2_ngn
         return ngn->shutdown? APR_EOF : APR_EAGAIN;
     }
     
-    if ((entry = pop_non_frozen(ngn))) {
+    if ((entry = pop_detached(ngn))) {
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, entry->task->c,
                       "h2_ngn_shed(%ld): pulled request %s for engine %s", 
                       shed->c->id, entry->task->id, ngn->id);
         ngn->no_live++;
-        *pr = entry->r;
+        *ptask = entry->task;
+        entry->task->assigned = ngn;
         return APR_SUCCESS;
     }
+    
+    if (1) {
+        h2_ngn_entry *entry = H2_REQ_ENTRIES_FIRST(&ngn->entries);
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, shed->c,
+                      "h2_ngn_shed(%ld): pull task, nothing, first task %s", 
+                      shed->c->id, entry->task->id);
+    }
     return APR_EAGAIN;
 }
                                  
@@ -298,8 +325,7 @@ void h2_ngn_shed_done_ngn(h2_ngn_shed *s
         for (entry = H2_REQ_ENTRIES_FIRST(&ngn->entries);
              entry != H2_REQ_ENTRIES_SENTINEL(&ngn->entries);
              entry = H2_NGN_ENTRY_NEXT(entry)) {
-            request_rec *r = entry->r;
-            h2_task *task = h2_ctx_rget_task(r);
+            h2_task *task = entry->task;
             ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, shed->c,
                           "h2_ngn_shed(%ld): engine %s has queued task %s, "
                           "frozen=%d, aborting",

Modified: httpd/httpd/trunk/modules/http2/h2_ngn_shed.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_ngn_shed.h?rev=1735230&r1=1735229&r2=1735230&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_ngn_shed.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_ngn_shed.h Wed Mar 16 14:01:53 2016
@@ -35,12 +35,17 @@ struct h2_ngn_shed {
 const char *h2_req_engine_get_id(h2_req_engine *engine);
 int h2_req_engine_is_shutdown(h2_req_engine *engine);
 
+void h2_req_engine_out_consumed(h2_req_engine *engine, conn_rec *c, 
+                                apr_off_t bytes);
+
 typedef apr_status_t h2_shed_ngn_init(h2_req_engine *engine, 
                                       const char *id, 
                                       const char *type,
                                       apr_pool_t *pool, 
                                       apr_uint32_t req_buffer_size,
-                                      request_rec *r);
+                                      request_rec *r,
+                                      h2_output_consumed **pconsumed,
+                                      void **pbaton);
 
 h2_ngn_shed *h2_ngn_shed_create(apr_pool_t *pool, conn_rec *c,
                                 apr_uint32_t default_capactiy, 
@@ -53,13 +58,13 @@ h2_ngn_shed *h2_ngn_shed_get_shed(struct
 
 void h2_ngn_shed_abort(h2_ngn_shed *shed);
 
-apr_status_t h2_ngn_shed_push_req(h2_ngn_shed *shed, const char *ngn_type, 
-                                  struct h2_task *task, request_rec *r, 
+apr_status_t h2_ngn_shed_push_task(h2_ngn_shed *shed, const char *ngn_type, 
+                                  struct h2_task *task, 
                                   h2_shed_ngn_init *init_cb);
 
-apr_status_t h2_ngn_shed_pull_req(h2_ngn_shed *shed, h2_req_engine *pub_ngn, 
-                                  apr_uint32_t capacity, 
-                                  int want_shutdown, request_rec **pr);
+apr_status_t h2_ngn_shed_pull_task(h2_ngn_shed *shed, h2_req_engine *pub_ngn, 
+                                   apr_uint32_t capacity, 
+                                   int want_shutdown, struct h2_task **ptask);
 
 apr_status_t h2_ngn_shed_done_task(h2_ngn_shed *shed, 
                                    struct h2_req_engine *ngn, 

Modified: httpd/httpd/trunk/modules/http2/h2_proxy_session.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_proxy_session.c?rev=1735230&r1=1735229&r2=1735230&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_proxy_session.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_proxy_session.c Wed Mar 16 14:01:53 2016
@@ -372,7 +372,6 @@ static int on_data_chunk_recv(nghttp2_se
                                   stream_id, NGHTTP2_STREAM_CLOSED);
         return NGHTTP2_ERR_STREAM_CLOSING;
     }
-    nghttp2_session_consume(ngh2, stream_id, len);
     return 0;
 }
 
@@ -1042,6 +1041,7 @@ static void ev_stream_done(h2_proxy_sess
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
                       "h2_proxy_sesssion(%s): stream(%d) closed", 
                       session->id, stream_id);
+        
         if (!stream->data_received) {
             apr_bucket *b;
             /* if the response had no body, this is the time to flush
@@ -1286,7 +1286,8 @@ static int done_iter(void *udata, void *
 {
     cleanup_iter_ctx *ctx = udata;
     h2_proxy_stream *stream = val;
-    int touched = (stream->id <= ctx->session->last_stream_id);
+    int touched = (!ctx->session->last_stream_id || 
+                   stream->id <= ctx->session->last_stream_id);
     ctx->done(ctx->session, stream->r, 0, touched);
     return 1;
 }
@@ -1306,3 +1307,49 @@ void h2_proxy_session_cleanup(h2_proxy_s
     }
 }
 
+typedef struct {
+    h2_proxy_session *session;
+    conn_rec *c;
+    apr_off_t bytes;
+    int updated;
+} win_update_ctx;
+
+static int win_update_iter(void *udata, void *val)
+{
+    win_update_ctx *ctx = udata;
+    h2_proxy_stream *stream = val;
+    
+    if (stream->r && stream->r->connection == ctx->c) {
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, ctx->session->c, 
+                      "h2_proxy_session(%s-%d): win_update %ld bytes",
+                      ctx->session->id, (int)stream->id, (long)ctx->bytes);
+        nghttp2_session_consume(ctx->session->ngh2, stream->id, ctx->bytes);
+        ctx->updated = 1;
+        return 0;
+    }
+    return 1;
+}
+
+
+void h2_proxy_session_update_window(h2_proxy_session *session, 
+                                    conn_rec *c, apr_off_t bytes)
+{
+    if (session->streams && !h2_ihash_is_empty(session->streams)) {
+        win_update_ctx ctx;
+        ctx.session = session;
+        ctx.c = c;
+        ctx.bytes = bytes;
+        ctx.updated = 0;
+        h2_ihash_iter(session->streams, win_update_iter, &ctx);
+        
+        if (!ctx.updated) {
+            /* could not find the stream any more, possibly closed, update
+             * the connection window at least */
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, 
+                          "h2_proxy_session(%s): win_update conn %ld bytes",
+                          session->id, (long)bytes);
+            nghttp2_session_consume_connection(session->ngh2, (size_t)bytes);
+        }
+    }
+}
+

Modified: httpd/httpd/trunk/modules/http2/h2_proxy_session.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_proxy_session.h?rev=1735230&r1=1735229&r2=1735230&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_proxy_session.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_proxy_session.h Wed Mar 16 14:01:53 2016
@@ -103,6 +103,9 @@ apr_status_t h2_proxy_session_process(h2
 
 void h2_proxy_session_cleanup(h2_proxy_session *s, h2_proxy_request_done *done);
 
+void h2_proxy_session_update_window(h2_proxy_session *s, 
+                                    conn_rec *c, apr_off_t bytes);
+
 #define H2_PROXY_REQ_URL_NOTE   "h2-proxy-req-url"
 
 #endif /* h2_proxy_session_h */

Modified: httpd/httpd/trunk/modules/http2/h2_session.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_session.c?rev=1735230&r1=1735229&r2=1735230&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_session.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_session.c Wed Mar 16 14:01:53 2016
@@ -686,7 +686,9 @@ static apr_status_t h2_session_shutdown(
                           h2_mplx_get_max_stream_started(session->mplx), 
                           reason, (uint8_t*)err, err? strlen(err):0);
     status = nghttp2_session_send(session->ngh2);
-    h2_conn_io_flush(&session->io);
+    if (status == APR_SUCCESS) {
+        status = h2_conn_io_flush(&session->io);
+    }
     ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03069)
                   "session(%ld): sent GOAWAY, err=%d, msg=%s", 
                   session->id, reason, err? err : "");
@@ -1432,6 +1434,9 @@ apr_status_t h2_session_stream_destroy(h
 {
     apr_pool_t *pool = h2_stream_detach_pool(stream);
 
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+                  "h2_stream(%ld-%d): cleanup by EOS bucket destroy", 
+                  session->id, stream->id);
     /* this may be called while the session has already freed
      * some internal structures or even when the mplx is locked. */
     if (session->mplx) {
@@ -1704,6 +1709,7 @@ static void h2_session_ev_init(h2_sessio
 
 static void h2_session_ev_local_goaway(h2_session *session, int arg, const char *msg)
 {
+    session->local_shutdown = 1;
     switch (session->state) {
         case H2_SESSION_ST_LOCAL_SHUTDOWN:
             /* already did that? */
@@ -2195,7 +2201,8 @@ apr_status_t h2_session_process(h2_sessi
                 }
                 else if (status == APR_TIMEUP) {
                     /* go back to checking all inputs again */
-                    transit(session, "wait cycle", H2_SESSION_ST_BUSY);
+                    transit(session, "wait cycle", session->local_shutdown? 
+                            H2_SESSION_ST_LOCAL_SHUTDOWN : H2_SESSION_ST_BUSY);
                 }
                 else {
                     ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, c,
@@ -2219,7 +2226,10 @@ apr_status_t h2_session_process(h2_sessi
                 break;
         }
 
-        h2_conn_io_flush(&session->io);
+        status = h2_conn_io_flush(&session->io);
+        if (status != APR_SUCCESS) {
+            dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
+        }
         if (!nghttp2_session_want_read(session->ngh2) 
                  && !nghttp2_session_want_write(session->ngh2)) {
             dispatch_event(session, H2_SESSION_EV_NGH2_DONE, 0, NULL); 

Modified: httpd/httpd/trunk/modules/http2/h2_session.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_session.h?rev=1735230&r1=1735229&r2=1735230&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_session.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_session.h Wed Mar 16 14:01:53 2016
@@ -85,6 +85,7 @@ typedef struct h2_session {
     unsigned int reprioritize  : 1; /* scheduled streams priority changed */
     unsigned int eoc_written   : 1; /* h2 eoc bucket written */
     unsigned int flush         : 1; /* flushing output necessary */
+    unsigned int local_shutdown: 1; /* GOAWAY has been sent by us */
     apr_interval_time_t  wait_us;   /* timout during BUSY_WAIT state, micro secs */
     
     int unsent_submits;             /* number of submitted, but not yet written responses. */

Modified: httpd/httpd/trunk/modules/http2/h2_task.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_task.c?rev=1735230&r1=1735229&r2=1735230&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_task.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_task.c Wed Mar 16 14:01:53 2016
@@ -291,7 +291,7 @@ static int h2_task_process_conn(conn_rec
     return DECLINED;
 }
 
-apr_status_t h2_task_freeze(h2_task *task, request_rec *r)
+apr_status_t h2_task_freeze(h2_task *task)
 {   
     if (!task->frozen) {
         task->frozen = 1;

Modified: httpd/httpd/trunk/modules/http2/h2_task.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_task.h?rev=1735230&r1=1735229&r2=1735230&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_task.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_task.h Wed Mar 16 14:01:53 2016
@@ -66,7 +66,9 @@ struct h2_task {
     struct h2_task_output *output;
     struct apr_thread_cond_t *io;   /* used to wait for events on */
     
-    struct h2_req_engine *engine;
+    struct h2_req_engine *engine;   /* engine hosted by this task */
+    struct h2_req_engine *assigned; /* engine that task has been assigned to */
+    request_rec *r;                 /* request being processed in this task */
 };
 
 h2_task *h2_task_create(long session_id, const struct h2_request *req, 
@@ -83,7 +85,7 @@ apr_status_t h2_task_init(apr_pool_t *po
 extern APR_OPTIONAL_FN_TYPE(ap_logio_add_bytes_in) *h2_task_logio_add_bytes_in;
 extern APR_OPTIONAL_FN_TYPE(ap_logio_add_bytes_out) *h2_task_logio_add_bytes_out;
 
-apr_status_t h2_task_freeze(h2_task *task, request_rec *r);
+apr_status_t h2_task_freeze(h2_task *task);
 apr_status_t h2_task_thaw(h2_task *task);
 int h2_task_is_detached(h2_task *task);
 

Modified: httpd/httpd/trunk/modules/http2/h2_task_output.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_task_output.c?rev=1735230&r1=1735229&r2=1735230&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_task_output.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_task_output.c Wed Mar 16 14:01:53 2016
@@ -158,8 +158,7 @@ apr_status_t h2_task_output_write(h2_tas
     }
     
     /* Attempt to write saved brigade first */
-    if (status == APR_SUCCESS && output->bb 
-        && !APR_BRIGADE_EMPTY(output->bb)) {
+    if (status == APR_SUCCESS && output->bb && !APR_BRIGADE_EMPTY(output->bb)) {
         status = write_brigade_raw(output, f, output->bb);
     }
     

Modified: httpd/httpd/trunk/modules/http2/h2_util.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_util.c?rev=1735230&r1=1735229&r2=1735230&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_util.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_util.c Wed Mar 16 14:01:53 2016
@@ -14,7 +14,6 @@
  */
 
 #include <assert.h>
-
 #include <apr_strings.h>
 
 #include <httpd.h>
@@ -636,20 +635,6 @@ apr_status_t h2_util_copy(apr_bucket_bri
     return status;
 }
 
-int h2_util_has_flush_or_eos(apr_bucket_brigade *bb)
-{
-    apr_bucket *b;
-    for (b = APR_BRIGADE_FIRST(bb);
-         b != APR_BRIGADE_SENTINEL(bb);
-         b = APR_BUCKET_NEXT(b))
-    {
-        if (APR_BUCKET_IS_EOS(b) || APR_BUCKET_IS_FLUSH(b)) {
-            return 1;
-        }
-    }
-    return 0;
-}
-
 int h2_util_has_eos(apr_bucket_brigade *bb, apr_off_t len)
 {
     apr_bucket *b, *end;
@@ -950,6 +935,27 @@ apr_status_t h2_transfer_brigade(apr_buc
     return APR_SUCCESS;
 }
 
+apr_off_t h2_brigade_mem_size(apr_bucket_brigade *bb)
+{
+    apr_bucket *b;
+    apr_off_t total = 0;
+
+    for (b = APR_BRIGADE_FIRST(bb);
+         b != APR_BRIGADE_SENTINEL(bb);
+         b = APR_BUCKET_NEXT(b))
+    {
+        total += sizeof(*b);
+        if (b->length > 0) {
+            if (APR_BUCKET_IS_HEAP(b)
+                || APR_BUCKET_IS_POOL(b)) {
+                total += b->length;
+            }
+        }
+    }
+    return total;
+}
+
+
 /*******************************************************************************
  * h2_ngheader
  ******************************************************************************/

Modified: httpd/httpd/trunk/modules/http2/h2_util.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_util.h?rev=1735230&r1=1735229&r2=1735230&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_util.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_util.h Wed Mar 16 14:01:53 2016
@@ -196,7 +196,6 @@ apr_status_t h2_util_copy(apr_bucket_bri
  * @param bb the brigade to check on
  * @return != 0 iff brigade holds FLUSH or EOS bucket (or both)
  */
-int h2_util_has_flush_or_eos(apr_bucket_brigade *bb);
 int h2_util_has_eos(apr_bucket_brigade *bb, apr_off_t len);
 int h2_util_bb_has_data(apr_bucket_brigade *bb);
 int h2_util_bb_has_data_or_eos(apr_bucket_brigade *bb);
@@ -257,4 +256,13 @@ apr_status_t h2_transfer_brigade(apr_buc
                                  apr_off_t *plen,
                                  int *peos);
 
+/**
+ * Get an approximation of the memory footprint of the given
+ * brigade. This varies from apr_brigade_length as
+ * - no buckets are ever read
+ * - only buckets known to allocate memory (HEAP+POOL) are counted
+ * - the bucket struct itself is counted
+ */
+apr_off_t h2_brigade_mem_size(apr_bucket_brigade *bb);
+
 #endif /* defined(__mod_h2__h2_util__) */

Modified: httpd/httpd/trunk/modules/http2/mod_http2.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/mod_http2.c?rev=1735230&r1=1735229&r2=1735230&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/mod_http2.c (original)
+++ httpd/httpd/trunk/modules/http2/mod_http2.c Wed Mar 16 14:01:53 2016
@@ -130,7 +130,7 @@ static int http2_is_h2(conn_rec *);
 
 static apr_status_t http2_req_engine_push(const char *ngn_type, 
                                           request_rec *r, 
-                                          h2_req_engine_init *einit)
+                                          http2_req_engine_init *einit)
 {
     return h2_mplx_req_engine_push(ngn_type, r, einit);
 }

Modified: httpd/httpd/trunk/modules/http2/mod_http2.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/mod_http2.h?rev=1735230&r1=1735229&r2=1735230&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/mod_http2.h (original)
+++ httpd/httpd/trunk/modules/http2/mod_http2.h Wed Mar 16 14:01:53 2016
@@ -36,6 +36,8 @@ struct apr_thread_cond_t;
 
 typedef struct h2_req_engine h2_req_engine;
 
+typedef void http2_output_consumed(void *ctx, conn_rec *c, apr_off_t consumed);
+
 /**
  * Initialize a h2_req_engine. The structure will be passed in but
  * only the name and master are set. The function should initialize
@@ -43,12 +45,14 @@ typedef struct h2_req_engine h2_req_engi
  * @param engine the allocated, partially filled structure
  * @param r      the first request to process, or NULL
  */
-typedef apr_status_t h2_req_engine_init(h2_req_engine *engine, 
-                                        const char *id, 
-                                        const char *type,
-                                        apr_pool_t *pool, 
-                                        apr_uint32_t req_buffer_size,
-                                        request_rec *r);
+typedef apr_status_t http2_req_engine_init(h2_req_engine *engine, 
+                                           const char *id, 
+                                           const char *type,
+                                           apr_pool_t *pool, 
+                                           apr_uint32_t req_buffer_size,
+                                           request_rec *r,
+                                           http2_output_consumed **pconsumed,
+                                           void **pbaton);
 
 /**
  * Push a request to an engine with the specified name for further processing.
@@ -66,7 +70,7 @@ typedef apr_status_t h2_req_engine_init(
 APR_DECLARE_OPTIONAL_FN(apr_status_t, 
                         http2_req_engine_push, (const char *engine_type, 
                                                 request_rec *r,
-                                                h2_req_engine_init *einit));
+                                                http2_req_engine_init *einit));
 
 /**
  * Get a new request for processing in this engine.

Modified: httpd/httpd/trunk/modules/http2/mod_proxy_http2.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/mod_proxy_http2.c?rev=1735230&r1=1735229&r2=1735230&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/mod_proxy_http2.c (original)
+++ httpd/httpd/trunk/modules/http2/mod_proxy_http2.c Wed Mar 16 14:01:53 2016
@@ -42,7 +42,7 @@ AP_DECLARE_MODULE(proxy_http2) = {
 /* Optional functions from mod_http2 */
 static int (*is_h2)(conn_rec *c);
 static apr_status_t (*req_engine_push)(const char *name, request_rec *r, 
-                                       h2_req_engine_init *einit);
+                                       http2_req_engine_init *einit);
 static apr_status_t (*req_engine_pull)(h2_req_engine *engine, 
                                        apr_read_type_e block, 
                                        apr_uint32_t capacity, 
@@ -71,7 +71,8 @@ typedef struct h2_proxy_ctx {
     unsigned is_ssl : 1;
     unsigned flushall : 1;
     
-    apr_status_t r_status; /* status of our first request work */
+    apr_status_t r_status;     /* status of our first request work */
+    h2_proxy_session *session; /* current http2 session against backend */
 } h2_proxy_ctx;
 
 static int h2_proxy_post_config(apr_pool_t *p, apr_pool_t *plog,
@@ -196,12 +197,23 @@ static int proxy_http2_canon(request_rec
     return OK;
 }
 
+static void out_consumed(void *baton, conn_rec *c, apr_off_t bytes)
+{
+    h2_proxy_ctx *ctx = baton;
+    
+    if (ctx->session) {
+        h2_proxy_session_update_window(ctx->session, c, bytes);
+    }
+}
+
 static apr_status_t proxy_engine_init(h2_req_engine *engine, 
                                         const char *id, 
                                         const char *type,
                                         apr_pool_t *pool, 
                                         apr_uint32_t req_buffer_size,
-                                        request_rec *r)
+                                        request_rec *r,
+                                        http2_output_consumed **pconsumed,
+                                        void **pctx)
 {
     h2_proxy_ctx *ctx = ap_get_module_config(r->connection->conn_config, 
                                              &proxy_http2_module);
@@ -212,6 +224,8 @@ static apr_status_t proxy_engine_init(h2
         ctx->engine_pool = pool;
         ctx->req_buffer_size = req_buffer_size;
         ctx->capacity = 100;
+        *pconsumed = out_consumed;
+        *pctx = ctx;
         return APR_SUCCESS;
     }
     ap_log_rerror(APLOG_MARK, APLOG_WARNING, 0, r, 
@@ -250,7 +264,7 @@ static void request_done(h2_proxy_sessio
         if (req_engine_push && is_h2 && is_h2(ctx->owner)) {
             if (req_engine_push(ctx->engine_type, r, NULL) == APR_SUCCESS) {
                 /* push to engine */
-                ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, r->connection, 
+                ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, r->connection, 
                               "h2_proxy_session(%s): rescheduled request %s",
                               ctx->engine_id, task_id);
                 return;
@@ -287,8 +301,8 @@ static apr_status_t next_request(h2_prox
     }
     else if (req_engine_pull && ctx->engine) {
         apr_status_t status;
-        status = req_engine_pull(ctx->engine, 
-                                 before_leave? APR_BLOCK_READ: APR_NONBLOCK_READ, 
+        status = req_engine_pull(ctx->engine, before_leave? 
+                                 APR_BLOCK_READ: APR_NONBLOCK_READ, 
                                  ctx->capacity, &ctx->next);
         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, ctx->owner, 
                       "h2_proxy_engine(%s): pulled request %s", 
@@ -301,40 +315,39 @@ static apr_status_t next_request(h2_prox
 
 static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx) {
     apr_status_t status = OK;
-    h2_proxy_session *session;
     
     /* Step Four: Send the Request in a new HTTP/2 stream and
      * loop until we got the response or encounter errors.
      */
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, ctx->owner, 
                   "eng(%s): setup session", ctx->engine_id);
-    session = h2_proxy_session_setup(ctx->engine_id, ctx->p_conn, ctx->conf, 
-                                     30, h2_log2(ctx->req_buffer_size), 
-                                     request_done);
-    if (!session) {
+    ctx->session = h2_proxy_session_setup(ctx->engine_id, ctx->p_conn, ctx->conf, 
+                                          30, h2_log2(ctx->req_buffer_size), 
+                                          request_done);
+    if (!ctx->session) {
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->owner, 
                       "session unavailable");
         return HTTP_SERVICE_UNAVAILABLE;
     }
     
     ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->owner, 
-                  "eng(%s): run session %s", ctx->engine_id, session->id);
-    session->user_data = ctx;
+                  "eng(%s): run session %s", ctx->engine_id, ctx->session->id);
+    ctx->session->user_data = ctx;
     
     while (1) {
         if (ctx->next) {
-            add_request(session, ctx->next);
+            add_request(ctx->session, ctx->next);
             ctx->next = NULL;
         }
         
-        status = h2_proxy_session_process(session);
+        status = h2_proxy_session_process(ctx->session);
         
         if (status == APR_SUCCESS) {
             apr_status_t s2;
             /* ongoing processing, call again */
-            if (session->remote_max_concurrent > 0
-                && session->remote_max_concurrent != ctx->capacity) {
-                ctx->capacity = session->remote_max_concurrent;
+            if (ctx->session->remote_max_concurrent > 0
+                && ctx->session->remote_max_concurrent != ctx->capacity) {
+                ctx->capacity = ctx->session->remote_max_concurrent;
             }
             s2 = next_request(ctx, 0);
             if (s2 == APR_ECONNABORTED) {
@@ -344,7 +357,7 @@ static apr_status_t proxy_engine_run(h2_
                 status = s2;
                 break;
             }
-            if (!ctx->next && h2_ihash_is_empty(session->streams)) {
+            if (!ctx->next && h2_ihash_is_empty(ctx->session->streams)) {
                 break;
             }
         }
@@ -357,12 +370,13 @@ static apr_status_t proxy_engine_run(h2_
              * a) be reopened on the new session iff safe to do so
              * b) reported as done (failed) otherwise
              */
-            h2_proxy_session_cleanup(session, request_done);
+            h2_proxy_session_cleanup(ctx->session, request_done);
             break;
         }
     }
     
-    session->user_data = NULL;
+    ctx->session->user_data = NULL;
+    ctx->session = NULL;
     
     return status;
 }
@@ -556,6 +570,8 @@ run_session:
         /* session and connection still ok */
         if (next_request(ctx, 1) == APR_SUCCESS) {
             /* more requests, run again */
+            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->owner, 
+                          "run_session, again");
             goto run_session;
         }
         /* done */



Mime
View raw message