httpd-cvs mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ic...@apache.org
Subject svn commit: r1731096 [1/2] - in /httpd/httpd/trunk: ./ modules/http2/
Date Thu, 18 Feb 2016 17:02:02 GMT
Author: icing
Date: Thu Feb 18 17:02:02 2016
New Revision: 1731096

URL: http://svn.apache.org/viewvc?rev=1731096&view=rev
Log:
using proxy http2 connection concurrently, if main connection also uses http/2, needs more hardening

Added:
    httpd/httpd/trunk/modules/http2/h2_int_queue.c
    httpd/httpd/trunk/modules/http2/h2_int_queue.h
Modified:
    httpd/httpd/trunk/CHANGES
    httpd/httpd/trunk/CMakeLists.txt
    httpd/httpd/trunk/modules/http2/NWGNUmod_http2
    httpd/httpd/trunk/modules/http2/config2.m4
    httpd/httpd/trunk/modules/http2/h2_ctx.c
    httpd/httpd/trunk/modules/http2/h2_ctx.h
    httpd/httpd/trunk/modules/http2/h2_io.c
    httpd/httpd/trunk/modules/http2/h2_io.h
    httpd/httpd/trunk/modules/http2/h2_mplx.c
    httpd/httpd/trunk/modules/http2/h2_mplx.h
    httpd/httpd/trunk/modules/http2/h2_proxy_session.c
    httpd/httpd/trunk/modules/http2/h2_proxy_session.h
    httpd/httpd/trunk/modules/http2/h2_session.c
    httpd/httpd/trunk/modules/http2/h2_task.c
    httpd/httpd/trunk/modules/http2/h2_task_input.c
    httpd/httpd/trunk/modules/http2/h2_task_input.h
    httpd/httpd/trunk/modules/http2/h2_util.c
    httpd/httpd/trunk/modules/http2/h2_worker.c
    httpd/httpd/trunk/modules/http2/h2_workers.c
    httpd/httpd/trunk/modules/http2/h2_workers.h
    httpd/httpd/trunk/modules/http2/mod_http2.c
    httpd/httpd/trunk/modules/http2/mod_http2.dsp
    httpd/httpd/trunk/modules/http2/mod_http2.h
    httpd/httpd/trunk/modules/http2/mod_proxy_http2.c

Modified: httpd/httpd/trunk/CHANGES
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/CHANGES?rev=1731096&r1=1731095&r2=1731096&view=diff
==============================================================================
--- httpd/httpd/trunk/CHANGES [utf-8] (original)
+++ httpd/httpd/trunk/CHANGES [utf-8] Thu Feb 18 17:02:02 2016
@@ -1,6 +1,10 @@
                                                          -*- coding: utf-8 -*-
 Changes with Apache 2.5.0
 
+  *) mod_proxy_http2: using single connection for several requests *if*
+     master connection uses HTTP/2 itself. Not yet hardened under load.
+     [Stefan Eissing]
+
   *) core: Added support for HTTP code 451. PR58985.
      [Yehuda Katz <yehuda ymkatz.net>, Jim Jagielski]
 
@@ -22,6 +26,7 @@ Changes with Apache 2.5.0
 
   *) mod_proxy_http2: new experimental http2 proxy module for h2: and h2c: proxy
      urls. Uses, so far, one connection per request, reuses connections.
+     [Stefan Eissing]
   
   *) event: use pre_connection hook to properly initialize connection state for
      slave connections. use protocol_switch hook to initialize server config

Modified: httpd/httpd/trunk/CMakeLists.txt
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/CMakeLists.txt?rev=1731096&r1=1731095&r2=1731096&view=diff
==============================================================================
--- httpd/httpd/trunk/CMakeLists.txt (original)
+++ httpd/httpd/trunk/CMakeLists.txt Thu Feb 18 17:02:02 2016
@@ -409,7 +409,7 @@ SET(mod_http2_extra_sources
   modules/http2/h2_session.c         modules/http2/h2_stream.c 
   modules/http2/h2_stream_set.c      modules/http2/h2_switch.c
   modules/http2/h2_task.c            modules/http2/h2_task_input.c
-  modules/http2/h2_task_output.c     modules/http2/h2_task_queue.c
+  modules/http2/h2_task_output.c     modules/http2/h2_int_queue.c
   modules/http2/h2_util.c            modules/http2/h2_worker.c
   modules/http2/h2_workers.c
 )

Modified: httpd/httpd/trunk/modules/http2/NWGNUmod_http2
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/NWGNUmod_http2?rev=1731096&r1=1731095&r2=1731096&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/NWGNUmod_http2 (original)
+++ httpd/httpd/trunk/modules/http2/NWGNUmod_http2 Thu Feb 18 17:02:02 2016
@@ -194,6 +194,7 @@ FILES_nlm_objs = \
 	$(OBJDIR)/h2_filter.o \
 	$(OBJDIR)/h2_from_h1.o \
 	$(OBJDIR)/h2_h2.o \
+	$(OBJDIR)/h2_int_queue.o \
 	$(OBJDIR)/h2_io.o \
 	$(OBJDIR)/h2_io_set.o \
 	$(OBJDIR)/h2_mplx.o \
@@ -207,7 +208,6 @@ FILES_nlm_objs = \
 	$(OBJDIR)/h2_task.o \
 	$(OBJDIR)/h2_task_input.o \
 	$(OBJDIR)/h2_task_output.o \
-	$(OBJDIR)/h2_task_queue.o \
 	$(OBJDIR)/h2_util.o \
 	$(OBJDIR)/h2_worker.o \
 	$(OBJDIR)/h2_workers.o \

Modified: httpd/httpd/trunk/modules/http2/config2.m4
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/config2.m4?rev=1731096&r1=1731095&r2=1731096&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/config2.m4 (original)
+++ httpd/httpd/trunk/modules/http2/config2.m4 Thu Feb 18 17:02:02 2016
@@ -29,6 +29,7 @@ h2_ctx.lo dnl
 h2_filter.lo dnl
 h2_from_h1.lo dnl
 h2_h2.lo dnl
+h2_int_queue.lo dnl
 h2_io.lo dnl
 h2_io_set.lo dnl
 h2_mplx.lo dnl
@@ -42,7 +43,6 @@ h2_switch.lo dnl
 h2_task.lo dnl
 h2_task_input.lo dnl
 h2_task_output.lo dnl
-h2_task_queue.lo dnl
 h2_util.lo dnl
 h2_worker.lo dnl
 h2_workers.lo dnl
@@ -156,8 +156,10 @@ AC_DEFUN([APACHE_CHECK_NGHTTP2],[
         AC_MSG_WARN([nghttp2 library is unusable])
       fi
 dnl # nghttp2 >= 1.3.0: access to stream weights
-      AC_CHECK_FUNCS([nghttp2_stream_get_weight], 
-        [APR_ADDTO(MOD_CPPFLAGS, ["-DH2_NG2_STREAM_API"])], [])
+      AC_CHECK_FUNCS([nghttp2_stream_get_weight], [], [liberrors="yes"])
+      if test "x$liberrors" != "x"; then
+        AC_MSG_WARN([nghttp2 version >= 1.3.0 is required])
+      fi
 dnl # nghttp2 >= 1.5.0: changing stream priorities
       AC_CHECK_FUNCS([nghttp2_session_change_stream_priority], 
         [APR_ADDTO(MOD_CPPFLAGS, ["-DH2_NG2_CHANGE_PRIO"])], [])
@@ -206,6 +208,7 @@ APR_ADDTO(INCLUDES, [-I\$(top_srcdir)/$m
 dnl #  list of module object files
 proxy_http2_objs="dnl
 mod_proxy_http2.lo dnl
+h2_int_queue.lo dnl
 h2_proxy_session.lo dnl
 h2_request.lo dnl
 h2_util.lo dnl

Modified: httpd/httpd/trunk/modules/http2/h2_ctx.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_ctx.c?rev=1731096&r1=1731095&r2=1731096&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_ctx.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_ctx.c Thu Feb 18 17:02:02 2016
@@ -101,7 +101,17 @@ int h2_ctx_is_task(h2_ctx *ctx)
     return ctx && ctx->task;
 }
 
-struct h2_task *h2_ctx_get_task(h2_ctx *ctx)
+h2_task *h2_ctx_get_task(h2_ctx *ctx)
 {
     return ctx? ctx->task : NULL;
 }
+
+h2_task *h2_ctx_cget_task(conn_rec *c)
+{
+    return h2_ctx_get_task(h2_ctx_get(c, 0));
+}
+
+h2_task *h2_ctx_rget_task(request_rec *r)
+{
+    return h2_ctx_get_task(h2_ctx_rget(r));
+}

Modified: httpd/httpd/trunk/modules/http2/h2_ctx.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_ctx.h?rev=1731096&r1=1731095&r2=1731096&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_ctx.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_ctx.h Thu Feb 18 17:02:02 2016
@@ -71,5 +71,7 @@ const char *h2_ctx_protocol_get(const co
 int h2_ctx_is_task(h2_ctx *ctx);
 
 struct h2_task *h2_ctx_get_task(h2_ctx *ctx);
+struct h2_task *h2_ctx_cget_task(conn_rec *c);
+struct h2_task *h2_ctx_rget_task(request_rec *r);
 
 #endif /* defined(__mod_h2__h2_ctx__) */

Added: httpd/httpd/trunk/modules/http2/h2_int_queue.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_int_queue.c?rev=1731096&view=auto
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_int_queue.c (added)
+++ httpd/httpd/trunk/modules/http2/h2_int_queue.c Thu Feb 18 17:02:02 2016
@@ -0,0 +1,182 @@
+/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <assert.h>
+#include <stddef.h>
+#include <apr_pools.h>
+
+#include "h2_int_queue.h"
+
+
+static void tq_grow(h2_int_queue *q, int nlen);
+static void tq_swap(h2_int_queue *q, int i, int j);
+static int tq_bubble_up(h2_int_queue *q, int i, int top, 
+                        h2_iq_cmp *cmp, void *ctx);
+static int tq_bubble_down(h2_int_queue *q, int i, int bottom, 
+                          h2_iq_cmp *cmp, void *ctx);
+
+h2_int_queue *h2_iq_create(apr_pool_t *pool, int capacity)
+{
+    h2_int_queue *q = apr_pcalloc(pool, sizeof(h2_int_queue));
+    if (q) {
+        q->pool = pool;
+        tq_grow(q, capacity);
+        q->nelts = 0;
+    }
+    return q;
+}
+
+int h2_iq_empty(h2_int_queue *q)
+{
+    return q->nelts == 0;
+}
+
+int h2_iq_size(h2_int_queue *q)
+{
+    return q->nelts;
+}
+
+
+void h2_iq_add(h2_int_queue *q, int sid, h2_iq_cmp *cmp, void *ctx)
+{
+    int i;
+    
+    if (q->nelts >= q->nalloc) {
+        tq_grow(q, q->nalloc * 2);
+    }
+    
+    i = (q->head + q->nelts) % q->nalloc;
+    q->elts[i] = sid;
+    ++q->nelts;
+    
+    if (cmp) {
+        /* bubble it to the front of the queue */
+        tq_bubble_up(q, i, q->head, cmp, ctx);
+    }
+}
+
+int h2_iq_remove(h2_int_queue *q, int sid)
+{
+    int i;
+    for (i = 0; i < q->nelts; ++i) {
+        if (sid == q->elts[(q->head + i) % q->nalloc]) {
+            break;
+        }
+    }
+    
+    if (i < q->nelts) {
+        ++i;
+        for (; i < q->nelts; ++i) {
+            q->elts[(q->head+i-1)%q->nalloc] = q->elts[(q->head+i)%q->nalloc];
+        }
+        --q->nelts;
+        return 1;
+    }
+    return 0;
+}
+
+void h2_iq_sort(h2_int_queue *q, h2_iq_cmp *cmp, void *ctx)
+{
+    /* Assume that changes in ordering are minimal. This needs,
+     * best case, q->nelts - 1 comparisons to check that nothing
+     * changed.
+     */
+    if (q->nelts > 0) {
+        int i, ni, prev, last;
+        
+        /* Start at the end of the queue and create a tail of sorted
+         * entries. Make that tail one element longer in each iteration.
+         */
+        last = i = (q->head + q->nelts - 1) % q->nalloc;
+        while (i != q->head) {
+            prev = (q->nalloc + i - 1) % q->nalloc;
+            
+            ni = tq_bubble_up(q, i, prev, cmp, ctx);
+            if (ni == prev) {
+                /* i bubbled one up, bubble the new i down, which
+                 * keeps all ids below i sorted. */
+                tq_bubble_down(q, i, last, cmp, ctx);
+            }
+            i = prev;
+        };
+    }
+}
+
+
+int h2_iq_shift(h2_int_queue *q)
+{
+    int sid;
+    
+    if (q->nelts <= 0) {
+        return 0;
+    }
+    
+    sid = q->elts[q->head];
+    q->head = (q->head + 1) % q->nalloc;
+    q->nelts--;
+    
+    return sid;
+}
+
+static void tq_grow(h2_int_queue *q, int nlen)
+{
+    if (nlen > q->nalloc) {
+        int *nq = apr_pcalloc(q->pool, sizeof(int) * nlen);
+        if (q->nelts > 0) {
+            int l = ((q->head + q->nelts) % q->nalloc) - q->head;
+            
+            memmove(nq, q->elts + q->head, sizeof(int) * l);
+            if (l < q->nelts) {
+                /* elts wrapped, append elts in [0, remain] to nq */
+                int remain = q->nelts - l;
+                memmove(nq + l, q->elts, sizeof(int) * remain);
+            }
+        }
+        q->elts = nq;
+        q->nalloc = nlen;
+        q->head = 0;
+    }
+}
+
+static void tq_swap(h2_int_queue *q, int i, int j)
+{
+    int x = q->elts[i];
+    q->elts[i] = q->elts[j];
+    q->elts[j] = x;
+}
+
+static int tq_bubble_up(h2_int_queue *q, int i, int top, 
+                        h2_iq_cmp *cmp, void *ctx) 
+{
+    int prev;
+    while (((prev = (q->nalloc + i - 1) % q->nalloc), i != top) 
+           && (*cmp)(q->elts[i], q->elts[prev], ctx) < 0) {
+        tq_swap(q, prev, i);
+        i = prev;
+    }
+    return i;
+}
+
+static int tq_bubble_down(h2_int_queue *q, int i, int bottom, 
+                          h2_iq_cmp *cmp, void *ctx)
+{
+    int next;
+    while (((next = (q->nalloc + i + 1) % q->nalloc), i != bottom) 
+           && (*cmp)(q->elts[i], q->elts[next], ctx) > 0) {
+        tq_swap(q, next, i);
+        i = next;
+    }
+    return i;
+}

Added: httpd/httpd/trunk/modules/http2/h2_int_queue.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_int_queue.h?rev=1731096&view=auto
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_int_queue.h (added)
+++ httpd/httpd/trunk/modules/http2/h2_int_queue.h Thu Feb 18 17:02:02 2016
@@ -0,0 +1,103 @@
+/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __mod_h2__h2_int_queue__
+#define __mod_h2__h2_int_queue__
+
+/**
+ * h2_int_queue keeps a list of stream ids (ints), sorted in ascending order.
+ */
+typedef struct h2_int_queue h2_int_queue;
+
+struct h2_int_queue {
+    int *elts;
+    int head;
+    int nelts;
+    int nalloc;
+    apr_pool_t *pool;
+};
+
+/**
+ * Comparator for two stream ids to determine their order.
+ *
+ * @param s1 stream id to compare
+ * @param s2 stream id to compare
+ * @param ctx provided user data
+ * @return value is the same as for strcmp() and has the effect:
+ *    == 0: s1 and s2 are treated equal in ordering
+ *     < 0: s1 should be sorted before s2
+ *     > 0: s2 should be sorted before s1
+ */
+typedef int h2_iq_cmp(int s1, int s2, void *ctx);
+
+
+/**
+ * Allocate a new queue from the pool and initialize.
+ * @param pool the memory pool
+ * @param capacity the initial capacity of the queue
+ */
+h2_int_queue *h2_iq_create(apr_pool_t *pool, int capacity);
+
+/**
+ * Return != 0 iff there are no stream ids in the queue.
+ * @param q the queue to check
+ */
+int h2_iq_empty(h2_int_queue *q);
+
+/**
+ * Return the number of stream ids in the queue.
+ * @param q the queue to get size on
+ */
+int h2_iq_size(h2_int_queue *q);
+
+/**
+ * Add a stream id to the queue.
+ *
+ * @param q the queue to append the stream id to
+ * @param sid the stream id to add
+ * @param cmp the comparator for sorting
+ * @param ctx user data for comparator 
+ */
+void h2_iq_add(h2_int_queue *q, int sid, h2_iq_cmp *cmp, void *ctx);
+
+/**
+ * Remove the stream id from the queue. Return != 0 iff the stream id
+ * was found in queue.
+ * @param q the queue
+ * @param sid the stream id to remove
+ * @return != 0 iff the stream id was found in queue
+ */
+int h2_iq_remove(h2_int_queue *q, int sid);
+
+/**
+ * Sort the stream id queue again. Call if the stream ordering
+ * has changed.
+ *
+ * @param q the queue to sort
+ * @param cmp the comparator for sorting
+ * @param ctx user data for the comparator 
+ */
+void h2_iq_sort(h2_int_queue *q, h2_iq_cmp *cmp, void *ctx);
+
+/**
+ * Get the first stream id from the queue or 0 if the queue is empty.
+ * The stream id will be removed.
+ *
+ * @param q the queue to get the first stream id from
+ * @return the first stream id of the queue, 0 if empty
+ */
+int h2_iq_shift(h2_int_queue *q);
+
+#endif /* defined(__mod_h2__h2_int_queue__) */

Modified: httpd/httpd/trunk/modules/http2/h2_io.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_io.c?rev=1731096&r1=1731095&r2=1731096&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_io.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_io.c Thu Feb 18 17:02:02 2016
@@ -75,6 +75,11 @@ int h2_io_in_has_eos_for(h2_io *io)
     return io->eos_in || (io->bbin && h2_util_has_eos(io->bbin, -1));
 }
 
+int h2_io_in_has_data(h2_io *io)
+{
+    return io->bbin && h2_util_bb_has_data_or_eos(io->bbin);
+}
+
 int h2_io_out_has_data(h2_io *io)
 {
     return io->bbout && h2_util_bb_has_data_or_eos(io->bbout);
@@ -256,6 +261,18 @@ apr_status_t h2_io_in_read(h2_io *io, ap
         }
     }
     
+    if (status == APR_SUCCESS && (!io->bbin || APR_BRIGADE_EMPTY(io->bbin))) {
+        if (io->eos_in) {
+            if (!io->eos_in_written) {
+                status = append_eos(io, bb, trailers);
+                io->eos_in_written = 1;
+            }
+        }
+    }
+    
+    if (status == APR_SUCCESS && APR_BRIGADE_EMPTY(bb)) {
+        return APR_EAGAIN;
+    }
     return status;
 }
 

Modified: httpd/httpd/trunk/modules/http2/h2_io.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_io.h?rev=1731096&r1=1731095&r2=1731096&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_io.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_io.h Thu Feb 18 17:02:02 2016
@@ -63,9 +63,6 @@ struct h2_io {
     apr_size_t input_consumed;       /* how many bytes have been read */
         
     int files_handles_owned;
-    
-    struct h2_task *task;            /* parked task */
-    request_rec *r;                  /* parked request */
 };
 
 /*******************************************************************************
@@ -101,6 +98,10 @@ int h2_io_in_has_eos_for(h2_io *io);
  * Output data is available.
  */
 int h2_io_out_has_data(h2_io *io);
+/**
+ * Input data is available.
+ */
+int h2_io_in_has_data(h2_io *io);
 
 void h2_io_signal(h2_io *io, h2_io_op op);
 void h2_io_signal_init(h2_io *io, h2_io_op op, apr_interval_time_t timeout, 

Modified: httpd/httpd/trunk/modules/http2/h2_mplx.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_mplx.c?rev=1731096&r1=1731095&r2=1731096&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_mplx.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_mplx.c Thu Feb 18 17:02:02 2016
@@ -34,6 +34,7 @@
 #include "h2_conn.h"
 #include "h2_ctx.h"
 #include "h2_h2.h"
+#include "h2_int_queue.h"
 #include "h2_io.h"
 #include "h2_io_set.h"
 #include "h2_response.h"
@@ -44,7 +45,6 @@
 #include "h2_task.h"
 #include "h2_task_input.h"
 #include "h2_task_output.h"
-#include "h2_task_queue.h"
 #include "h2_worker.h"
 #include "h2_workers.h"
 #include "h2_util.h"
@@ -206,7 +206,13 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr
             return NULL;
         }
         
-        m->q = h2_tq_create(m->pool, h2_config_geti(conf, H2_CONF_MAX_STREAMS));
+        status = apr_thread_cond_create(&m->request_done, m->pool);
+        if (status != APR_SUCCESS) {
+            h2_mplx_destroy(m);
+            return NULL;
+        }
+
+        m->q = h2_iq_create(m->pool, h2_config_geti(conf, H2_CONF_MAX_STREAMS));
         m->stream_ios = h2_io_set_create(m->pool);
         m->ready_ios = h2_io_set_create(m->pool);
         m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM);
@@ -296,7 +302,7 @@ static int io_stream_done(h2_mplx *m, h2
     h2_io_set_remove(m->ready_ios, io);
     if (!io->processing_started || io->processing_done) {
         /* already finished or not even started yet */
-        h2_tq_remove(m->q, io->id);
+        h2_iq_remove(m->q, io->id);
         io_destroy(m, io, 1);
         return 0;
     }
@@ -324,6 +330,7 @@ apr_status_t h2_mplx_release_and_join(h2
         /* disable WINDOW_UPDATE callbacks */
         h2_mplx_set_consumed_cb(m, NULL, NULL);
         
+        apr_thread_cond_broadcast(m->request_done);
         while (!h2_io_set_iter(m->stream_ios, stream_done_iter, m)) {
             /* iterate until all ios have been orphaned or destroyed */
         }
@@ -351,6 +358,8 @@ apr_status_t h2_mplx_release_and_join(h2
                                   "all h2_workers to return, have still %d requests outstanding", 
                                   m->id, i*wait_secs, (int)h2_io_set_size(m->stream_ios));
                 }
+                m->aborted = 1;
+                apr_thread_cond_broadcast(m->request_done);
             }
         }
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056)
@@ -408,7 +417,7 @@ static const h2_request *pop_request(h2_
 {
     const h2_request *req = NULL;
     int sid;
-    while (!m->aborted && !req && (sid = h2_tq_shift(m->q)) > 0) {
+    while (!m->aborted && !req && (sid = h2_iq_shift(m->q)) > 0) {
         h2_io *io = h2_io_set_get(m->stream_ios, sid);
         if (io) {
             req = io->request;
@@ -421,17 +430,8 @@ static const h2_request *pop_request(h2_
     return req;
 }
 
-static apr_status_t h2_mplx_engine_schedule(h2_mplx *m, request_rec *r)
-{
-    if (!m->engine_queue) {
-        apr_queue_create(&m->engine_queue, 200, m->pool);
-    }
-    return apr_queue_trypush(m->engine_queue, r);
-}
-
-void h2_mplx_request_done(h2_mplx **pm, int stream_id, const h2_request **preq)
+void h2_mplx_request_done(h2_mplx *m, int stream_id, const h2_request **preq)
 {
-    h2_mplx *m = *pm;
     int acquired;
     
     if (enter_mutex(m, &acquired) == APR_SUCCESS) {
@@ -440,49 +440,30 @@ void h2_mplx_request_done(h2_mplx **pm,
             ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
                           "h2_mplx(%ld): request(%d) done", m->id, stream_id);
             if (io) {
-                request_rec *r = io->r;
-                
+                io->processing_done = 1;
+                h2_mplx_out_close(m, stream_id, NULL);
                 if (io->orphaned) {
-                    io->processing_done = 1;
-                }
-                else if (r) {
-                    /* A parked request which is being transferred from
-                     * one worker thread to another. This request_done call
-                     * was from the initial thread and now it is safe to
-                     * schedule it for further processing. */
-                    h2_task_thaw(io->task);
-                    io->task = NULL;
-                    io->r = NULL;
-                    h2_mplx_engine_schedule(*pm, r);
+                    io_destroy(m, io, 0);
+                    if (m->join_wait) {
+                        apr_thread_cond_signal(m->join_wait);
+                    }
                 }
                 else {
-                    io->processing_done = 1;
-                }
-                
-                if (io->processing_done) {
-                    h2_io_out_close(io, NULL);
-                    if (io->orphaned) {
-                        io_destroy(m, io, 0);
-                        if (m->join_wait) {
-                            apr_thread_cond_signal(m->join_wait);
-                        }
-                    }
-                    else {
-                        /* hang around until the stream deregisteres */
-                    }
+                    /* hang around until the stream deregisters */
                 }
             }
+            else {
+                ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
+                              "h2_mplx(%ld): request(%d) done, no io found", 
+                              m->id, stream_id);
+            }
+            apr_thread_cond_broadcast(m->request_done);
         }
         
         if (preq) {
             /* someone wants another request, if we have */
             *preq = pop_request(m);
         }
-        if (!preq || !*preq) {
-            /* No request to hand back to the worker, NULLify reference
-             * and decrement count */
-            *pm = NULL;
-        }
         leave_mutex(m, acquired);
     }
 }
@@ -935,6 +916,26 @@ int h2_mplx_in_has_eos_for(h2_mplx *m, i
     return has_eos;
 }
 
+int h2_mplx_in_has_data_for(h2_mplx *m, int stream_id)
+{
+    apr_status_t status;
+    int has_data = 0;
+    int acquired;
+    
+    AP_DEBUG_ASSERT(m);
+    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
+        h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
+        if (io && !io->orphaned) {
+            has_data = h2_io_in_has_data(io);
+        }
+        else {
+            has_data = 0;
+        }
+        leave_mutex(m, acquired);
+    }
+    return has_data;
+}
+
 int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id)
 {
     apr_status_t status;
@@ -1001,7 +1002,7 @@ apr_status_t h2_mplx_reprioritize(h2_mpl
             status = APR_ECONNABORTED;
         }
         else {
-            h2_tq_sort(m->q, cmp, ctx);
+            h2_iq_sort(m->q, cmp, ctx);
             
             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                           "h2_mplx(%ld): reprioritize tasks", m->id);
@@ -1050,8 +1051,8 @@ apr_status_t h2_mplx_process(h2_mplx *m,
                 status = h2_io_in_close(io);
             }
             
-            was_empty = h2_tq_empty(m->q);
-            h2_tq_add(m->q, io->id, cmp, ctx);
+            was_empty = h2_iq_empty(m->q);
+            h2_iq_add(m->q, io->id, cmp, ctx);
             
             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
                           "h2_mplx(%ld-%d): process", m->c->id, stream_id);
@@ -1079,61 +1080,116 @@ const h2_request *h2_mplx_pop_request(h2
         }
         else {
             req = pop_request(m);
-            *has_more = !h2_tq_empty(m->q);
+            *has_more = !h2_iq_empty(m->q);
         }
         leave_mutex(m, acquired);
     }
     return req;
 }
 
-apr_status_t h2_mplx_engine_push(h2_mplx *m, h2_task *task,
-                                 const char *engine_type, 
+
+/*******************************************************************************
+ * HTTP/2 request engines
+ ******************************************************************************/
+ 
+typedef struct h2_req_engine_i h2_req_engine_i;
+struct h2_req_engine_i {
+    h2_req_engine pub;
+    conn_rec *c;               /* connection this engine is assigned to */
+    h2_mplx *m;
+    unsigned int shutdown : 1; /* engine is being shut down */
+    apr_thread_cond_t *io;     /* condition var for waiting on data */
+    apr_queue_t *queue;        /* queue of scheduled request_rec* */
+    apr_size_t no_assigned;    /* # of assigned requests */
+    apr_size_t no_live;        /* # of live */
+    apr_size_t no_finished;    /* # of finished */
+};
+
+static apr_status_t h2_mplx_engine_schedule(h2_mplx *m, 
+                                            h2_req_engine_i *engine, 
+                                            request_rec *r)
+{
+    if (!engine->queue) {
+        apr_queue_create(&engine->queue, 100, engine->pub.pool);
+    }
+    return apr_queue_trypush(engine->queue, r);
+}
+
+
+apr_status_t h2_mplx_engine_push(const char *engine_type, 
                                  request_rec *r, h2_mplx_engine_init *einit)
 {
     apr_status_t status;
+    h2_mplx *m;
+    h2_task *task;
     int acquired;
     
+    task = h2_ctx_rget_task(r);
+    if (!task) {
+        return APR_ECONNABORTED;
+    }
+    m = task->mplx;
     AP_DEBUG_ASSERT(m);
+    
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
         h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id);
         if (!io || io->orphaned) {
             status = APR_ECONNABORTED;
         }
         else {
-            h2_req_engine *engine;
+            h2_req_engine_i *engine = (h2_req_engine_i*)m->engine;
             
             apr_table_set(r->connection->notes, H2_TASK_ID_NOTE, task->id);
             status = APR_EOF;
-            engine = m->engine; /* just a single one for now */
+            
             if (task->ser_headers) {
                 /* Max compatibility, deny processing of this */
             }
-            else if (!engine && einit) {
-                engine = apr_pcalloc(r->connection->pool, sizeof(*engine));
-                engine->id = 1;
-                engine->c = r->connection;
-                engine->pool = r->connection->pool;
-                engine->type = apr_pstrdup(engine->pool, engine_type);
-                
-                status = einit(engine, r);
-                if (status == APR_SUCCESS) {
-                    m->engine = engine;
+            else if (engine && !strcmp(engine->pub.type, engine_type)) {
+                if (engine->shutdown 
+                    || engine->no_assigned >= H2MIN(engine->pub.capacity, 100)) {
                     ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r,
-                                  "h2_mplx(%ld): init engine %d (%s)", 
-                                  m->c->id, engine->id, engine->type);
+                                  "h2_mplx(%ld): engine shutdown or over %s", 
+                                  m->c->id, engine->pub.id);
+                    engine = NULL;
                 }
-            }
-            else if (engine && !strcmp(engine->type, engine_type)) {
-                if (status == APR_SUCCESS) {
+                else if (h2_mplx_engine_schedule(m, engine, r) == APR_SUCCESS) {
                     /* this task will be processed in another thread,
                      * freeze any I/O for the time being. */
                     h2_task_freeze(task, r);
-                    io->task = task;
-                    io->r = r;
+                    engine->no_assigned++;
+                    status = APR_SUCCESS;
+                    ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, r,
+                                  "h2_mplx(%ld): push request %s", 
+                                  m->c->id, r->the_request);
+                }
+                else {
+                    ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r,
+                                  "h2_mplx(%ld): engine error adding req %s", 
+                                  m->c->id, engine->pub.id);
+                    engine = NULL;
+                }
+            }
+            
+            if (!engine && einit) {
+                engine = apr_pcalloc(task->pool, sizeof(*engine));
+                engine->pub.id = apr_psprintf(task->pool, "eng-%ld-%d", 
+                                               m->id, m->next_eng_id++);
+                engine->pub.pool = task->pool;
+                engine->pub.type = apr_pstrdup(task->pool, engine_type);
+                engine->c = r->connection;
+                engine->m = m;
+                engine->io = task->io;
+                engine->no_assigned = 1;
+                engine->no_live = 1;
+                
+                status = einit(&engine->pub, r);
+                ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r,
+                              "h2_mplx(%ld): init engine %s (%s)", 
+                              m->c->id, engine->pub.id, engine->pub.type);
+                if (status == APR_SUCCESS) {
+                    m->engine = &engine->pub;
                 }
-                ap_log_rerror(APLOG_MARK, APLOG_TRACE2, status, r,
-                              "h2_mplx(%ld): push request %s", 
-                              m->c->id, r->the_request);
             }
         }
         
@@ -1141,52 +1197,163 @@ apr_status_t h2_mplx_engine_push(h2_mplx
     }
     return status;
 }
+
+static request_rec *get_non_frozen(apr_queue_t *equeue)
+{
+    request_rec *r, *first = NULL;
+    h2_task *task;
+    void *elem;
+
+    if (equeue) {
+        /* FIFO queue, try to find a request_rec whose task is not frozen */
+        while (apr_queue_trypop(equeue, &elem) == APR_SUCCESS) {
+            r = elem;
+            task = h2_ctx_rget_task(r);
+            AP_DEBUG_ASSERT(task);
+            if (!task->frozen) {
+                return r;
+            }
+            apr_queue_push(equeue, r);  
+            if (!first) {
+                first = r;
+            }
+            else if (r == first) {
+                return NULL; /* walked the whole queue */
+            }
+        }
+    }
+    return NULL;
+}
+
+static apr_status_t engine_pull(h2_mplx *m, h2_req_engine_i *engine, 
+                                apr_read_type_e block, request_rec **pr)
+{   
+    request_rec *r;
+    
+    AP_DEBUG_ASSERT(m);
+    AP_DEBUG_ASSERT(engine);
+    while (1) {
+        if (m->aborted) {
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+                          "h2_mplx(%ld): mplx abort while pulling requests %s", 
+                          m->id, engine->pub.id);
+            *pr = NULL;
+            return APR_EOF;
+        }
+        
+        if (engine->queue && (r = get_non_frozen(engine->queue))) {
+            ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
+                          "h2_mplx(%ld): request %s pulled by engine %s", 
+                          m->c->id, r->the_request, engine->pub.id);
+            engine->no_live++;
+            *pr = r;
+            return APR_SUCCESS;
+        }
+        else if (APR_NONBLOCK_READ == block) {
+            *pr = NULL;
+            return APR_EAGAIN;
+        }
+        else if (!engine->queue || !apr_queue_size(engine->queue)) {
+            engine->shutdown = 1;
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+                          "h2_mplx(%ld): emtpy queue, shutdown engine %s", 
+                          m->id, engine->pub.id);
+            *pr = NULL;
+            return APR_EOF;
+        }
+        apr_thread_cond_timedwait(m->request_done, m->lock, 
+                                  apr_time_from_msec(100));
+    }
+}
                                  
-apr_status_t h2_mplx_engine_pull(h2_mplx *m, h2_task *task,
-                                 struct h2_req_engine *engine, 
-                                 apr_time_t timeout, request_rec **pr)
+apr_status_t h2_mplx_engine_pull(h2_req_engine *pub_engine, 
+                                 apr_read_type_e block, request_rec **pr)
 {   
+    h2_req_engine_i *engine = (h2_req_engine_i*)pub_engine;
+    h2_mplx *m = engine->m;
     apr_status_t status;
     int acquired;
     
-    AP_DEBUG_ASSERT(m);
+    *pr = NULL;
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        status = APR_ECONNABORTED;
-        if (m->engine == engine && m->engine_queue) {
-            void *elem;
-            status = apr_queue_trypop(m->engine_queue, &elem);
-            if (status == APR_SUCCESS) {
-                *pr = elem;
-                ap_log_rerror(APLOG_MARK, APLOG_TRACE2, status, *pr,
-                              "h2_mplx(%ld): request %s pulled by engine %d", 
-                              m->c->id, (*pr)->the_request, engine->id);
-            }
-        }
+        status = engine_pull(m, engine, block, pr);
         leave_mutex(m, acquired);
     }
     return status;
 }
  
-void h2_mplx_engine_done(h2_mplx *m, struct h2_task *task, conn_rec *r_conn)
+static void engine_done(h2_mplx *m, h2_req_engine_i *engine, h2_task *task, 
+                        int waslive, int aborted)
 {
-    int stream_id = task->stream_id;
+    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c,
+                  "h2_mplx(%ld): task %s %s by %s", 
+                  m->id, task->id, aborted? "aborted":"done", 
+                  engine->pub.id);
     h2_task_output_close(task->output);
-    h2_mplx_request_done(&m, stream_id, NULL);
-    apr_pool_destroy(r_conn->pool);
+    h2_mplx_request_done(m, task->stream_id, NULL);
+    apr_pool_destroy(task->pool);
+    engine->no_finished++;
+    if (waslive) engine->no_live--;
+    engine->no_assigned--;
+}
+                                
+void h2_mplx_engine_done(h2_req_engine *pub_engine, conn_rec *r_conn)
+{
+    h2_req_engine_i *engine = (h2_req_engine_i*)pub_engine;
+    h2_mplx *m = engine->m;
+    h2_task *task;
+    int acquired;
+
+    task = h2_ctx_cget_task(r_conn);
+    if (task && (enter_mutex(m, &acquired) == APR_SUCCESS)) {
+        engine_done(m, engine, task, 1, 0);
+        leave_mutex(m, acquired);
+    }
 }
                                 
-void h2_mplx_engine_exit(h2_mplx *m, struct h2_task *task, 
-                         struct h2_req_engine *engine)
+void h2_mplx_engine_exit(h2_req_engine *pub_engine)
 {
+    h2_req_engine_i *engine = (h2_req_engine_i*)pub_engine;
+    h2_mplx *m = engine->m;
     int acquired;
     
-    AP_DEBUG_ASSERT(m);
     if (enter_mutex(m, &acquired) == APR_SUCCESS) {
-        /* TODO: shutdown of engine->c */
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                      "h2_mplx(%ld): exit engine %d (%s)", 
-                      m->c->id, engine->id, engine->type);
-        m->engine = NULL;
+        if (engine->queue && apr_queue_size(engine->queue)) {
+            void *entry;
+            ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
+                          "h2_mplx(%ld): exit engine %s (%s), "
+                          "has still %d requests queued, "
+                          "assigned=%ld, live=%ld, finished=%ld", 
+                          m->c->id, engine->pub.id, engine->pub.type,
+                          (int)apr_queue_size(engine->queue),
+                          (long)engine->no_assigned, (long)engine->no_live,
+                          (long)engine->no_finished);
+            while (apr_queue_trypop(engine->queue, &entry) == APR_SUCCESS) {
+                request_rec *r = entry;
+                h2_task *task = h2_ctx_rget_task(r);
+                ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
+                              "h2_mplx(%ld): engine %s has queued task %s, "
+                              "frozen=%d, aborting",
+                              m->c->id, engine->pub.id, task->id, task->frozen);
+                engine_done(m, engine, task, 0, 1);
+            }
+        }
+        if (engine->no_assigned > 1 || engine->no_live > 1) {
+            ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
+                          "h2_mplx(%ld): exit engine %s (%s), "
+                          "assigned=%ld, live=%ld, finished=%ld", 
+                          m->c->id, engine->pub.id, engine->pub.type,
+                          (long)engine->no_assigned, (long)engine->no_live,
+                          (long)engine->no_finished);
+        }
+        else {
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+                          "h2_mplx(%ld): exit engine %s (%s)", 
+                          m->c->id, engine->pub.id, engine->pub.type);
+        }
+        if (m->engine == &engine->pub) {
+            m->engine = NULL; /* TODO */
+        }
         leave_mutex(m, acquired);
     }
 }

Modified: httpd/httpd/trunk/modules/http2/h2_mplx.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_mplx.h?rev=1731096&r1=1731095&r2=1731096&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_mplx.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_mplx.h Thu Feb 18 17:02:02 2016
@@ -46,7 +46,7 @@ struct h2_io_set;
 struct apr_thread_cond_t;
 struct h2_workers;
 struct h2_stream_set;
-struct h2_task_queue;
+struct h2_int_queue;
 struct h2_req_engine;
 
 #include <apr_queue.h>
@@ -69,7 +69,7 @@ struct h2_mplx {
 
     unsigned int aborted : 1;
 
-    struct h2_task_queue *q;
+    struct h2_int_queue *q;
     struct h2_io_set *stream_ios;
     struct h2_io_set *ready_ios;
     
@@ -77,6 +77,7 @@ struct h2_mplx {
 
     apr_thread_mutex_t *lock;
     struct apr_thread_cond_t *added_output;
+    struct apr_thread_cond_t *request_done;
     struct apr_thread_cond_t *join_wait;
     
     apr_size_t stream_max_mem;
@@ -91,7 +92,9 @@ struct h2_mplx {
     void *input_consumed_ctx;
     
     struct h2_req_engine *engine;
+    /* TODO: signal for waiting tasks */
     apr_queue_t *engine_queue;
+    int next_eng_id;
 };
 
 
@@ -127,7 +130,7 @@ apr_status_t h2_mplx_release_and_join(h2
  */
 void h2_mplx_abort(h2_mplx *mplx);
 
-void h2_mplx_request_done(h2_mplx **pm, int stream_id, const struct h2_request **preq);
+void h2_mplx_request_done(h2_mplx *m, int stream_id, const struct h2_request **preq);
 
 /**
  * Get the highest stream identifier that has been passed on to processing.
@@ -151,10 +154,14 @@ int h2_mplx_get_max_stream_started(h2_mp
  */
 apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error);
 
-/* Return != 0 iff the multiplexer has data for the given stream. 
+/* Return != 0 iff the multiplexer has output data for the given stream. 
  */
 int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id);
 
+/* Return != 0 iff the multiplexer has input data for the given stream. 
+ */
+int h2_mplx_in_has_data_for(h2_mplx *m, int stream_id);
+
 /**
  * Waits on output data from any stream in this session to become available. 
  * Returns APR_TIMEUP if no data arrived in the given time.
@@ -385,17 +392,14 @@ APR_RING_INSERT_TAIL((b), ap__b, h2_mplx
 typedef apr_status_t h2_mplx_engine_init(struct h2_req_engine *engine, 
                                          request_rec *r);
 
-apr_status_t h2_mplx_engine_push(h2_mplx *m, struct h2_task *task, 
-                                 const char *engine_type, 
+apr_status_t h2_mplx_engine_push(const char *engine_type, 
                                  request_rec *r, h2_mplx_engine_init *einit);
                                  
-apr_status_t h2_mplx_engine_pull(h2_mplx *m, struct h2_task *task, 
-                                 struct h2_req_engine *engine, 
-                                 apr_time_t timeout, request_rec **pr);
+apr_status_t h2_mplx_engine_pull(struct h2_req_engine *engine, 
+                                 apr_read_type_e block, request_rec **pr);
 
-void h2_mplx_engine_done(h2_mplx *m, struct h2_task *task, conn_rec *r_conn);
+void h2_mplx_engine_done(struct h2_req_engine *engine, conn_rec *r_conn);
                                  
-void h2_mplx_engine_exit(h2_mplx *m, struct h2_task *task, 
-                         struct h2_req_engine *engine);
+void h2_mplx_engine_exit(struct h2_req_engine *engine);
 
 #endif /* defined(__mod_h2__h2_mplx__) */

Modified: httpd/httpd/trunk/modules/http2/h2_proxy_session.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_proxy_session.c?rev=1731096&r1=1731095&r2=1731096&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_proxy_session.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_proxy_session.c Thu Feb 18 17:02:02 2016
@@ -21,12 +21,33 @@
 #include <mod_http2.h>
 
 #include "h2.h"
+#include "h2_int_queue.h"
 #include "h2_request.h"
 #include "h2_util.h"
 #include "h2_proxy_session.h"
 
 APLOG_USE_MODULE(proxy_http2);
 
+typedef struct h2_proxy_stream {
+    int id;
+    apr_pool_t *pool;
+    h2_proxy_session *session;
+
+    const char *url;
+    request_rec *r;
+    h2_request *req;
+
+    h2_stream_state_t state;
+    unsigned int suspended : 1;
+    unsigned int data_received : 1;
+
+    apr_bucket_brigade *input;
+    apr_bucket_brigade *output;
+    
+    apr_table_t *saves;
+} h2_proxy_stream;
+
+
 static int ngstatus_from_apr_status(apr_status_t rv)
 {
     if (rv == APR_SUCCESS) {
@@ -41,19 +62,21 @@ static int ngstatus_from_apr_status(apr_
     return NGHTTP2_ERR_PROTO;
 }
 
+static void dispatch_event(h2_proxy_session *session, h2_proxys_event_t ev, 
+                           int arg, const char *msg);
+
 
-static apr_status_t proxy_session_shutdown(void *theconn)
+static apr_status_t proxy_session_pre_close(void *theconn)
 {
     proxy_conn_rec *p_conn = (proxy_conn_rec *)theconn;
     h2_proxy_session *session = p_conn->data;
 
     if (session && session->ngh2) {
-        if (session->c && !session->c->aborted && !session->goaway_sent) {
-            nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE, 
-                                  session->max_stream_recv, 0, NULL, 0);
-            nghttp2_session_send(session->ngh2);
-        }
-
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, 
+                      "proxy_session(%s): shutdown, state=%d, streams=%d",
+                      session->id, session->state, 
+                      h2_iq_size(session->streams));
+        dispatch_event(session, H2_PROXYS_EV_PRE_CLOSE, 0, NULL);
         nghttp2_session_del(session->ngh2);
         session->ngh2 = NULL;
         p_conn->data = NULL;
@@ -109,8 +132,8 @@ static ssize_t raw_send(nghttp2_session
     int flush = 1;
 
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, 
-                  "h2_proxy_sesssion(%ld): raw_send %d bytes, flush=%d", 
-                  session->c->id, (int)length, flush);
+                  "h2_proxy_sesssion(%s): raw_send %d bytes, flush=%d", 
+                  session->id, (int)length, flush);
     b = apr_bucket_transient_create((const char*)data, length, 
                                     session->c->bucket_alloc);
     APR_BRIGADE_INSERT_TAIL(session->output, b);
@@ -120,7 +143,7 @@ static ssize_t raw_send(nghttp2_session
                                 session->output, flush);
     if (status != APR_SUCCESS) {
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, 
-                      "h2_proxy_sesssion(%ld): sending", session->c->id);
+                      "h2_proxy_sesssion(%s): sending", session->id);
         return NGHTTP2_ERR_CALLBACK_FAILURE;
     }
     return length;
@@ -138,8 +161,8 @@ static int on_frame_recv(nghttp2_session
         
         h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO()
-                      "h2_session(%ld): recv FRAME[%s]",
-                      session->c->id, buffer);
+                      "h2_session(%s): recv FRAME[%s]",
+                      session->id, buffer);
     }
 
     switch (frame->hd.type) {
@@ -150,9 +173,22 @@ static int on_frame_recv(nghttp2_session
             break;
         case NGHTTP2_PUSH_PROMISE:
             break;
+        case NGHTTP2_SETTINGS:
+            if (frame->settings.niv > 0) {
+                session->remote_max_concurrent = nghttp2_session_get_remote_settings(ngh2, NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS);
+            }
+            break;
         case NGHTTP2_GOAWAY:
-            session->goaway_recvd = 1;
+            dispatch_event(session, H2_PROXYS_EV_REMOTE_GOAWAY, 0, NULL);
             /* TODO: close handling */
+            if (APLOGcinfo(session->c)) {
+                char buffer[256];
+                
+                h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
+                ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, session->c, APLOGNO()
+                              "h2_session(%s): recv FRAME[%s]",
+                              session->id, buffer);
+            }
             break;
         default:
             break;
@@ -164,13 +200,27 @@ static int before_frame_send(nghttp2_ses
                              const nghttp2_frame *frame, void *user_data)
 {
     h2_proxy_session *session = user_data;
-    if (APLOGcdebug(session->c)) {
-        char buffer[256];
-        
-        h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03068)
-                      "h2_session(%ld): sent FRAME[%s]",
-                      session->c->id, buffer);
+    switch (frame->hd.type) {
+        case NGHTTP2_GOAWAY:
+            if (APLOGcinfo(session->c)) {
+                char buffer[256];
+                
+                h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
+                ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, session->c, APLOGNO()
+                              "h2_session(%s): sent FRAME[%s]",
+                              session->id, buffer);
+            }
+            break;
+        default:
+            if (APLOGcdebug(session->c)) {
+                char buffer[256];
+                
+                h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
+                ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO()
+                              "h2_session(%s): sent FRAME[%s]",
+                              session->id, buffer);
+            }
+            break;
     }
     return 0;
 }
@@ -217,8 +267,8 @@ static apr_status_t h2_proxy_stream_add_
             char *s = apr_pstrndup(stream->pool, v, vlen);
             
             ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c, 
-                          "h2_proxy_stream(%ld-%d): got status %s", 
-                          stream->session->c->id, stream->id, s);
+                          "h2_proxy_stream(%s-%d): got status %s", 
+                          stream->session->id, stream->id, s);
             stream->r->status = (int)apr_atoi64(s);
             if (stream->r->status <= 0) {
                 stream->r->status = 500;
@@ -236,8 +286,8 @@ static apr_status_t h2_proxy_stream_add_
         hvalue = apr_pstrndup(stream->pool, v, vlen);
         
         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c, 
-                      "h2_proxy_stream(%ld-%d): got header %s: %s", 
-                      stream->session->c->id, stream->id, hname, hvalue);
+                      "h2_proxy_stream(%s-%d): got header %s: %s", 
+                      stream->session->id, stream->id, hname, hvalue);
         process_proxy_header(stream->r, hname, hvalue);
     }
     return APR_SUCCESS;
@@ -248,8 +298,8 @@ static int log_header(void *ctx, const c
     h2_proxy_stream *stream = ctx;
     
     ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, stream->r, 
-                  "h2_proxy_stream(%ld-%d), header_out %s: %s", 
-                  stream->session->c->id, stream->id, key, value);
+                  "h2_proxy_stream(%s-%d), header_out %s: %s", 
+                  stream->session->id, stream->id, key, value);
     return 1;
 }
 
@@ -307,8 +357,8 @@ static void h2_proxy_stream_end_headers_
     
     if (APLOGrtrace2(stream->r)) {
         ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, stream->r, 
-                      "h2_proxy_stream(%ld-%d), header_out after merging", 
-                      stream->session->c->id, stream->id);
+                      "h2_proxy_stream(%s-%d), header_out after merging", 
+                      stream->session->id, stream->id);
         apr_table_do(log_header, stream, stream->r->headers_out, NULL);
     }
 }
@@ -338,11 +388,15 @@ static int on_data_chunk_recv(nghttp2_se
     b = apr_bucket_transient_create((const char*)data, len, 
                                     stream->r->connection->bucket_alloc);
     APR_BRIGADE_INSERT_TAIL(stream->output, b);
+    if (flags & NGHTTP2_DATA_FLAG_EOF) {
+        b = apr_bucket_flush_create(stream->r->connection->bucket_alloc);
+        APR_BRIGADE_INSERT_TAIL(stream->output, b);
+    }
     status = ap_pass_brigade(stream->r->output_filters, stream->output);
     if (status != APR_SUCCESS) {
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, APLOGNO()
-                      "h2_session(%ld-%d): passing output", 
-                      session->c->id, stream->id);
+                      "h2_session(%s-%d): passing output", 
+                      session->id, stream->id);
         return NGHTTP2_ERR_CALLBACK_FAILURE;
     }
     return 0;
@@ -352,23 +406,7 @@ static int on_stream_close(nghttp2_sessi
                            uint32_t error_code, void *user_data) 
 {
     h2_proxy_session *session = user_data;
-    h2_proxy_stream *stream;
-    
-    stream = nghttp2_session_get_stream_user_data(ngh2, stream_id);
-    if (!stream) {
-        return 0;
-    }
-    
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, 
-                  "h2_proxy_sesssion(%ld): closing stream(%d)", 
-                  session->c->id, stream_id);
-
-    if (!stream->data_received) {
-        /* last chance to manipulate response headers.
-         * after this, only trailers */
-        stream->data_received = 1;
-    }
-    stream->state = H2_STREAM_ST_CLOSED;
+    dispatch_event(session, H2_PROXYS_EV_STREAM_DONE, stream_id, NULL);
     return 0;
 }
 
@@ -413,11 +451,11 @@ static ssize_t stream_data_read(nghttp2_
     
     if (APR_BRIGADE_EMPTY(stream->input)) {
         status = ap_get_brigade(stream->r->input_filters, stream->input,
-                                AP_MODE_READBYTES, APR_BLOCK_READ,
-                                H2MIN(APR_BUCKET_BUFF_SIZE, length));
+                                AP_MODE_READBYTES, APR_NONBLOCK_READ,
+                                H2MAX(APR_BUCKET_BUFF_SIZE, length));
         ap_log_rerror(APLOG_MARK, APLOG_TRACE2, status, stream->r, 
-                      "h2_proxy_stream(%d): request body read", 
-                      stream->id);
+                      "h2_proxy_stream(%s-%d): request body read", 
+                      stream->session->id, stream->id);
     }
 
     if (status == APR_SUCCESS) {
@@ -459,31 +497,42 @@ static ssize_t stream_data_read(nghttp2_
         return readlen;
     }
     else if (APR_STATUS_IS_EAGAIN(status)) {
+        /* suspended stream, needs to be re-awakened */
+        ap_log_rerror(APLOG_MARK, APLOG_TRACE2, status, stream->r, 
+                      "h2_proxy_stream(%s-%d): suspending", 
+                      stream->session->id, stream_id);
+        stream->suspended = 1;
+        h2_iq_add(stream->session->suspended, stream->id, NULL, NULL);
         return NGHTTP2_ERR_DEFERRED;
     }
     return ngstatus_from_apr_status(status);
 }
 
-h2_proxy_session *h2_proxy_session_setup(request_rec *r, proxy_conn_rec *p_conn,
-                                         proxy_server_conf *conf)
+h2_proxy_session *h2_proxy_session_setup(const char *id, proxy_conn_rec *p_conn,
+                                         proxy_server_conf *conf,
+                                         h2_proxy_request_done *done)
 {
     if (!p_conn->data) {
+        apr_pool_t *pool = p_conn->scpool;
         h2_proxy_session *session;
-        nghttp2_settings_entry settings[2];
         nghttp2_session_callbacks *cbs;
-        int add_conn_window;
-        int rv;
+        nghttp2_option *option;
         
-        session = apr_pcalloc(p_conn->scpool, sizeof(*session));
-        apr_pool_pre_cleanup_register(p_conn->scpool, p_conn, proxy_session_shutdown);
+        session = apr_pcalloc(pool, sizeof(*session));
+        apr_pool_pre_cleanup_register(pool, p_conn, proxy_session_pre_close);
         p_conn->data = session;
         
+        session->id = apr_pstrdup(p_conn->scpool, id);
         session->c = p_conn->connection;
         session->p_conn = p_conn;
         session->conf = conf;
         session->pool = p_conn->scpool;
+        session->state = H2_PROXYS_ST_INIT;
         session->window_bits_default    = 30;
         session->window_bits_connection = 30;
+        session->streams = h2_iq_create(pool, 25);
+        session->suspended = h2_iq_create(pool, 5);
+        session->done = done;
     
         session->input = apr_brigade_create(session->pool, session->c->bucket_alloc);
         session->output = apr_brigade_create(session->pool, session->c->bucket_alloc);
@@ -496,70 +545,41 @@ h2_proxy_session *h2_proxy_session_setup
         nghttp2_session_callbacks_set_before_frame_send_callback(cbs, before_frame_send);
         nghttp2_session_callbacks_set_send_callback(cbs, raw_send);
         
-        nghttp2_session_client_new(&session->ngh2, cbs, session);
+        nghttp2_option_new(&option);
+        nghttp2_option_set_peer_max_concurrent_streams(option, 20);
+        
+        nghttp2_session_client_new2(&session->ngh2, cbs, session, option);
+        
+        nghttp2_option_del(option);
         nghttp2_session_callbacks_del(cbs);
 
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
                       "setup session for %s", p_conn->hostname);
         
-        settings[0].settings_id = NGHTTP2_SETTINGS_ENABLE_PUSH;
-        settings[0].value = 0;
-        settings[1].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
-        settings[1].value = (1 << session->window_bits_default) - 1;
-        
-        rv = nghttp2_submit_settings(session->ngh2, NGHTTP2_FLAG_NONE, settings, 
-                                     H2_ALEN(settings));
-
-        /* If the connection window is larger than our default, trigger a WINDOW_UPDATE */
-        add_conn_window = ((1 << session->window_bits_connection) - 1 -
-                           NGHTTP2_INITIAL_CONNECTION_WINDOW_SIZE);
-        if (!rv && add_conn_window != 0) {
-            rv = nghttp2_submit_window_update(session->ngh2, NGHTTP2_FLAG_NONE, 0, add_conn_window);
-        }
     }
     return p_conn->data;
 }
 
-
-apr_status_t h2_proxy_session_open_stream(h2_proxy_session *session, const char *url,
-                                          request_rec *r, h2_proxy_stream **pstream)
+static apr_status_t session_start(h2_proxy_session *session) 
 {
-    h2_proxy_stream *stream;
-    apr_uri_t puri;
-    const char *authority, *scheme, *path;
-
-    stream = apr_pcalloc(r->pool, sizeof(*stream));
-
-    stream->pool = r->pool;
-    stream->url = url;
-    stream->r = r;
-    stream->session = session;
-    stream->state = H2_STREAM_ST_IDLE;
+    nghttp2_settings_entry settings[2];
+    int rv, add_conn_window;
     
-    stream->input = apr_brigade_create(stream->pool, session->c->bucket_alloc);
-    stream->output = apr_brigade_create(stream->pool, session->c->bucket_alloc);
-    
-    stream->req = h2_request_create(1, stream->pool, 0);
-
-    apr_uri_parse(stream->pool, url, &puri);
-    scheme = (strcmp(puri.scheme, "h2")? "http" : "https");
-    authority = puri.hostname;
-    if (!ap_strchr_c(authority, ':') && puri.port
-        && apr_uri_port_of_scheme(scheme) != puri.port) {
-        /* port info missing and port is not default for scheme: append */
-        authority = apr_psprintf(stream->pool, "%s:%d", authority, puri.port);
+    settings[0].settings_id = NGHTTP2_SETTINGS_ENABLE_PUSH;
+    settings[0].value = 0;
+    settings[1].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
+    settings[1].value = (1 << session->window_bits_default) - 1;
+    
+    rv = nghttp2_submit_settings(session->ngh2, NGHTTP2_FLAG_NONE, settings, 
+                                 H2_ALEN(settings));
+    
+    /* If the connection window is larger than our default, trigger a WINDOW_UPDATE */
+    add_conn_window = ((1 << session->window_bits_connection) - 1 -
+                       NGHTTP2_INITIAL_CONNECTION_WINDOW_SIZE);
+    if (!rv && add_conn_window != 0) {
+        rv = nghttp2_submit_window_update(session->ngh2, NGHTTP2_FLAG_NONE, 0, add_conn_window);
     }
-    path = apr_uri_unparse(stream->pool, &puri, APR_URI_UNP_OMITSITEPART);
-    h2_request_make(stream->req, stream->pool, r->method, scheme,
-                    authority, path, r->headers_in);
-
-    /* Tuck away all already existing cookies */
-    stream->saves = apr_table_make(r->pool, 2);
-    apr_table_do(add_header, stream->saves, r->headers_out,"Set-Cookie", NULL);
-
-    *pstream = stream;
-    
-    return APR_SUCCESS;
+    return rv? APR_EGENERAL : APR_SUCCESS;
 }
 
 static apr_status_t feed_brigade(h2_proxy_session *session, apr_bucket_brigade *bb)
@@ -595,69 +615,57 @@ static apr_status_t feed_brigade(h2_prox
     }
     
     ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c, 
-                  "h2_session(%ld): fed %ld bytes of input", session->c->id, (long)readlen);
+                  "h2_session(%s): fed %ld bytes of input to session", 
+                  session->id, (long)readlen);
     if (readlen == 0 && status == APR_SUCCESS) {
         return APR_EAGAIN;
     }
     return status;
 }
 
-
-static apr_status_t stream_loop(h2_proxy_stream *stream) 
+static apr_status_t open_stream(h2_proxy_session *session, const char *url,
+                                request_rec *r, h2_proxy_stream **pstream)
 {
-    h2_proxy_session *session = stream->session;
-    apr_status_t status = APR_SUCCESS;
-    int want_read, want_write;
+    h2_proxy_stream *stream;
+    apr_uri_t puri;
+    const char *authority, *scheme, *path;
+
+    stream = apr_pcalloc(r->pool, sizeof(*stream));
+
+    stream->pool = r->pool;
+    stream->url = url;
+    stream->r = r;
+    stream->session = session;
+    stream->state = H2_STREAM_ST_IDLE;
     
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c, 
-                  "h2_session(%ld): start loop for stream %d", 
-                  session->c->id, stream->id);
-    while ((status == APR_SUCCESS || APR_STATUS_IS_EAGAIN(status))
-           && stream->state != H2_STREAM_ST_CLOSED) {
-           
-        want_read = nghttp2_session_want_read(session->ngh2);
-        want_write = nghttp2_session_want_write(session->ngh2);
-               
-        if (want_write) {
-            int rv = nghttp2_session_send(session->ngh2);
-            if (rv < 0 && nghttp2_is_fatal(rv)) {
-                status = APR_EGENERAL;
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c, 
-                              "h2_session(%ld): write, rv=%d", session->c->id, rv);
-                break;
-            }
-        }
+    stream->input = apr_brigade_create(stream->pool, session->c->bucket_alloc);
+    stream->output = apr_brigade_create(stream->pool, session->c->bucket_alloc);
+    
+    stream->req = h2_request_create(1, stream->pool, 0);
 
-        if (want_read) {
-            status = ap_get_brigade(session->c->input_filters, session->input, 
-                                    AP_MODE_READBYTES, 
-                                    (want_write? APR_NONBLOCK_READ : APR_BLOCK_READ), 
-                                    APR_BUCKET_BUFF_SIZE);
-            if (status == APR_SUCCESS) {
-                status = feed_brigade(session, session->input);
-            }
-            else if (!APR_STATUS_IS_EAGAIN(status)) {
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c, 
-                              "h2_session(%ld): read", session->c->id);
-                break;
-            }
-        }
-        
-        if (!want_read && !want_write) {
-            status = APR_EGENERAL;
-            break;
-        }
+    apr_uri_parse(stream->pool, url, &puri);
+    scheme = (strcmp(puri.scheme, "h2")? "http" : "https");
+    authority = puri.hostname;
+    if (!ap_strchr_c(authority, ':') && puri.port
+        && apr_uri_port_of_scheme(scheme) != puri.port) {
+        /* port info missing and port is not default for scheme: append */
+        authority = apr_psprintf(stream->pool, "%s:%d", authority, puri.port);
     }
+    path = apr_uri_unparse(stream->pool, &puri, APR_URI_UNP_OMITSITEPART);
+    h2_request_make(stream->req, stream->pool, r->method, scheme,
+                    authority, path, r->headers_in);
+
+    /* Tuck away all already existing cookies */
+    stream->saves = apr_table_make(r->pool, 2);
+    apr_table_do(add_header, stream->saves, r->headers_out,"Set-Cookie", NULL);
+
+    *pstream = stream;
     
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c, 
-                  "h2_session(%ld): end loop for stream %d", 
-                  session->c->id, stream->id);
-    return status;
+    return APR_SUCCESS;
 }
 
-apr_status_t h2_proxy_stream_process(h2_proxy_stream *stream)
+static apr_status_t submit_stream(h2_proxy_session *session, h2_proxy_stream *stream)
 {
-    h2_proxy_session *session = stream->session;
     h2_ngheader *hd;
     nghttp2_data_provider *pp = NULL;
     nghttp2_data_provider provider;
@@ -685,16 +693,525 @@ apr_status_t h2_proxy_stream_process(h2_
         const char *task_id = apr_table_get(stream->r->connection->notes, 
                                             H2_TASK_ID_NOTE);
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
-                      "h2_session(%ld): submit %s%s -> %d (task %s)", 
-                      session->c->id, stream->req->authority, stream->req->path,
+                      "h2_session(%s): submit %s%s -> %d (task %s)", 
+                      session->id, stream->req->authority, stream->req->path,
                       rv, task_id);
     }
+    else {
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
+                      "h2_session(%s-%d): submit %s%s", 
+                      session->id, rv, stream->req->authority, stream->req->path);
+    }
+    
     if (rv > 0) {
         stream->id = rv;
         stream->state = H2_STREAM_ST_OPEN;
+        h2_iq_add(session->streams, stream->id, NULL, NULL);
+        dispatch_event(session, H2_PROXYS_EV_STREAM_SUBMITTED, rv, NULL);
         
-        return stream_loop(stream);
+        return APR_SUCCESS;
     }
     return APR_EGENERAL;
 }
 
+static apr_status_t h2_proxy_session_read(h2_proxy_session *session, int block)
+{
+    apr_status_t status;
+    status = ap_get_brigade(session->c->input_filters, session->input, 
+                            AP_MODE_READBYTES, 
+                            block? APR_BLOCK_READ : APR_NONBLOCK_READ, 
+                            APR_BUCKET_BUFF_SIZE);
+    if (status == APR_SUCCESS) {
+        if (APR_BRIGADE_EMPTY(session->input)) {
+            status = APR_EAGAIN;
+        }
+        else {
+            feed_brigade(session, session->input);
+        }
+    }
+    else if (!APR_STATUS_IS_EAGAIN(status)) {
+        dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, 0, NULL);
+    }
+    return status;
+}
+
+apr_status_t h2_proxy_session_submit(h2_proxy_session *session, 
+                                     const char *url, request_rec *r)
+{
+    h2_proxy_stream *stream;
+    apr_status_t status;
+    
+    status = open_stream(session, url, r, &stream);
+    if (status == OK) {
+        ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, 
+                      "process stream(%d): %s %s%s, original: %s", 
+                      stream->id, stream->req->method, 
+                      stream->req->authority, stream->req->path, 
+                      r->the_request);
+        status = submit_stream(session, stream);
+    }
+    return status;
+}
+
+static apr_status_t check_suspended(h2_proxy_session *session)
+{
+    h2_proxy_stream *stream;
+    int i, stream_id;
+    apr_status_t status;
+    
+    for (i = 0; i < session->suspended->nelts; ++i) {
+        stream_id = session->suspended->elts[i];
+        stream = nghttp2_session_get_stream_user_data(session->ngh2, stream_id);
+        if (stream) {
+            status = ap_get_brigade(stream->r->input_filters, stream->input,
+                                    AP_MODE_READBYTES, APR_NONBLOCK_READ,
+                                    APR_BUCKET_BUFF_SIZE);
+            if (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(stream->input)) {
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c, 
+                              "h2_proxy_stream(%s-%d): resuming", 
+                              session->id, stream_id);
+                stream->suspended = 0;
+                h2_iq_remove(session->suspended, stream_id);
+                nghttp2_session_resume_data(session->ngh2, stream_id);
+                dispatch_event(session, H2_PROXYS_EV_STREAM_RESUMED, 0, NULL);
+                check_suspended(session);
+                return APR_SUCCESS;
+            }
+            else if (status != APR_SUCCESS && !APR_STATUS_IS_EAGAIN(status)) {
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c, 
+                              "h2_proxy_stream(%s-%d): check input", 
+                              session->id, stream_id);
+                h2_iq_remove(session->suspended, stream_id);
+                dispatch_event(session, H2_PROXYS_EV_STREAM_RESUMED, 0, NULL);
+                check_suspended(session);
+                return APR_SUCCESS;
+            }
+        }
+        else {
+            /* gone? */
+            h2_iq_remove(session->suspended, stream_id);
+            check_suspended(session);
+            return APR_SUCCESS;
+        }
+    }
+    return APR_EAGAIN;
+}
+
+static apr_status_t session_shutdown(h2_proxy_session *session, int reason, 
+                                     const char *msg)
+{
+    apr_status_t status = APR_SUCCESS;
+    const char *err = msg;
+    
+    AP_DEBUG_ASSERT(session);
+    if (!err && reason) {
+        err = nghttp2_strerror(reason);
+    }
+    nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE, 0, 
+                          reason, (uint8_t*)err, err? strlen(err):0);
+    status = nghttp2_session_send(session->ngh2);
+    dispatch_event(session, H2_PROXYS_EV_LOCAL_GOAWAY, reason, err);
+    return status;
+}
+
+
+static const char *StateNames[] = {
+    "INIT",      /* H2_PROXYS_ST_INIT */
+    "DONE",      /* H2_PROXYS_ST_DONE */
+    "IDLE",      /* H2_PROXYS_ST_IDLE */
+    "BUSY",      /* H2_PROXYS_ST_BUSY */
+    "WAIT",      /* H2_PROXYS_ST_WAIT */
+    "LSHUTDOWN", /* H2_PROXYS_ST_LOCAL_SHUTDOWN */
+    "RSHUTDOWN", /* H2_PROXYS_ST_REMOTE_SHUTDOWN */
+};
+
+static const char *state_name(h2_proxys_state state)
+{
+    if (state >= (sizeof(StateNames)/sizeof(StateNames[0]))) {
+        return "unknown";
+    }
+    return StateNames[state];
+}
+
+static int is_accepting_streams(h2_proxy_session *session)
+{
+    switch (session->state) {
+        case H2_PROXYS_ST_IDLE:
+        case H2_PROXYS_ST_BUSY:
+        case H2_PROXYS_ST_WAIT:
+            return 1;
+        default:
+            return 0;
+    }
+}
+
+static void transit(h2_proxy_session *session, const char *action, 
+                    h2_proxys_state nstate)
+{
+    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO()
+                  "h2_proxy_session(%s): transit [%s] -- %s --> [%s]", session->id,
+                  state_name(session->state), action, state_name(nstate));
+    session->state = nstate;
+}
+
+static void ev_init(h2_proxy_session *session, int arg, const char *msg)
+{
+    switch (session->state) {
+        case H2_PROXYS_ST_INIT:
+            if (h2_iq_empty(session->streams)) {
+                transit(session, "init", H2_PROXYS_ST_IDLE);
+            }
+            else {
+                transit(session, "init", H2_PROXYS_ST_BUSY);
+            }
+            break;
+
+        default:
+            /* nop */
+            break;
+    }
+}
+
+static void ev_local_goaway(h2_proxy_session *session, int arg, const char *msg)
+{
+    switch (session->state) {
+        case H2_PROXYS_ST_LOCAL_SHUTDOWN:
+            /* already did that? */
+            break;
+        case H2_PROXYS_ST_IDLE:
+        case H2_PROXYS_ST_REMOTE_SHUTDOWN:
+            /* all done */
+            transit(session, "local goaway", H2_PROXYS_ST_DONE);
+            break;
+        default:
+            transit(session, "local goaway", H2_PROXYS_ST_LOCAL_SHUTDOWN);
+            break;
+    }
+}
+
+static void ev_remote_goaway(h2_proxy_session *session, int arg, const char *msg)
+{
+    switch (session->state) {
+        case H2_PROXYS_ST_REMOTE_SHUTDOWN:
+            /* already received that? */
+            break;
+        case H2_PROXYS_ST_IDLE:
+        case H2_PROXYS_ST_LOCAL_SHUTDOWN:
+            /* all done */
+            transit(session, "remote goaway", H2_PROXYS_ST_DONE);
+            break;
+        default:
+            transit(session, "remote goaway", H2_PROXYS_ST_REMOTE_SHUTDOWN);
+            break;
+    }
+}
+
+static void ev_conn_error(h2_proxy_session *session, int arg, const char *msg)
+{
+    switch (session->state) {
+        case H2_PROXYS_ST_INIT:
+        case H2_PROXYS_ST_DONE:
+        case H2_PROXYS_ST_LOCAL_SHUTDOWN:
+            /* just leave */
+            transit(session, "conn error", H2_PROXYS_ST_DONE);
+            break;
+        
+        default:
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+                          "h2_session(%s): conn error -> shutdown", session->id);
+            session_shutdown(session, arg, msg);
+            break;
+    }
+}
+
+static void ev_proto_error(h2_proxy_session *session, int arg, const char *msg)
+{
+    switch (session->state) {
+        case H2_PROXYS_ST_DONE:
+        case H2_PROXYS_ST_LOCAL_SHUTDOWN:
+            /* just leave */
+            transit(session, "proto error", H2_PROXYS_ST_DONE);
+            break;
+        
+        default:
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+                          "h2_session(%s): proto error -> shutdown", session->id);
+            session_shutdown(session, arg, msg);
+            break;
+    }
+}
+
+static void ev_conn_timeout(h2_proxy_session *session, int arg, const char *msg)
+{
+    switch (session->state) {
+        case H2_PROXYS_ST_LOCAL_SHUTDOWN:
+            transit(session, "conn timeout", H2_PROXYS_ST_DONE);
+            break;
+        default:
+            session_shutdown(session, arg, msg);
+            transit(session, "conn timeout", H2_PROXYS_ST_DONE);
+            break;
+    }
+}
+
+static void ev_no_io(h2_proxy_session *session, int arg, const char *msg)
+{
+    switch (session->state) {
+        case H2_PROXYS_ST_BUSY:
+        case H2_PROXYS_ST_LOCAL_SHUTDOWN:
+        case H2_PROXYS_ST_REMOTE_SHUTDOWN:
+            /* nothing for input and output to do. If we remain
+             * in this state, we go into a tight loop and suck up
+             * CPU cycles. Ideally, we'd like to do a blocking read, but that
+             * is not possible if we have scheduled tasks and wait
+             * for them to produce something. */
+            if (h2_iq_empty(session->streams)) {
+                if (!is_accepting_streams(session)) {
+                    /* We are no longer accepting new streams and have
+                     * finished processing existing ones. Time to leave. */
+                    session_shutdown(session, arg, msg);
+                    transit(session, "no io", H2_PROXYS_ST_DONE);
+                }
+                else {
+                    /* When we have no streams, no task events are possible,
+                     * so switch to blocking reads */
+                    transit(session, "no io", H2_PROXYS_ST_IDLE);
+                }
+            }
+            else {
+                /* Unable to do blocking reads, as we wait on events from
+                 * task processing in other threads. Do a busy wait with
+                 * backoff timer. */
+                transit(session, "no io", H2_PROXYS_ST_WAIT);
+            }
+            break;
+        default:
+            /* nop */
+            break;
+    }
+}
+
+static void ev_stream_submitted(h2_proxy_session *session, int stream_id, 
+                                const char *msg)
+{
+    switch (session->state) {
+        case H2_PROXYS_ST_IDLE:
+        case H2_PROXYS_ST_WAIT:
+            transit(session, "stream submitted", H2_PROXYS_ST_BUSY);
+            break;
+        default:
+            /* nop */
+            break;
+    }
+}
+
+static void ev_stream_done(h2_proxy_session *session, int stream_id, 
+                           const char *msg)
+{
+    h2_proxy_stream *stream;
+    
+    stream = nghttp2_session_get_stream_user_data(session->ngh2, stream_id);
+    if (stream) {
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
+                      "h2_proxy_sesssion(%s): stream(%d) closed", 
+                      session->id, stream_id);
+        if (!stream->data_received) {
+            /* last chance to manipulate response headers.
+             * after this, only trailers */
+            stream->data_received = 1;
+        }
+        stream->state = H2_STREAM_ST_CLOSED;
+        h2_iq_remove(session->streams, stream_id);
+        h2_iq_remove(session->suspended, stream_id);
+        if (session->done) {
+            session->done(session, stream->r);
+        }
+    }
+    
+    switch (session->state) {
+        default:
+            /* nop */
+            break;
+    }
+}
+
+static void ev_stream_resumed(h2_proxy_session *session, int arg, const char *msg)
+{
+    switch (session->state) {
+        case H2_PROXYS_ST_WAIT:
+            transit(session, "stream resumed", H2_PROXYS_ST_BUSY);
+            break;
+        default:
+            /* nop */
+            break;
+    }
+}
+
+static void ev_data_read(h2_proxy_session *session, int arg, const char *msg)
+{
+    switch (session->state) {
+        case H2_PROXYS_ST_IDLE:
+        case H2_PROXYS_ST_WAIT:
+            transit(session, "data read", H2_PROXYS_ST_BUSY);
+            break;
+            /* fall through */
+        default:
+            /* nop */
+            break;
+    }
+}
+
+static void ev_ngh2_done(h2_proxy_session *session, int arg, const char *msg)
+{
+    switch (session->state) {
+        case H2_PROXYS_ST_DONE:
+            /* nop */
+            break;
+        default:
+            transit(session, "nghttp2 done", H2_PROXYS_ST_DONE);
+            break;
+    }
+}
+
+static void ev_pre_close(h2_proxy_session *session, int arg, const char *msg)
+{
+    switch (session->state) {
+        case H2_PROXYS_ST_DONE:
+        case H2_PROXYS_ST_LOCAL_SHUTDOWN:
+            /* nop */
+            break;
+        default:
+            session_shutdown(session, arg, msg);
+            break;
+    }
+}
+
+static void dispatch_event(h2_proxy_session *session, h2_proxys_event_t ev, 
+                           int arg, const char *msg)
+{
+    switch (ev) {
+        case H2_PROXYS_EV_INIT:
+            ev_init(session, arg, msg);
+            break;            
+        case H2_PROXYS_EV_LOCAL_GOAWAY:
+            ev_local_goaway(session, arg, msg);
+            break;
+        case H2_PROXYS_EV_REMOTE_GOAWAY:
+            ev_remote_goaway(session, arg, msg);
+            break;
+        case H2_PROXYS_EV_CONN_ERROR:
+            ev_conn_error(session, arg, msg);
+            break;
+        case H2_PROXYS_EV_PROTO_ERROR:
+            ev_proto_error(session, arg, msg);
+            break;
+        case H2_PROXYS_EV_CONN_TIMEOUT:
+            ev_conn_timeout(session, arg, msg);
+            break;
+        case H2_PROXYS_EV_NO_IO:
+            ev_no_io(session, arg, msg);
+            break;
+        case H2_PROXYS_EV_STREAM_SUBMITTED:
+            ev_stream_submitted(session, arg, msg);
+            break;
+        case H2_PROXYS_EV_STREAM_DONE:
+            ev_stream_done(session, arg, msg);
+            break;
+        case H2_PROXYS_EV_STREAM_RESUMED:
+            ev_stream_resumed(session, arg, msg);
+            break;
+        case H2_PROXYS_EV_DATA_READ:
+            ev_data_read(session, arg, msg);
+            break;
+        case H2_PROXYS_EV_NGH2_DONE:
+            ev_ngh2_done(session, arg, msg);
+            break;
+        case H2_PROXYS_EV_PRE_CLOSE:
+            ev_pre_close(session, arg, msg);
+            break;
+        default:
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+                          "h2_session(%s): unknown event %d", 
+                          session->id, ev);
+            break;
+    }
+}
+
+apr_status_t h2_proxy_session_process(h2_proxy_session *session)
+{
+    apr_status_t status;
+    int have_written = 0, have_read = 0;
+
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, 
+                  "h2_session(%s): process", session->id);
+           
+    switch (session->state) {
+        case H2_PROXYS_ST_INIT:
+            status = session_start(session);
+            if (status == APR_SUCCESS) {
+                dispatch_event(session, H2_PROXYS_EV_INIT, 0, NULL);
+            }
+            else {
+                dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, 0, NULL);
+            }
+            break;
+            
+        case H2_PROXYS_ST_BUSY:
+        case H2_PROXYS_ST_LOCAL_SHUTDOWN:
+        case H2_PROXYS_ST_REMOTE_SHUTDOWN:
+            while (nghttp2_session_want_write(session->ngh2)) {
+                int rv = nghttp2_session_send(session->ngh2);
+                if (rv < 0 && nghttp2_is_fatal(rv)) {
+                    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, 
+                                  "h2_session(%s): write, rv=%d", session->id, rv);
+                    dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, 0, NULL);
+                    break;
+                }
+                have_written = 1;
+            }
+            
+            if (nghttp2_session_want_read(session->ngh2)) {
+                if (h2_proxy_session_read(session, 0) == APR_SUCCESS) {
+                    have_read = 1;
+                }
+            }
+            
+            if (!have_written && !have_read 
+                && !nghttp2_session_want_write(session->ngh2)) {
+                dispatch_event(session, H2_PROXYS_EV_NO_IO, 0, NULL);
+            }
+            break;
+            
+        case H2_PROXYS_ST_WAIT:
+            if (check_suspended(session) == APR_EAGAIN) {
+                /* No stream has been resumed. The intent is a blocking read with
+                 * ever increasing timeouts, but this read is non-blocking (block=0). */
+                if (h2_proxy_session_read(session, 0) == APR_SUCCESS) {
+                    dispatch_event(session, H2_PROXYS_EV_DATA_READ, 0, NULL);
+                }
+            }
+            break;
+            
+        case H2_PROXYS_ST_IDLE:
+            return APR_SUCCESS;
+
+        case H2_PROXYS_ST_DONE:
+            return APR_SUCCESS;
+            
+        default:
+            ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_EGENERAL, session->c,
+                          APLOGNO()"h2_session(%s): unknown state %d", 
+                          session->id, session->state);
+            dispatch_event(session, H2_PROXYS_EV_PROTO_ERROR, 0, NULL);
+            break;
+    }
+
+
+    if (!nghttp2_session_want_read(session->ngh2)
+        && !nghttp2_session_want_write(session->ngh2)) {
+        dispatch_event(session, H2_PROXYS_EV_NGH2_DONE, 0, NULL);
+    }
+    
+    return APR_EAGAIN;
+}
+

Modified: httpd/httpd/trunk/modules/http2/h2_proxy_session.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_proxy_session.h?rev=1731096&r1=1731095&r2=1731096&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_proxy_session.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_proxy_session.h Thu Feb 18 17:02:02 2016
@@ -20,50 +20,72 @@
 
 #include <nghttp2/nghttp2.h>
 
-typedef struct h2_proxy_session {
+struct h2_int_queue;
+
+typedef enum {
+    H2_PROXYS_ST_INIT,             /* send initial SETTINGS, etc. */
+    H2_PROXYS_ST_DONE,             /* finished, connection close */
+    H2_PROXYS_ST_IDLE,             /* no streams to process */
+    H2_PROXYS_ST_BUSY,             /* read/write without stop */
+    H2_PROXYS_ST_WAIT,             /* waiting for tasks reporting back */
+    H2_PROXYS_ST_LOCAL_SHUTDOWN,   /* we announced GOAWAY */
+    H2_PROXYS_ST_REMOTE_SHUTDOWN,  /* remote announced GOAWAY */
+} h2_proxys_state;
+
+typedef enum {
+    H2_PROXYS_EV_INIT,             /* session was initialized */
+    H2_PROXYS_EV_LOCAL_GOAWAY,     /* we sent a GOAWAY */
+    H2_PROXYS_EV_REMOTE_GOAWAY,    /* remote sent us a GOAWAY */
+    H2_PROXYS_EV_CONN_ERROR,       /* connection error */
+    H2_PROXYS_EV_PROTO_ERROR,      /* protocol error */
+    H2_PROXYS_EV_CONN_TIMEOUT,     /* connection timeout */
+    H2_PROXYS_EV_NO_IO,            /* nothing has been read or written */
+    H2_PROXYS_EV_STREAM_SUBMITTED, /* stream has been submitted */
+    H2_PROXYS_EV_STREAM_DONE,      /* stream has been finished */
+    H2_PROXYS_EV_STREAM_RESUMED,   /* stream signalled availability of headers/data */
+    H2_PROXYS_EV_DATA_READ,        /* connection data has been read */
+    H2_PROXYS_EV_NGH2_DONE,        /* nghttp2 wants to neither read nor write anything */
+    H2_PROXYS_EV_PRE_CLOSE,        /* connection will close after this */
+} h2_proxys_event_t;
+
+
+typedef struct h2_proxy_session h2_proxy_session;
+typedef void h2_proxy_request_done(h2_proxy_session *s, request_rec *r);
+
+struct h2_proxy_session {
+    const char *id;
     conn_rec *c;
     proxy_conn_rec *p_conn;
     proxy_server_conf *conf;
     apr_pool_t *pool;
     nghttp2_session *ngh2;   /* the nghttp2 session itself */
     
+    h2_proxy_request_done *done;
+    void *user_data;
+    
     int window_bits_default;
     int window_bits_connection;
 
-    unsigned int goaway_recvd : 1;
-    unsigned int goaway_sent : 1;
-    
+    h2_proxys_state state;
+
+    struct h2_int_queue *streams;
+    struct h2_int_queue *suspended;
+    apr_size_t remote_max_concurrent;
     int max_stream_recv;
     
     apr_bucket_brigade *input;
     apr_bucket_brigade *output;
-} h2_proxy_session;
-
-typedef struct h2_proxy_stream {
-    int id;
-    apr_pool_t *pool;
-    h2_proxy_session *session;
-
-    const char *url;
-    request_rec *r;
-    h2_request *req;
-
-    h2_stream_state_t state;
-    unsigned int data_received : 1;
-
-    apr_bucket_brigade *input;
-    apr_bucket_brigade *output;
-    
-    apr_table_t *saves;
-} h2_proxy_stream;
-
+};
 
-h2_proxy_session *h2_proxy_session_setup(request_rec *r, proxy_conn_rec *p_connm,
-                                         proxy_server_conf *conf);
+h2_proxy_session *h2_proxy_session_setup(const char *id, proxy_conn_rec *p_conn,
+                                         proxy_server_conf *conf,
+                                         h2_proxy_request_done *done);
+
+apr_status_t h2_proxy_session_submit(h2_proxy_session *s, const char *url,
+                                     request_rec *r);
+                                     
+apr_status_t h2_proxy_session_process(h2_proxy_session *s);
 
-apr_status_t h2_proxy_session_open_stream(h2_proxy_session *s, const char *url,
-                                          request_rec *r, h2_proxy_stream **pstream);
-apr_status_t h2_proxy_stream_process(h2_proxy_stream *stream);
 
 #define H2_PROXY_REQ_URL_NOTE   "h2-proxy-req-url"
 

Modified: httpd/httpd/trunk/modules/http2/h2_session.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_session.c?rev=1731096&r1=1731095&r2=1731096&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_session.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_session.c Thu Feb 18 17:02:02 2016
@@ -103,8 +103,6 @@ h2_stream *h2_session_open_stream(h2_ses
     return stream;
 }
 
-#ifdef H2_NG2_STREAM_API
-
 /**
  * Determine the importance of streams when scheduling tasks.
  * - if both stream depend on the same one, compare weights
@@ -158,20 +156,6 @@ static int stream_pri_cmp(int sid1, int
     return spri_cmp(sid1, s1, sid2, s2, session);
 }
 
-#else /* ifdef H2_NG2_STREAM_API */
-
-/* In absence of nghttp2_stream API, which gives information about
- * priorities since nghttp2 1.3.x, we just sort the streams by
- * their identifier, aka. order of arrival.
- */
-static int stream_pri_cmp(int sid1, int sid2, void *ctx)
-{
-    (void)ctx;
-    return sid1 - sid2;
-}
-
-#endif /* (ifdef else) H2_NG2_STREAM_API */
-
 static apr_status_t stream_schedule(h2_session *session,
                                     h2_stream *stream, int eos)
 {

Modified: httpd/httpd/trunk/modules/http2/h2_task.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_task.c?rev=1731096&r1=1731095&r2=1731096&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_task.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_task.c Thu Feb 18 17:02:02 2016
@@ -298,6 +298,8 @@ apr_status_t h2_task_freeze(h2_task *tas
         task->frozen = 1;
         task->frozen_out = apr_brigade_create(c->pool, c->bucket_alloc);
         ap_add_output_filter("H2_RESPONSE_FREEZE", task, r, r->connection);
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, task->output->c, 
+                      "h2_task(%s), frozen", task->id);
     }
     return APR_SUCCESS;
 }
@@ -306,6 +308,8 @@ apr_status_t h2_task_thaw(h2_task *task)
 {
     if (task->frozen) {
         task->frozen = 0;
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, task->output->c, 
+                      "h2_task(%s), thawed", task->id);
     }
     return APR_SUCCESS;
 }




Mime
View raw message