httpd-cvs mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jor...@apache.org
Subject svn commit: r1769588 [12/17] - in /httpd/httpd/branches/2.4.x-openssl-1.1.0-compat: ./ docs/conf/ docs/manual/ docs/manual/howto/ docs/manual/mod/ docs/manual/platform/ docs/manual/programs/ docs/manual/rewrite/ include/ modules/ modules/aaa/ modules/a...
Date Mon, 14 Nov 2016 10:26:34 GMT
Modified: httpd/httpd/branches/2.4.x-openssl-1.1.0-compat/modules/http2/h2_from_h1.c
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x-openssl-1.1.0-compat/modules/http2/h2_from_h1.c?rev=1769588&r1=1769587&r2=1769588&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x-openssl-1.1.0-compat/modules/http2/h2_from_h1.c (original)
+++ httpd/httpd/branches/2.4.x-openssl-1.1.0-compat/modules/http2/h2_from_h1.c Mon Nov 14 10:26:31 2016
@@ -16,6 +16,7 @@
 #include <assert.h>
 #include <stdio.h>
 
+#include <apr_date.h>
 #include <apr_lib.h>
 #include <apr_strings.h>
 
@@ -28,190 +29,12 @@
 #include <util_time.h>
 
 #include "h2_private.h"
-#include "h2_response.h"
+#include "h2_headers.h"
 #include "h2_from_h1.h"
 #include "h2_task.h"
 #include "h2_util.h"
 
 
-static void set_state(h2_from_h1 *from_h1, h2_from_h1_state_t state);
-
-h2_from_h1 *h2_from_h1_create(int stream_id, apr_pool_t *pool)
-{
-    h2_from_h1 *from_h1 = apr_pcalloc(pool, sizeof(h2_from_h1));
-    if (from_h1) {
-        from_h1->stream_id = stream_id;
-        from_h1->pool = pool;
-        from_h1->state = H2_RESP_ST_STATUS_LINE;
-        from_h1->hlines = apr_array_make(pool, 10, sizeof(char *));
-    }
-    return from_h1;
-}
-
-static void set_state(h2_from_h1 *from_h1, h2_from_h1_state_t state)
-{
-    if (from_h1->state != state) {
-        from_h1->state = state;
-    }
-}
-
-h2_response *h2_from_h1_get_response(h2_from_h1 *from_h1)
-{
-    return from_h1->response;
-}
-
-static apr_status_t make_h2_headers(h2_from_h1 *from_h1, request_rec *r)
-{
-    from_h1->response = h2_response_create(from_h1->stream_id, 0,
-                                           from_h1->http_status, 
-                                           from_h1->hlines,
-                                           r->notes,
-                                           from_h1->pool);
-    from_h1->content_length = from_h1->response->content_length;
-    from_h1->chunked = r->chunked;
-    
-    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, r->connection, APLOGNO(03197)
-                  "h2_from_h1(%d): converted headers, content-length: %d"
-                  ", chunked=%d",
-                  from_h1->stream_id, (int)from_h1->content_length, 
-                  (int)from_h1->chunked);
-    
-    set_state(from_h1, ((from_h1->chunked || from_h1->content_length > 0)?
-                        H2_RESP_ST_BODY : H2_RESP_ST_DONE));
-    /* We are ready to be sent to the client */
-    return APR_SUCCESS;
-}
-
-static apr_status_t parse_header(h2_from_h1 *from_h1, ap_filter_t* f, 
-                                 char *line) {
-    (void)f;
-    
-    if (line[0] == ' ' || line[0] == '\t') {
-        char **plast;
-        /* continuation line from the header before this */
-        while (line[0] == ' ' || line[0] == '\t') {
-            ++line;
-        }
-        
-        plast = apr_array_pop(from_h1->hlines);
-        if (plast == NULL) {
-            /* not well formed */
-            return APR_EINVAL;
-        }
-        APR_ARRAY_PUSH(from_h1->hlines, const char*) = apr_psprintf(from_h1->pool, "%s %s", *plast, line);
-    }
-    else {
-        /* new header line */
-        APR_ARRAY_PUSH(from_h1->hlines, const char*) = apr_pstrdup(from_h1->pool, line);
-    }
-    return APR_SUCCESS;
-}
-
-static apr_status_t get_line(h2_from_h1 *from_h1, apr_bucket_brigade *bb,
-                             ap_filter_t* f, char *line, apr_size_t len)
-{
-    apr_status_t status;
-    if (!from_h1->bb) {
-        from_h1->bb = apr_brigade_create(from_h1->pool, f->c->bucket_alloc);
-    }
-    else {
-        apr_brigade_cleanup(from_h1->bb);                
-    }
-    status = apr_brigade_split_line(from_h1->bb, bb, 
-                                                 APR_BLOCK_READ, 
-                                                 HUGE_STRING_LEN);
-    if (status == APR_SUCCESS) {
-        --len;
-        status = apr_brigade_flatten(from_h1->bb, line, &len);
-        if (status == APR_SUCCESS) {
-            /* we assume a non-0 containing line and remove
-             * trailing crlf. */
-            line[len] = '\0';
-            if (len >= 2 && !strcmp(H2_CRLF, line + len - 2)) {
-                len -= 2;
-                line[len] = '\0';
-            }
-            
-            apr_brigade_cleanup(from_h1->bb);
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
-                          "h2_from_h1(%d): read line: %s",
-                          from_h1->stream_id, line);
-        }
-    }
-    return status;
-}
-
-apr_status_t h2_from_h1_read_response(h2_from_h1 *from_h1, ap_filter_t* f,
-                                      apr_bucket_brigade* bb)
-{
-    apr_status_t status = APR_SUCCESS;
-    char line[HUGE_STRING_LEN];
-    
-    if ((from_h1->state == H2_RESP_ST_BODY) 
-        || (from_h1->state == H2_RESP_ST_DONE)) {
-        if (from_h1->chunked) {
-            /* The httpd core HTTP_HEADER filter has or will install the 
-             * "CHUNK" output transcode filter, which appears further down 
-             * the filter chain. We do not want it for HTTP/2.
-             * Once we successfully deinstalled it, this filter has no
-             * further function and we remove it.
-             */
-            status = ap_remove_output_filter_byhandle(f->r->output_filters, 
-                                                      "CHUNK");
-            if (status == APR_SUCCESS) {
-                ap_remove_output_filter(f);
-            }
-        }
-        
-        return ap_pass_brigade(f->next, bb);
-    }
-    
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
-                  "h2_from_h1(%d): read_response", from_h1->stream_id);
-    
-    while (!APR_BRIGADE_EMPTY(bb) && status == APR_SUCCESS) {
-        
-        switch (from_h1->state) {
-                
-            case H2_RESP_ST_STATUS_LINE:
-            case H2_RESP_ST_HEADERS:
-                status = get_line(from_h1, bb, f, line, sizeof(line));
-                if (status != APR_SUCCESS) {
-                    return status;
-                }
-                if (from_h1->state == H2_RESP_ST_STATUS_LINE) {
-                    /* instead of parsing, just take it directly */
-                    from_h1->http_status = f->r->status;
-                    from_h1->state = H2_RESP_ST_HEADERS;
-                }
-                else if (line[0] == '\0') {
-                    /* end of headers, create the h2_response and
-                     * pass the rest of the brigade down the filter
-                     * chain.
-                     */
-                    status = make_h2_headers(from_h1, f->r);
-                    if (from_h1->bb) {
-                        apr_brigade_destroy(from_h1->bb);
-                        from_h1->bb = NULL;
-                    }
-                    if (!APR_BRIGADE_EMPTY(bb)) {
-                        return ap_pass_brigade(f->next, bb);
-                    }
-                }
-                else {
-                    status = parse_header(from_h1, f, line);
-                }
-                break;
-                
-            default:
-                return ap_pass_brigade(f->next, bb);
-        }
-        
-    }
-    
-    return status;
-}
-
 /* This routine is called by apr_table_do and merges all instances of
  * the passed field values into a single array that will be further
  * processed by some later routine.  Originally intended to help split
@@ -291,8 +114,8 @@ static void fix_vary(request_rec *r)
     }
 }
 
-void h2_from_h1_set_basic_http_header(apr_table_t *headers, request_rec *r,
-                                      apr_pool_t *pool)
+static void set_basic_http_header(apr_table_t *headers, request_rec *r,
+                                  apr_pool_t *pool)
 {
     char *date = NULL;
     const char *proxy_date = NULL;
@@ -345,7 +168,7 @@ static int copy_header(void *ctx, const
     return 1;
 }
 
-static h2_response *create_response(h2_from_h1 *from_h1, request_rec *r)
+static h2_headers *create_response(h2_task *task, request_rec *r)
 {
     const char *clheader;
     const char *ctype;
@@ -450,7 +273,7 @@ static h2_response *create_response(h2_f
     
     headers = apr_table_make(r->pool, 10);
     
-    h2_from_h1_set_basic_http_header(headers, r, r->pool);
+    set_basic_http_header(headers, r, r->pool);
     if (r->status == HTTP_NOT_MODIFIED) {
         apr_table_do((int (*)(void *, const char *, const char *)) copy_header,
                      (void *) headers, r->headers_out,
@@ -471,114 +294,541 @@ static h2_response *create_response(h2_f
                      (void *) headers, r->headers_out, NULL);
     }
     
-    return h2_response_rcreate(from_h1->stream_id, r, headers, r->pool);
+    return h2_headers_rcreate(r, r->status, headers, r->pool);
+}
+
+typedef enum {
+    H2_RP_STATUS_LINE,
+    H2_RP_HEADER_LINE,
+    H2_RP_DONE
+} h2_rp_state_t;
+
+typedef struct h2_response_parser {
+    h2_rp_state_t state;
+    h2_task *task;
+    int http_status;
+    apr_array_header_t *hlines;
+    apr_bucket_brigade *tmp;
+} h2_response_parser;
+
+static apr_status_t parse_header(h2_response_parser *parser, char *line) {
+    const char *hline;
+    if (line[0] == ' ' || line[0] == '\t') {
+        char **plast;
+        /* continuation line from the header before this */
+        while (line[0] == ' ' || line[0] == '\t') {
+            ++line;
+        }
+        
+        plast = apr_array_pop(parser->hlines);
+        if (plast == NULL) {
+            /* not well formed */
+            return APR_EINVAL;
+        }
+        hline = apr_psprintf(parser->task->pool, "%s %s", *plast, line);
+    }
+    else {
+        /* new header line */
+        hline = apr_pstrdup(parser->task->pool, line);
+    }
+    APR_ARRAY_PUSH(parser->hlines, const char*) = hline; 
+    return APR_SUCCESS;
+}
+
+static apr_status_t get_line(h2_response_parser *parser, apr_bucket_brigade *bb, 
+                             char *line, apr_size_t len)
+{
+    h2_task *task = parser->task;
+    apr_status_t status;
+    
+    if (!parser->tmp) {
+        parser->tmp = apr_brigade_create(task->pool, task->c->bucket_alloc);
+    }
+    status = apr_brigade_split_line(parser->tmp, bb, APR_BLOCK_READ, 
+                                    HUGE_STRING_LEN);
+    if (status == APR_SUCCESS) {
+        --len;
+        status = apr_brigade_flatten(parser->tmp, line, &len);
+        if (status == APR_SUCCESS) {
+            /* we assume the line contains no NUL bytes and remove the trailing CRLF. */
+            line[len] = '\0';
+            if (len >= 2 && !strcmp(H2_CRLF, line + len - 2)) {
+                len -= 2;
+                line[len] = '\0';
+                apr_brigade_cleanup(parser->tmp);
+                ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, task->c,
+                              "h2_task(%s): read response line: %s", 
+                              task->id, line);
+            }
+            else {
+                /* this does not look like a complete line yet */
+                ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, task->c,
+                              "h2_task(%s): read response, incomplete line: %s", 
+                              task->id, line);
+                return APR_EAGAIN;
+            }
+        }
+    }
+    apr_brigade_cleanup(parser->tmp);
+    return status;
 }
 
-apr_status_t h2_response_output_filter(ap_filter_t *f, apr_bucket_brigade *bb)
+static apr_table_t *make_table(h2_response_parser *parser)
+{
+    h2_task *task = parser->task;
+    apr_array_header_t *hlines = parser->hlines;
+    if (hlines) {
+        apr_table_t *headers = apr_table_make(task->pool, hlines->nelts);        
+        int i;
+        
+        for (i = 0; i < hlines->nelts; ++i) {
+            char *hline = ((char **)hlines->elts)[i];
+            char *sep = ap_strchr(hline, ':');
+            if (!sep) {
+                ap_log_cerror(APLOG_MARK, APLOG_WARNING, APR_EINVAL, task->c,
+                              APLOGNO(02955) "h2_task(%s): invalid header[%d] '%s'",
+                              task->id, i, (char*)hline);
+                /* not valid format, abort */
+                return NULL;
+            }
+            (*sep++) = '\0';
+            while (*sep == ' ' || *sep == '\t') {
+                ++sep;
+            }
+            
+            if (!h2_util_ignore_header(hline)) {
+                apr_table_merge(headers, hline, sep);
+            }
+        }
+        return headers;
+    }
+    else {
+        return apr_table_make(task->pool, 0);        
+    }
+}
+
+static apr_status_t pass_response(h2_task *task, ap_filter_t *f, 
+                                  h2_response_parser *parser) 
+{
+    apr_bucket *b;
+    apr_status_t status;
+    
+    h2_headers *response = h2_headers_create(parser->http_status, 
+                                             make_table(parser),
+                                             NULL, task->pool);
+    apr_brigade_cleanup(parser->tmp);
+    b = h2_bucket_headers_create(task->c->bucket_alloc, response);
+    APR_BRIGADE_INSERT_TAIL(parser->tmp, b);
+    b = apr_bucket_flush_create(task->c->bucket_alloc);
+    APR_BRIGADE_INSERT_TAIL(parser->tmp, b);                      
+    status = ap_pass_brigade(f->next, parser->tmp);
+    apr_brigade_cleanup(parser->tmp);
+    
+    /* reset parser for possible next response */
+    parser->state = H2_RP_STATUS_LINE;
+    apr_array_clear(parser->hlines);
+
+    if (response->status >= 200) {
+        task->output.sent_response = 1;
+    }
+    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, task->c, 
+                  APLOGNO(03197) "h2_task(%s): passed response %d", 
+                  task->id, response->status);
+    return status;
+}
+
+static apr_status_t parse_status(h2_task *task, char *line)
+{
+    h2_response_parser *parser = task->output.rparser;
+    int sindex = (apr_date_checkmask(line, "HTTP/#.# ###*")? 9 : 
+                  (apr_date_checkmask(line, "HTTP/# ###*")? 7 : 0));
+    if (sindex > 0) {
+        int k = sindex + 3;
+        char keepchar = line[k];
+        line[k] = '\0';
+        parser->http_status = atoi(&line[sindex]);
+        line[k] = keepchar;
+        parser->state = H2_RP_HEADER_LINE;
+        
+        return APR_SUCCESS;
+    }
+    ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, task->c, APLOGNO(03467)
+                  "h2_task(%s): unable to parse status line: %s", 
+                  task->id, line);
+    return APR_EINVAL;
+}
+
+apr_status_t h2_from_h1_parse_response(h2_task *task, ap_filter_t *f, 
+                                       apr_bucket_brigade *bb)
+{
+    h2_response_parser *parser = task->output.rparser;
+    char line[HUGE_STRING_LEN];
+    apr_status_t status = APR_SUCCESS;
+
+    if (!parser) {
+        parser = apr_pcalloc(task->pool, sizeof(*parser));
+        parser->task = task;
+        parser->state = H2_RP_STATUS_LINE;
+        parser->hlines = apr_array_make(task->pool, 10, sizeof(char *));
+        task->output.rparser = parser;
+    }
+    
+    while (!APR_BRIGADE_EMPTY(bb) && status == APR_SUCCESS) {
+        switch (parser->state) {
+            case H2_RP_STATUS_LINE:
+            case H2_RP_HEADER_LINE:
+                status = get_line(parser, bb, line, sizeof(line));
+                if (status == APR_EAGAIN) {
+                    /* need more data */
+                    return APR_SUCCESS;
+                }
+                else if (status != APR_SUCCESS) {
+                    return status;
+                }
+                if (parser->state == H2_RP_STATUS_LINE) {
+                    /* parse the HTTP/1 status line to obtain the status code */
+                    status = parse_status(task, line);
+                }
+                else if (line[0] == '\0') {
+                    /* end of headers, pass response onward */
+                    
+                    return pass_response(task, f, parser);
+                }
+                else {
+                    status = parse_header(parser, line);
+                }
+                break;
+                
+            default:
+                return status;
+        }
+    }
+    return status;
+}
+
+apr_status_t h2_filter_headers_out(ap_filter_t *f, apr_bucket_brigade *bb)
 {
     h2_task *task = f->ctx;
-    h2_from_h1 *from_h1 = task->output.from_h1;
     request_rec *r = f->r;
-    apr_bucket *b;
+    apr_bucket *b, *bresp, *body_bucket = NULL, *next;
     ap_bucket_error *eb = NULL;
+    h2_headers *response = NULL;
 
-    AP_DEBUG_ASSERT(from_h1 != NULL);
-    
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
-                  "h2_from_h1(%d): output_filter called", from_h1->stream_id);
+                  "h2_task(%s): output_filter called", task->id);
     
-    if (r->header_only && from_h1->response) {
-        /* throw away any data after we have compiled the response */
-        apr_brigade_cleanup(bb);
-        return OK;
-    }
-    
-    for (b = APR_BRIGADE_FIRST(bb);
-         b != APR_BRIGADE_SENTINEL(bb);
-         b = APR_BUCKET_NEXT(b))
-    {
-        if (AP_BUCKET_IS_ERROR(b) && !eb) {
-            eb = b->data;
-            continue;
-        }
-        /*
-         * If we see an EOC bucket it is a signal that we should get out
-         * of the way doing nothing.
-         */
-        if (AP_BUCKET_IS_EOC(b)) {
-            ap_remove_output_filter(f);
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, f->c,
-                          "h2_from_h1(%d): eoc bucket passed", 
-                          from_h1->stream_id);
-            return ap_pass_brigade(f->next, bb);
+    if (!task->output.sent_response) {
+        /* check, if we need to send the response now. Until we actually
+         * see a DATA bucket or some EOS/EOR, we do not do so. */
+        for (b = APR_BRIGADE_FIRST(bb);
+             b != APR_BRIGADE_SENTINEL(bb);
+             b = APR_BUCKET_NEXT(b))
+        {
+            if (AP_BUCKET_IS_ERROR(b) && !eb) {
+                eb = b->data;
+            }
+            else if (AP_BUCKET_IS_EOC(b)) {
+                /* If we see an EOC bucket it is a signal that we should get out
+                 * of the way doing nothing.
+                 */
+                ap_remove_output_filter(f);
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, f->c,
+                              "h2_task(%s): eoc bucket passed", task->id);
+                return ap_pass_brigade(f->next, bb);
+            }
+            else if (!H2_BUCKET_IS_HEADERS(b) && !APR_BUCKET_IS_FLUSH(b)) { 
+                body_bucket = b;
+                break;
+            }
+        }
+        
+        if (eb) {
+            int st = eb->status;
+            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, f->c, APLOGNO(03047)
+                          "h2_task(%s): err bucket status=%d", task->id, st);
+            /* throw everything away and replace it with the error response
+             * generated by ap_die() */
+            apr_brigade_cleanup(bb);
+            ap_die(st, r);
+            return AP_FILTER_ERROR;
+        }
+        
+        if (body_bucket) {
+            /* time to insert the response bucket before the body */
+            response = create_response(task, r);
+            if (response == NULL) {
+                ap_log_cerror(APLOG_MARK, APLOG_NOTICE, 0, f->c, APLOGNO(03048)
+                              "h2_task(%s): unable to create response", task->id);
+                return APR_ENOMEM;
+            }
+            
+            bresp = h2_bucket_headers_create(f->c->bucket_alloc, response);
+            APR_BUCKET_INSERT_BEFORE(body_bucket, bresp);
+            task->output.sent_response = 1;
+            r->sent_bodyct = 1;
         }
-    }
-    
-    if (eb) {
-        int st = eb->status;
-        apr_brigade_cleanup(bb);
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, f->c, APLOGNO(03047)
-                      "h2_from_h1(%d): err bucket status=%d", 
-                      from_h1->stream_id, st);
-        ap_die(st, r);
-        return AP_FILTER_ERROR;
-    }
-    
-    from_h1->response = create_response(from_h1, r);
-    if (from_h1->response == NULL) {
-        ap_log_cerror(APLOG_MARK, APLOG_NOTICE, 0, f->c, APLOGNO(03048)
-                      "h2_from_h1(%d): unable to create response", 
-                      from_h1->stream_id);
-        return APR_ENOMEM;
     }
     
     if (r->header_only) {
         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
-                      "h2_from_h1(%d): header_only, cleanup output brigade", 
-                      from_h1->stream_id);
-        apr_brigade_cleanup(bb);
-        return OK;
+                      "h2_task(%s): header_only, cleanup output brigade", 
+                      task->id);
+        b = body_bucket? body_bucket : APR_BRIGADE_FIRST(bb);
+        while (b != APR_BRIGADE_SENTINEL(bb)) {
+            next = APR_BUCKET_NEXT(b);
+            if (APR_BUCKET_IS_EOS(b) || AP_BUCKET_IS_EOR(b)) {
+                break;
+            } 
+            APR_BUCKET_REMOVE(b);
+            apr_bucket_destroy(b);
+            b = next;
+        }
     }
-    
-    r->sent_bodyct = 1;         /* Whatever follows is real body stuff... */
-    
-    ap_remove_output_filter(f);
-    if (APLOGctrace1(f->c)) {
-        apr_off_t len = 0;
-        apr_brigade_length(bb, 0, &len);
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
-                      "h2_from_h1(%d): removed header filter, passing brigade "
-                      "len=%ld", from_h1->stream_id, (long)len);
+    else if (task->output.sent_response) {
+        /* let's get out of the way; our task is done */
+        ap_remove_output_filter(f);
     }
     return ap_pass_brigade(f->next, bb);
 }
 
-apr_status_t h2_response_trailers_filter(ap_filter_t *f, apr_bucket_brigade *bb)
+static void make_chunk(h2_task *task, apr_bucket_brigade *bb, 
+                       apr_bucket *first, apr_off_t chunk_len, 
+                       apr_bucket *tail)
+{
+    /* Surround the buckets [first, tail[ with new buckets carrying the
+     * HTTP/1.1 chunked encoding format. If tail is NULL, the chunk extends
+     * to the end of the brigade. */
+    char buffer[128];
+    apr_bucket *c;
+    int len;
+    
+    len = apr_snprintf(buffer, H2_ALEN(buffer), 
+                       "%"APR_UINT64_T_HEX_FMT"\r\n", (apr_uint64_t)chunk_len);
+    c = apr_bucket_heap_create(buffer, len, NULL, bb->bucket_alloc);
+    APR_BUCKET_INSERT_BEFORE(first, c);
+    c = apr_bucket_heap_create("\r\n", 2, NULL, bb->bucket_alloc);
+    if (tail) {
+        APR_BUCKET_INSERT_BEFORE(tail, c);
+    }
+    else {
+        APR_BRIGADE_INSERT_TAIL(bb, c);
+    }
+    task->input.chunked_total += chunk_len;
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, task->c,
+                  "h2_task(%s): added chunk %"APR_OFF_T_FMT", total %"
+                  APR_OFF_T_FMT, task->id, chunk_len, task->input.chunked_total);
+}
+
+static int ser_header(void *ctx, const char *name, const char *value) 
+{
+    apr_bucket_brigade *bb = ctx;
+    apr_brigade_printf(bb, NULL, NULL, "%s: %s\r\n", name, value);
+    return 1;
+}
+
+static apr_status_t read_and_chunk(ap_filter_t *f, h2_task *task,
+                                   apr_read_type_e block) {
+    request_rec *r = f->r;
+    apr_status_t status = APR_SUCCESS;
+    apr_bucket_brigade *bb = task->input.bbchunk;
+    
+    if (!bb) {
+        bb = apr_brigade_create(r->pool, f->c->bucket_alloc);
+        task->input.bbchunk = bb;
+    }
+    
+    if (APR_BRIGADE_EMPTY(bb)) {
+        apr_bucket *b, *next, *first_data = NULL;
+        apr_bucket_brigade *tmp;
+        apr_off_t bblen = 0;
+
+        /* get more data from the lower layer filters. Always do this
+         * in larger pieces, since we handle the read modes ourselves. */
+        status = ap_get_brigade(f->next, bb, 
+                                AP_MODE_READBYTES, block, 32*1024);
+        if (status == APR_EOF) {
+            if (!task->input.eos) {
+                status = apr_brigade_puts(bb, NULL, NULL, "0\r\n\r\n");
+                task->input.eos = 1;
+                return APR_SUCCESS;
+            }
+            ap_remove_input_filter(f);
+            return status;
+            
+        }
+        else if (status != APR_SUCCESS) {
+            return status;
+        }
+
+        for (b = APR_BRIGADE_FIRST(bb); 
+             b != APR_BRIGADE_SENTINEL(bb) && !task->input.eos; 
+             b = next) {
+            next = APR_BUCKET_NEXT(b);
+            if (APR_BUCKET_IS_METADATA(b)) {
+                if (first_data) {
+                    make_chunk(task, bb, first_data, bblen, b);
+                    first_data = NULL;
+                }
+                
+                if (H2_BUCKET_IS_HEADERS(b)) {
+                    h2_headers *headers = h2_bucket_headers_get(b);
+                    
+                    ap_assert(headers);
+                    ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r,
+                                  "h2_task(%s): receiving trailers", task->id);
+                    tmp = apr_brigade_split_ex(bb, b, NULL);
+                    if (!apr_is_empty_table(headers->headers)) {
+                        status = apr_brigade_puts(bb, NULL, NULL, "0\r\n");
+                        apr_table_do(ser_header, bb, headers->headers, NULL);
+                        status = apr_brigade_puts(bb, NULL, NULL, "\r\n");
+                    }
+                    else {
+                        status = apr_brigade_puts(bb, NULL, NULL, "0\r\n\r\n");
+                    }
+                    r->trailers_in = apr_table_clone(r->pool, headers->headers);
+                    APR_BUCKET_REMOVE(b);
+                    apr_bucket_destroy(b);
+                    APR_BRIGADE_CONCAT(bb, tmp);
+                    apr_brigade_destroy(tmp);
+                    task->input.eos = 1;
+                }
+                else if (APR_BUCKET_IS_EOS(b)) {
+                    tmp = apr_brigade_split_ex(bb, b, NULL);
+                    status = apr_brigade_puts(bb, NULL, NULL, "0\r\n\r\n");
+                    APR_BRIGADE_CONCAT(bb, tmp);
+                    apr_brigade_destroy(tmp);
+                    task->input.eos = 1;
+                }
+            }
+            else if (b->length == 0) {
+                APR_BUCKET_REMOVE(b);
+                apr_bucket_destroy(b);
+            } 
+            else {
+                if (!first_data) {
+                    first_data = b;
+                    bblen = 0;
+                }
+                bblen += b->length;
+            }    
+        }
+        
+        if (first_data) {
+            make_chunk(task, bb, first_data, bblen, NULL);
+        }            
+    }
+    return status;
+}
+
+apr_status_t h2_filter_request_in(ap_filter_t* f,
+                                  apr_bucket_brigade* bb,
+                                  ap_input_mode_t mode,
+                                  apr_read_type_e block,
+                                  apr_off_t readbytes)
 {
     h2_task *task = f->ctx;
-    h2_from_h1 *from_h1 = task->output.from_h1;
     request_rec *r = f->r;
-    apr_bucket *b;
+    apr_status_t status = APR_SUCCESS;
+    apr_bucket *b, *next;
+
+    ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, f->r,
+                  "h2_task(%s): request filter, exp=%d", task->id, r->expecting_100);
+    if (!task->request->chunked) {
+        status = ap_get_brigade(f->next, bb, mode, block, readbytes);
+        /* pipe data through, just take care of trailers */
+        for (b = APR_BRIGADE_FIRST(bb); 
+             b != APR_BRIGADE_SENTINEL(bb); b = next) {
+            next = APR_BUCKET_NEXT(b);
+            if (H2_BUCKET_IS_HEADERS(b)) {
+                h2_headers *headers = h2_bucket_headers_get(b);
+                ap_assert(headers);
+                ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r,
+                              "h2_task(%s): receiving trailers", task->id);
+                r->trailers_in = apr_table_clone(r->pool, headers->headers);
+                APR_BUCKET_REMOVE(b);
+                apr_bucket_destroy(b);
+                ap_remove_input_filter(f);
+                break;
+            }
+        }
+        return status;
+    }
+
+    /* Things are more complicated. The standard HTTP input filter, which
+     * does a lot that we do not want to duplicate, also cares about chunked
+     * transfer encoding and trailers.
+     * We need to simulate chunked encoding for it to be happy.
+     */
+    if ((status = read_and_chunk(f, task, block)) != APR_SUCCESS) {
+        return status;
+    }
+    
+    if (mode == AP_MODE_EXHAUSTIVE) {
+        /* return all we have */
+        APR_BRIGADE_CONCAT(bb, task->input.bbchunk);
+    }
+    else if (mode == AP_MODE_READBYTES) {
+        status = h2_brigade_concat_length(bb, task->input.bbchunk, readbytes);
+    }
+    else if (mode == AP_MODE_SPECULATIVE) {
+        status = h2_brigade_copy_length(bb, task->input.bbchunk, readbytes);
+    }
+    else if (mode == AP_MODE_GETLINE) {
+        /* we are reading a single LF line, e.g. the HTTP headers. 
+         * this has the nasty side effect of splitting the bucket, even
+         * though it ends with CRLF and creates a 0 length bucket */
+        status = apr_brigade_split_line(bb, task->input.bbchunk, block, 
+                                        HUGE_STRING_LEN);
+        if (APLOGctrace1(f->c)) {
+            char buffer[1024];
+            apr_size_t len = sizeof(buffer)-1;
+            apr_brigade_flatten(bb, buffer, &len);
+            buffer[len] = 0;
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
+                          "h2_task(%s): getline: %s",
+                          task->id, buffer);
+        }
+    }
+    else {
+        /* Hmm, well. There is mode AP_MODE_EATCRLF, but we chose not
+         * to support it. Seems to work. */
+        ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_ENOTIMPL, f->c,
+                      APLOGNO(02942) 
+                      "h2_task, unsupported READ mode %d", mode);
+        status = APR_ENOTIMPL;
+    }
+    
+    h2_util_bb_log(f->c, task->stream_id, APLOG_TRACE2, "forwarding input", bb);
+    return status;
+}
+
+apr_status_t h2_filter_trailers_out(ap_filter_t *f, apr_bucket_brigade *bb)
+{
+    h2_task *task = f->ctx;
+    request_rec *r = f->r;
+    apr_bucket *b, *e;
  
-    if (from_h1 && from_h1->response) {
-        /* Detect the EOR bucket and forward any trailers that may have
-         * been set to our h2_response.
+    if (task && r) {
+        /* Detect the EOS/EOR bucket and forward any trailers that may have
+         * been set to our h2_headers.
          */
         for (b = APR_BRIGADE_FIRST(bb);
              b != APR_BRIGADE_SENTINEL(bb);
              b = APR_BUCKET_NEXT(b))
         {
-            if (AP_BUCKET_IS_EOR(b)) {
-                /* FIXME: need a better test case than this.
-                apr_table_setn(r->trailers_out, "X", "1"); */
-                if (r->trailers_out && !apr_is_empty_table(r->trailers_out)) {
-                    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, f->c, APLOGNO(03049)
-                                  "h2_from_h1(%d): trailers filter, saving trailers",
-                                  from_h1->stream_id);
-                    h2_response_set_trailers(from_h1->response,
-                                             apr_table_clone(from_h1->pool, 
-                                                             r->trailers_out));
-                }
+            if ((APR_BUCKET_IS_EOS(b) || AP_BUCKET_IS_EOR(b))
+                && r->trailers_out && !apr_is_empty_table(r->trailers_out)) {
+                h2_headers *headers;
+                apr_table_t *trailers;
+                
+                ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, f->c, APLOGNO(03049)
+                              "h2_task(%s): sending trailers", task->id);
+                trailers = apr_table_clone(r->pool, r->trailers_out);
+                headers = h2_headers_rcreate(r, HTTP_OK, trailers, r->pool);
+                e = h2_bucket_headers_create(bb->bucket_alloc, headers);
+                APR_BUCKET_INSERT_BEFORE(b, e);
+                apr_table_clear(r->trailers_out);
+                ap_remove_output_filter(f);
                 break;
             }
         }     

Modified: httpd/httpd/branches/2.4.x-openssl-1.1.0-compat/modules/http2/h2_from_h1.h
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x-openssl-1.1.0-compat/modules/http2/h2_from_h1.h?rev=1769588&r1=1769587&r2=1769588&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x-openssl-1.1.0-compat/modules/http2/h2_from_h1.h (original)
+++ httpd/httpd/branches/2.4.x-openssl-1.1.0-compat/modules/http2/h2_from_h1.h Mon Nov 14 10:26:31 2016
@@ -30,46 +30,20 @@
  * we need to have all handlers and filters involved in request/response
  * processing, so this seems to be the way for now.
  */
+struct h2_headers;
+struct h2_task;
 
-typedef enum {
-    H2_RESP_ST_STATUS_LINE, /* parsing http/1 status line */
-    H2_RESP_ST_HEADERS,     /* parsing http/1 response headers */
-    H2_RESP_ST_BODY,        /* transferring response body */
-    H2_RESP_ST_DONE         /* complete response converted */
-} h2_from_h1_state_t;
+apr_status_t h2_from_h1_parse_response(struct h2_task *task, ap_filter_t *f, 
+                                       apr_bucket_brigade *bb);
 
-struct h2_response;
+apr_status_t h2_filter_headers_out(ap_filter_t *f, apr_bucket_brigade *bb);
 
-typedef struct h2_from_h1 h2_from_h1;
+apr_status_t h2_filter_request_in(ap_filter_t* f,
+                                  apr_bucket_brigade* brigade,
+                                  ap_input_mode_t mode,
+                                  apr_read_type_e block,
+                                  apr_off_t readbytes);
 
-struct h2_from_h1 {
-    int stream_id;
-    h2_from_h1_state_t state;
-    apr_pool_t *pool;
-    apr_bucket_brigade *bb;
-    
-    apr_off_t content_length;
-    int chunked;
-    
-    int http_status;
-    apr_array_header_t *hlines;
-    
-    struct h2_response *response;
-};
-
-
-h2_from_h1 *h2_from_h1_create(int stream_id, apr_pool_t *pool);
-
-apr_status_t h2_from_h1_read_response(h2_from_h1 *from_h1,
-                                      ap_filter_t* f, apr_bucket_brigade* bb);
-
-struct h2_response *h2_from_h1_get_response(h2_from_h1 *from_h1);
-
-apr_status_t h2_response_output_filter(ap_filter_t *f, apr_bucket_brigade *bb);
-
-apr_status_t h2_response_trailers_filter(ap_filter_t *f, apr_bucket_brigade *bb);
-
-void h2_from_h1_set_basic_http_header(apr_table_t *headers, request_rec *r,
-                                      apr_pool_t *pool);
+apr_status_t h2_filter_trailers_out(ap_filter_t *f, apr_bucket_brigade *bb);
 
 #endif /* defined(__mod_h2__h2_from_h1__) */

Modified: httpd/httpd/branches/2.4.x-openssl-1.1.0-compat/modules/http2/h2_h2.c
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x-openssl-1.1.0-compat/modules/http2/h2_h2.c?rev=1769588&r1=1769587&r2=1769588&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x-openssl-1.1.0-compat/modules/http2/h2_h2.c (original)
+++ httpd/httpd/branches/2.4.x-openssl-1.1.0-compat/modules/http2/h2_h2.c Mon Nov 14 10:26:31 2016
@@ -32,12 +32,15 @@
 #include "mod_http2.h"
 #include "h2_private.h"
 
+#include "h2_bucket_beam.h"
 #include "h2_stream.h"
 #include "h2_task.h"
 #include "h2_config.h"
 #include "h2_ctx.h"
 #include "h2_conn.h"
+#include "h2_filter.h"
 #include "h2_request.h"
+#include "h2_headers.h"
 #include "h2_session.h"
 #include "h2_util.h"
 #include "h2_h2.h"
@@ -569,6 +572,10 @@ void h2_h2_register_hooks(void)
      */
     ap_hook_post_read_request(h2_h2_post_read_req, NULL, NULL, APR_HOOK_REALLY_FIRST);
     ap_hook_fixups(h2_h2_late_fixups, NULL, NULL, APR_HOOK_LAST);
+
+    /* special bucket type transfer through a h2_bucket_beam */
+    h2_register_bucket_beamer(h2_bucket_headers_beam);
+    h2_register_bucket_beamer(h2_bucket_observer_beam);
 }
 
 int h2_h2_process_conn(conn_rec* c)
@@ -684,30 +691,23 @@ static int h2_h2_post_read_req(request_r
          * that we manipulate filters only once. */
         if (task && !task->filters_set) {
             ap_filter_t *f;
+            ap_log_rerror(APLOG_MARK, APLOG_TRACE3, 0, r, "adding request filters");
 
-            /* setup the correct output filters to process the response
-             * on the proper mod_http2 way. */
-            ap_log_rerror(APLOG_MARK, APLOG_TRACE3, 0, r, "adding task output filter");
-            if (task->ser_headers) {
-                ap_add_output_filter("H1_TO_H2_RESP", task, r, r->connection);
-            }
-            else {
-                /* replace the core http filter that formats response headers
-                 * in HTTP/1 with our own that collects status and headers */
-                ap_remove_output_filter_byhandle(r->output_filters, "HTTP_HEADER");
-                ap_add_output_filter("H2_RESPONSE", task, r, r->connection);
-            }
+            /* setup the correct filters to process the request for h2 */
+            ap_add_input_filter("H2_REQUEST", task, r, r->connection);
+            
+            /* replace the core http filter that formats response headers
+             * in HTTP/1 with our own that collects status and headers */
+            ap_remove_output_filter_byhandle(r->output_filters, "HTTP_HEADER");
+            ap_add_output_filter("H2_RESPONSE", task, r, r->connection);
             
-            /* trailers processing. Incoming trailers are added to this
-             * request via our h2 input filter, outgoing trailers
-             * in a special h2 out filter. */
             for (f = r->input_filters; f; f = f->next) {
-                if (!strcmp("H2_TO_H1", f->frec->name)) {
+                if (!strcmp("H2_SLAVE_IN", f->frec->name)) {
                     f->r = r;
                     break;
                 }
             }
-            ap_add_output_filter("H2_TRAILERS", task, r, r->connection);
+            ap_add_output_filter("H2_TRAILERS_OUT", task, r, r->connection);
             task->filters_set = 1;
         }
     }
@@ -724,6 +724,11 @@ static int h2_h2_late_fixups(request_rec
             /* check if we copy vs. setaside files in this location */
             task->output.copy_files = h2_config_geti(h2_config_rget(r), 
                                                      H2_CONF_COPY_FILES);
+            if (task->output.copy_files) {
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
+                              "h2_slave_out(%s): copy_files on", task->id);
+                h2_beam_on_file_beam(task->output.beam, h2_beam_no_files, NULL);
+            }
         }
     }
     return DECLINED;

Modified: httpd/httpd/branches/2.4.x-openssl-1.1.0-compat/modules/http2/h2_h2.h
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x-openssl-1.1.0-compat/modules/http2/h2_h2.h?rev=1769588&r1=1769587&r2=1769588&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x-openssl-1.1.0-compat/modules/http2/h2_h2.h (original)
+++ httpd/httpd/branches/2.4.x-openssl-1.1.0-compat/modules/http2/h2_h2.h Mon Nov 14 10:26:31 2016
@@ -17,7 +17,7 @@
 #define __mod_h2__h2_h2__
 
 /**
- * List of ALPN protocol identifiers that we suport in cleartext
+ * List of ALPN protocol identifiers that we support in cleartext
  * negotiations. NULL terminated.
  */
 extern const char *h2_clear_protos[];
@@ -36,7 +36,7 @@ extern const char *h2_tls_protos[];
 const char *h2_h2_err_description(unsigned int h2_error);
 
 /*
- * One time, post config intialization.
+ * One time, post config initialization.
  */
 apr_status_t h2_h2_init(apr_pool_t *pool, server_rec *s);
 

Modified: httpd/httpd/branches/2.4.x-openssl-1.1.0-compat/modules/http2/h2_mplx.c
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x-openssl-1.1.0-compat/modules/http2/h2_mplx.c?rev=1769588&r1=1769587&r2=1769588&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x-openssl-1.1.0-compat/modules/http2/h2_mplx.c (original)
+++ httpd/httpd/branches/2.4.x-openssl-1.1.0-compat/modules/http2/h2_mplx.c Mon Nov 14 10:26:31 2016
@@ -28,13 +28,13 @@
 
 #include "mod_http2.h"
 
+#include "h2.h"
 #include "h2_private.h"
 #include "h2_bucket_beam.h"
 #include "h2_config.h"
 #include "h2_conn.h"
 #include "h2_ctx.h"
 #include "h2_h2.h"
-#include "h2_response.h"
 #include "h2_mplx.h"
 #include "h2_ngn_shed.h"
 #include "h2_request.h"
@@ -53,10 +53,10 @@ static void h2_beam_log(h2_bucket_beam *
         apr_size_t off = 0;
         
         off += apr_snprintf(buffer+off, H2_ALEN(buffer)-off, "cl=%d, ", beam->closed);
-        off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "red", ", ", &beam->red);
-        off += h2_util_bb_print(buffer+off, H2_ALEN(buffer)-off, "green", ", ", beam->green);
-        off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "hold", ", ", &beam->hold);
-        off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "purge", "", &beam->purge);
+        off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "red", ", ", &beam->send_list);
+        off += h2_util_bb_print(buffer+off, H2_ALEN(buffer)-off, "green", ", ", beam->recv_buffer);
+        off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "hold", ", ", &beam->hold_list);
+        off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "purge", "", &beam->purge_list);
 
         ap_log_cerror(APLOG_MARK, level, 0, c, "beam(%ld-%d): %s %s", 
                       c->id, id, msg, buffer);
@@ -90,14 +90,14 @@ static apr_status_t enter_mutex(h2_mplx
      * This allow recursive entering of the mutex from the saem thread,
      * which is what we need in certain situations involving callbacks
      */
-    AP_DEBUG_ASSERT(m);
+    ap_assert(m);
     apr_threadkey_private_get(&mutex, thread_lock);
     if (mutex == m->lock) {
         *pacquired = 0;
         return APR_SUCCESS;
     }
 
-    AP_DEBUG_ASSERT(m->lock);
+    ap_assert(m->lock);
     status = apr_thread_mutex_lock(m->lock);
     *pacquired = (status == APR_SUCCESS);
     if (*pacquired) {
@@ -168,7 +168,7 @@ static int can_beam_file(void *ctx, h2_b
     return 0;
 }
 
-static void have_out_data_for(h2_mplx *m, int stream_id);
+static void have_out_data_for(h2_mplx *m, h2_stream *stream, int response);
 static void task_destroy(h2_mplx *m, h2_task *task, int called_from_master);
 
 static void check_tx_reservation(h2_mplx *m) 
@@ -196,12 +196,21 @@ static int purge_stream(void *ctx, void
 {
     h2_mplx *m = ctx;
     h2_stream *stream = val;
-    h2_task *task = h2_ihash_get(m->tasks, stream->id);
-    h2_ihash_remove(m->spurge, stream->id);
-    h2_stream_destroy(stream);
+    int stream_id = stream->id;
+    h2_task *task;
+
+    /* stream_cleanup clears all buffers and destroys any buckets
+     * that might hold references into task space. Needs to be done
+     * before task destruction, otherwise it will complain. */
+    h2_stream_cleanup(stream);
+    
+    task = h2_ihash_get(m->tasks, stream_id);    
     if (task) {
         task_destroy(m, task, 1);
     }
+    
+    h2_stream_destroy(stream);
+    h2_ihash_remove(m->spurge, stream_id);
     return 0;
 }
 
@@ -217,11 +226,17 @@ static void purge_streams(h2_mplx *m)
 
 static void h2_mplx_destroy(h2_mplx *m)
 {
-    AP_DEBUG_ASSERT(m);
+    conn_rec **pslave;
+    ap_assert(m);
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                   "h2_mplx(%ld): destroy, tasks=%d", 
                   m->id, (int)h2_ihash_count(m->tasks));
     check_tx_free(m);
+    
+    while (m->spare_slaves->nelts > 0) {
+        pslave = (conn_rec **)apr_array_pop(m->spare_slaves);
+        h2_slave_destroy(*pslave);
+    }
     if (m->pool) {
         apr_pool_destroy(m->pool);
     }
@@ -246,7 +261,7 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr
     apr_status_t status = APR_SUCCESS;
     apr_allocator_t *allocator = NULL;
     h2_mplx *m;
-    AP_DEBUG_ASSERT(conf);
+    ap_assert(conf);
     
     status = apr_allocator_create(&allocator);
     if (status != APR_SUCCESS) {
@@ -286,15 +301,14 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr
         m->shold = h2_ihash_create(m->pool, offsetof(h2_stream,id));
         m->spurge = h2_ihash_create(m->pool, offsetof(h2_stream,id));
         m->q = h2_iq_create(m->pool, m->max_streams);
-        m->sready = h2_ihash_create(m->pool, offsetof(h2_stream,id));
-        m->sresume = h2_ihash_create(m->pool, offsetof(h2_stream,id));
+        m->readyq = h2_iq_create(m->pool, m->max_streams);
         m->tasks = h2_ihash_create(m->pool, offsetof(h2_task,stream_id));
+        m->redo_tasks = h2_ihash_create(m->pool, offsetof(h2_task, stream_id));
 
         m->stream_timeout = stream_timeout;
         m->workers = workers;
         m->workers_max = workers->max_workers;
-        m->workers_def_limit = 4;
-        m->workers_limit = m->workers_def_limit;
+        m->workers_limit = 6; /* the original h1 max parallel connections */
         m->last_limit_change = m->last_idle_block = apr_time_now();
         m->limit_change_interval = apr_time_from_msec(200);
         
@@ -310,7 +324,7 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr
     return m;
 }
 
-apr_uint32_t h2_mplx_shutdown(h2_mplx *m)
+int h2_mplx_shutdown(h2_mplx *m)
 {
     int acquired, max_stream_started = 0;
     
@@ -344,7 +358,6 @@ static void task_destroy(h2_mplx *m, h2_
 {
     conn_rec *slave = NULL;
     int reuse_slave = 0;
-    apr_status_t status;
     
     ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, 
                   "h2_task(%s): destroy", task->id);
@@ -356,29 +369,20 @@ static void task_destroy(h2_mplx *m, h2_
         }
     }
     
-    /* The pool is cleared/destroyed which also closes all
-     * allocated file handles. Give this count back to our
-     * file handle pool. */
-    if (task->output.beam) {
-        m->tx_handles_reserved += 
-        h2_beam_get_files_beamed(task->output.beam);
-        h2_beam_on_produced(task->output.beam, NULL, NULL);
-        status = h2_beam_shutdown(task->output.beam, APR_NONBLOCK_READ, 1);
-        if (status != APR_SUCCESS){
-            ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, m->c, 
-                          APLOGNO(03385) "h2_task(%s): output shutdown "
-                          "incomplete", task->id);
-        }
-    }
+    h2_beam_on_produced(task->output.beam, NULL, NULL);
+    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, 
+                  APLOGNO(03385) "h2_task(%s): destroy "
+                  "output beam empty=%d, holds proxies=%d", 
+                  task->id,
+                  h2_beam_empty(task->output.beam),
+                  h2_beam_holds_proxies(task->output.beam));
     
     slave = task->c;
     reuse_slave = ((m->spare_slaves->nelts < m->spare_slaves->nalloc)
                    && !task->rst_error);
     
     h2_ihash_remove(m->tasks, task->stream_id);
-    if (m->redo_tasks) {
-        h2_ihash_remove(m->redo_tasks, task->stream_id);
-    }
+    h2_ihash_remove(m->redo_tasks, task->stream_id);
     h2_task_destroy(task);
 
     if (slave) {
@@ -387,7 +391,7 @@ static void task_destroy(h2_mplx *m, h2_
         }
         else {
             slave->sbh = NULL;
-            h2_slave_destroy(slave, NULL);
+            h2_slave_destroy(slave);
         }
     }
     
@@ -430,18 +434,16 @@ static void stream_done(h2_mplx *m, h2_s
      * stream destruction until the task is done. 
      */
     h2_iq_remove(m->q, stream->id);
-    h2_ihash_remove(m->sready, stream->id);
-    h2_ihash_remove(m->sresume, stream->id);
     h2_ihash_remove(m->streams, stream->id);
-    if (stream->input) {
-        m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input);
-        h2_beam_on_consumed(stream->input, NULL, NULL);
-        /* Let anyone blocked reading know that there is no more to come */
-        h2_beam_abort(stream->input);
-        /* Remove mutex after, so that abort still finds cond to signal */
-        h2_beam_mutex_set(stream->input, NULL, NULL, NULL);
-    }
+    
     h2_stream_cleanup(stream);
+    m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input);
+    h2_beam_on_consumed(stream->input, NULL, NULL);
+    /* Let anyone blocked reading know that there is no more to come */
+    h2_beam_abort(stream->input);
+    /* Remove mutex after, so that abort still finds cond to signal */
+    h2_beam_mutex_set(stream->input, NULL, NULL, NULL);
+    m->tx_handles_reserved += h2_beam_get_files_beamed(stream->output);
 
     task = h2_ihash_get(m->tasks, stream->id);
     if (task) {
@@ -455,7 +457,7 @@ static void stream_done(h2_mplx *m, h2_s
         }
         else {
             /* already finished */
-            task_destroy(m, task, 0);
+            task_destroy(m, task, 1);
         }
     }
     h2_stream_destroy(stream);
@@ -467,30 +469,55 @@ static int stream_done_iter(void *ctx, v
     return 0;
 }
 
+typedef struct {
+    h2_mplx_stream_cb *cb;
+    void *ctx;
+} stream_iter_ctx_t;
+
+static int stream_iter_wrap(void *ctx, void *stream)
+{
+    stream_iter_ctx_t *x = ctx;
+    return x->cb(stream, x->ctx);
+}
+
+apr_status_t h2_mplx_stream_do(h2_mplx *m, h2_mplx_stream_cb *cb, void *ctx)
+{
+    apr_status_t status;
+    int acquired;
+    
+    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
+        stream_iter_ctx_t x;
+        x.cb = cb;
+        x.ctx = ctx;
+        h2_ihash_iter(m->streams, stream_iter_wrap, &x);
+        
+        leave_mutex(m, acquired);
+    }
+    return status;
+}
+
 static int task_print(void *ctx, void *val)
 {
     h2_mplx *m = ctx;
     h2_task *task = val;
 
-    if (task && task->request) {
+    if (task) {
         h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
 
-        ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
-                      "->03198: h2_stream(%s): %s %s %s -> %s %d"
-                      "[orph=%d/started=%d/done=%d]", 
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, /* NO APLOGNO */
+                      "->03198: h2_stream(%s): %s %s %s"
+                      "[orph=%d/started=%d/done=%d/frozen=%d]", 
                       task->id, task->request->method, 
                       task->request->authority, task->request->path,
-                      task->response? "http" : (task->rst_error? "reset" : "?"),
-                      task->response? task->response->http_status : task->rst_error,
                       (stream? 0 : 1), task->worker_started, 
-                      task->worker_done);
+                      task->worker_done, task->frozen);
     }
     else if (task) {
-        ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, /* NO APLOGNO */
                       "->03198: h2_stream(%ld-%d): NULL", m->id, task->stream_id);
     }
     else {
-        ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, /* NO APLOGNO */
                       "->03198: h2_stream(%ld-NULL): NULL", m->id);
     }
     return 1;
@@ -499,13 +526,11 @@ static int task_print(void *ctx, void *v
 static int task_abort_connection(void *ctx, void *val)
 {
     h2_task *task = val;
-    if (task->c) {
-        task->c->aborted = 1;
-    }
-    if (task->input.beam) {
+    if (!task->worker_done) { 
+        if (task->c) {
+            task->c->aborted = 1;
+        }
         h2_beam_abort(task->input.beam);
-    }
-    if (task->output.beam) {
         h2_beam_abort(task->output.beam);
     }
     return 1;
@@ -515,117 +540,97 @@ static int report_stream_iter(void *ctx,
     h2_mplx *m = ctx;
     h2_stream *stream = val;
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                  "h2_mplx(%ld-%d): exists, started=%d, scheduled=%d, "
-                  "submitted=%d, suspended=%d", 
+                  "h2_mplx(%ld-%d): exists, started=%d, scheduled=%d, ready=%d", 
                   m->id, stream->id, stream->started, stream->scheduled,
-                  stream->submitted, stream->suspended);
+                  h2_stream_is_ready(stream));
     return 1;
 }
 
+static int task_done_iter(void *ctx, void *val);
+
 apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
 {
     apr_status_t status;
     int acquired;
 
+    /* How to shut down a h2 connection:
+     * 1. tell the workers that no more tasks will come from us */
     h2_workers_unregister(m->workers, m);
     
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        int i, wait_secs = 5;
+        int i, wait_secs = 60;
 
-        if (!h2_ihash_empty(m->streams) && APLOGctrace1(m->c)) {
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                          "h2_mplx(%ld): release_join with %d streams open, "
-                          "%d streams resume, %d streams ready, %d tasks", 
-                          m->id, (int)h2_ihash_count(m->streams),
-                          (int)h2_ihash_count(m->sresume), 
-                          (int)h2_ihash_count(m->sready), 
-                          (int)h2_ihash_count(m->tasks));
-            h2_ihash_iter(m->streams, report_stream_iter, m);
-        }
-        
-        /* disable WINDOW_UPDATE callbacks */
+        /* 2. disable WINDOW_UPDATEs and set the mplx to aborted, clear
+         *    our TODO list and purge any streams we have collected */
         h2_mplx_set_consumed_cb(m, NULL, NULL);
-        
-        if (!h2_ihash_empty(m->shold)) {
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
-                          "h2_mplx(%ld): start release_join with %d streams in hold", 
-                          m->id, (int)h2_ihash_count(m->shold));
-        }
-        if (!h2_ihash_empty(m->spurge)) {
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
-                          "h2_mplx(%ld): start release_join with %d streams to purge", 
-                          m->id, (int)h2_ihash_count(m->spurge));
-        }
-        
+        h2_mplx_abort(m);
         h2_iq_clear(m->q);
+        purge_streams(m);
+
+        /* 3. wakeup all sleeping tasks. Mark all still active streams as 'done'. 
+         *    m->streams has to be empty afterwards with streams either in
+         *    a) m->shold because a task is still active
+         *    b) m->spurge because task is done, or was not started */
+        h2_ihash_iter(m->tasks, task_abort_connection, m);
         apr_thread_cond_broadcast(m->task_thawed);
         while (!h2_ihash_iter(m->streams, stream_done_iter, m)) {
             /* iterate until all streams have been removed */
         }
-        AP_DEBUG_ASSERT(h2_ihash_empty(m->streams));
-    
-        if (!h2_ihash_empty(m->shold)) {
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
-                          "h2_mplx(%ld): 2. release_join with %d streams in hold", 
-                          m->id, (int)h2_ihash_count(m->shold));
-        }
-        if (!h2_ihash_empty(m->spurge)) {
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
-                          "h2_mplx(%ld): 2. release_join with %d streams to purge", 
-                          m->id, (int)h2_ihash_count(m->spurge));
-        }
+        ap_assert(h2_ihash_empty(m->streams));
+
+        /* 4. purge all streams we collected by marking them 'done' */
+        purge_streams(m);
         
-        /* If we still have busy workers, we cannot release our memory
-         * pool yet, as tasks have references to us.
-         * Any operation on the task slave connection will from now on
-         * be errored ECONNRESET/ABORTED, so processing them should fail 
-         * and workers *should* return in a timely fashion.
-         */
+        /* 5. while workers are busy on this connection, meaning they
+         *    are processing tasks from this connection, wait on them finishing
+         *    to wake us and check again. Eventually, this has to succeed. */    
+        m->join_wait = wait;
         for (i = 0; m->workers_busy > 0; ++i) {
-            h2_ihash_iter(m->tasks, task_abort_connection, m);
-            
-            m->join_wait = wait;
             status = apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(wait_secs));
             
             if (APR_STATUS_IS_TIMEUP(status)) {
-                if (i > 0) {
-                    /* Oh, oh. Still we wait for assigned  workers to report that 
-                     * they are done. Unless we have a bug, a worker seems to be hanging. 
-                     * If we exit now, all will be deallocated and the worker, once 
-                     * it does return, will walk all over freed memory...
-                     */
-                    ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03198)
-                                  "h2_mplx(%ld): release, waiting for %d seconds now for "
-                                  "%d h2_workers to return, have still %d tasks outstanding", 
-                                  m->id, i*wait_secs, m->workers_busy,
-                                  (int)h2_ihash_count(m->tasks));
-                    if (i == 1) {
-                        h2_ihash_iter(m->tasks, task_print, m);
-                    }
-                }
-                h2_mplx_abort(m);
-                apr_thread_cond_broadcast(m->task_thawed);
+                /* This can happen if we have very long running requests
+                 * that do not time out on IO. */
+                ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03198)
+                              "h2_mplx(%ld): release, waiting for %d seconds now for "
+                              "%d h2_workers to return, have still %d tasks outstanding", 
+                              m->id, i*wait_secs, m->workers_busy,
+                              (int)h2_ihash_count(m->tasks));
+                h2_ihash_iter(m->shold, report_stream_iter, m);
+                h2_ihash_iter(m->tasks, task_print, m);
             }
-        }
-        
-        AP_DEBUG_ASSERT(h2_ihash_empty(m->shold));
-        if (!h2_ihash_empty(m->spurge)) {
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
-                          "h2_mplx(%ld): 3. release_join %d streams to purge", 
-                          m->id, (int)h2_ihash_count(m->spurge));
             purge_streams(m);
         }
-        AP_DEBUG_ASSERT(h2_ihash_empty(m->spurge));
+        m->join_wait = NULL;
         
+        /* 6. All workers for this connection are done, we are in 
+         * single-threaded processing now effectively. */
+        leave_mutex(m, acquired);
+
         if (!h2_ihash_empty(m->tasks)) {
-            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056)
-                          "h2_mplx(%ld): release_join -> destroy, "
-                          "%d tasks still present", 
+            /* when we are here, we lost track of the tasks still present.
+             * this currently happens with mod_proxy_http2 when we shut
+             * down a h2_req_engine with tasks assigned. Since no parallel
+             * processing is going on any more, we just clean them up. */ 
+            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c,  APLOGNO(03056)
+                          "h2_mplx(%ld): 3. release_join with %d tasks",
                           m->id, (int)h2_ihash_count(m->tasks));
+            h2_ihash_iter(m->tasks, task_print, m);
+            
+            while (!h2_ihash_iter(m->tasks, task_done_iter, m)) {
+                /* iterate until all tasks have been removed */
+            }
         }
-        leave_mutex(m, acquired);
+
+        /* 7. With all tasks done, the stream hold should be empty and all
+         *    remaining streams are ready for purging */
+        ap_assert(h2_ihash_empty(m->shold));
+        purge_streams(m);
+        
+        /* 8. close the h2_req_engine shed and self destruct */
+        h2_ngn_shed_destroy(m->ngn_shed);
+        m->ngn_shed = NULL;
         h2_mplx_destroy(m);
-        /* all gone */
     }
     return status;
 }
@@ -634,7 +639,6 @@ void h2_mplx_abort(h2_mplx *m)
 {
     int acquired;
     
-    AP_DEBUG_ASSERT(m);
     if (!m->aborted && enter_mutex(m, &acquired) == APR_SUCCESS) {
         m->aborted = 1;
         h2_ngn_shed_abort(m->ngn_shed);
@@ -647,7 +651,6 @@ apr_status_t h2_mplx_stream_done(h2_mplx
     apr_status_t status = APR_SUCCESS;
     int acquired;
     
-    AP_DEBUG_ASSERT(m);
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, 
                       "h2_mplx(%ld-%d): marking stream as done.", 
@@ -659,61 +662,88 @@ apr_status_t h2_mplx_stream_done(h2_mplx
     return status;
 }
 
+h2_stream *h2_mplx_stream_get(h2_mplx *m, int id)
+{
+    h2_stream *s = NULL;
+    int acquired;
+    
+    if ((enter_mutex(m, &acquired)) == APR_SUCCESS) {
+        s = h2_ihash_get(m->streams, id);
+        leave_mutex(m, acquired);
+    }
+    return s;
+}
+
 void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx)
 {
     m->input_consumed = cb;
     m->input_consumed_ctx = ctx;
 }
 
-static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response)
+static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes)
+{
+    h2_mplx *m = ctx;
+    apr_status_t status;
+    h2_stream *stream;
+    int acquired;
+    
+    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
+        stream = h2_ihash_get(m->streams, beam->id);
+        if (stream) {
+            have_out_data_for(m, stream, 0);
+        }
+        leave_mutex(m, acquired);
+    }
+}
+
+static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
 {
     apr_status_t status = APR_SUCCESS;
     h2_task *task = h2_ihash_get(m->tasks, stream_id);
     h2_stream *stream = h2_ihash_get(m->streams, stream_id);
+    apr_size_t beamed_count;
     
     if (!task || !stream) {
         return APR_ECONNABORTED;
     }
     
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                  "h2_mplx(%s): open response: %d, rst=%d",
-                  task->id, response->http_status, response->rst_error);
-    
-    h2_task_set_response(task, response);
-    
-    if (task->output.beam) {
-        h2_beam_buffer_size_set(task->output.beam, m->stream_max_mem);
-        h2_beam_timeout_set(task->output.beam, m->stream_timeout);
-        h2_beam_on_consumed(task->output.beam, stream_output_consumed, task);
-        m->tx_handles_reserved -= h2_beam_get_files_beamed(task->output.beam);
-        if (!task->output.copy_files) {
-            h2_beam_on_file_beam(task->output.beam, can_beam_file, m);
-        }
-        h2_beam_mutex_set(task->output.beam, beam_enter, task->cond, m);
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
+                  "h2_mplx(%s): out open", task->id);
+                      
+    h2_beam_on_consumed(stream->output, stream_output_consumed, task);
+    h2_beam_on_produced(stream->output, output_produced, m);
+    beamed_count = h2_beam_get_files_beamed(stream->output);
+    if (m->tx_handles_reserved >= beamed_count) {
+        m->tx_handles_reserved -= beamed_count;
     }
-    
-    h2_ihash_add(m->sready, stream);
-    if (response && response->http_status < 300) {
-        /* we might see some file buckets in the output, see
-         * if we have enough handles reserved. */
-        check_tx_reservation(m);
+    else {
+        m->tx_handles_reserved = 0;
+    }
+    if (!task->output.copy_files) {
+        h2_beam_on_file_beam(stream->output, can_beam_file, m);
     }
-    have_out_data_for(m, stream_id);
+    
+    /* time to protect the beam against multi-threaded use */
+    h2_beam_mutex_set(stream->output, beam_enter, task->cond, m);
+    
+    /* we might see some file buckets in the output, see
+     * if we have enough handles reserved. */
+    check_tx_reservation(m);
+    have_out_data_for(m, stream, 0);
     return status;
 }
 
-apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response)
+apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
 {
     apr_status_t status;
     int acquired;
     
-    AP_DEBUG_ASSERT(m);
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
         if (m->aborted) {
             status = APR_ECONNABORTED;
         }
         else {
-            status = out_open(m, stream_id, response);
+            status = out_open(m, stream_id, beam);
         }
         leave_mutex(m, acquired);
     }
@@ -734,25 +764,13 @@ static apr_status_t out_close(h2_mplx *m
         return APR_ECONNABORTED;
     }
 
-    if (!task->response && !task->rst_error) {
-        /* In case a close comes before a response was created,
-         * insert an error one so that our streams can properly reset.
-         */
-        h2_response *r = h2_response_die(task->stream_id, 500, 
-                                         task->request, m->pool);
-        status = out_open(m, task->stream_id, r);
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, m->c, APLOGNO(03393)
-                      "h2_mplx(%s): close, no response, no rst", task->id);
-    }
     ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
                   "h2_mplx(%s): close", task->id);
-    if (task->output.beam) {
-        status = h2_beam_close(task->output.beam);
-        h2_beam_log(task->output.beam, task->stream_id, "out_close", m->c, 
-                    APLOG_TRACE2);
-    }
+    status = h2_beam_close(task->output.beam);
+    h2_beam_log(task->output.beam, task->stream_id, "out_close", m->c, 
+                APLOG_TRACE2);
     output_consumed_signal(m, task);
-    have_out_data_for(m, task->stream_id);
+    have_out_data_for(m, stream, 0);
     return status;
 }
 
@@ -762,12 +780,11 @@ apr_status_t h2_mplx_out_trywait(h2_mplx
     apr_status_t status;
     int acquired;
     
-    AP_DEBUG_ASSERT(m);
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
         if (m->aborted) {
             status = APR_ECONNABORTED;
         }
-        else if (!h2_ihash_empty(m->sready) || !h2_ihash_empty(m->sresume)) {
+        else if (!h2_iq_empty(m->readyq)) {
             status = APR_SUCCESS;
         }
         else {
@@ -786,10 +803,11 @@ apr_status_t h2_mplx_out_trywait(h2_mplx
     return status;
 }
 
-static void have_out_data_for(h2_mplx *m, int stream_id)
+static void have_out_data_for(h2_mplx *m, h2_stream *stream, int response)
 {
-    (void)stream_id;
-    AP_DEBUG_ASSERT(m);
+    ap_assert(m);
+    ap_assert(stream);
+    h2_iq_append(m->readyq, stream->id);
     if (m->added_output) {
         apr_thread_cond_signal(m->added_output);
     }
@@ -800,7 +818,6 @@ apr_status_t h2_mplx_reprioritize(h2_mpl
     apr_status_t status;
     int acquired;
     
-    AP_DEBUG_ASSERT(m);
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
         if (m->aborted) {
             status = APR_ECONNABORTED;
@@ -822,32 +839,26 @@ apr_status_t h2_mplx_process(h2_mplx *m,
     int do_registration = 0;
     int acquired;
     
-    AP_DEBUG_ASSERT(m);
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
         if (m->aborted) {
             status = APR_ECONNABORTED;
         }
         else {
             h2_ihash_add(m->streams, stream);
-            if (stream->response) {
-                /* already have a respone, schedule for submit */
-                h2_ihash_add(m->sready, stream);
+            if (h2_stream_is_ready(stream)) {
+                h2_iq_append(m->readyq, stream->id);
             }
             else {
-                h2_beam_create(&stream->input, stream->pool, stream->id, 
-                               "input", 0);
                 if (!m->need_registration) {
                     m->need_registration = h2_iq_empty(m->q);
                 }
                 if (m->workers_busy < m->workers_max) {
                     do_registration = m->need_registration;
                 }
-                h2_iq_add(m->q, stream->id, cmp, ctx);
-                
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
-                              "h2_mplx(%ld-%d): process, body=%d", 
-                              m->c->id, stream->id, stream->request->body);
+                h2_iq_add(m->q, stream->id, cmp, ctx);                
             }
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
+                          "h2_mplx(%ld-%d): process", m->c->id, stream->id);
         }
         leave_mutex(m, acquired);
     }
@@ -858,7 +869,7 @@ apr_status_t h2_mplx_process(h2_mplx *m,
     return status;
 }
 
-static h2_task *pop_task(h2_mplx *m)
+static h2_task *next_stream_task(h2_mplx *m)
 {
     h2_task *task = NULL;
     h2_stream *stream;
@@ -876,13 +887,14 @@ static h2_task *pop_task(h2_mplx *m)
                 slave = *pslave;
             }
             else {
-                slave = h2_slave_create(m->c, m->pool, NULL);
+                slave = h2_slave_create(m->c, stream->id, m->pool);
                 new_conn = 1;
             }
             
             slave->sbh = m->c->sbh;
             slave->aborted = 0;
-            task = h2_task_create(slave, stream->request, stream->input, m);
+            task = h2_task_create(slave, stream->id, stream->request, 
+                                  stream->input, stream->output, m);
             h2_ihash_add(m->tasks, task);
             
             m->c->keepalives++;
@@ -897,13 +909,13 @@ static h2_task *pop_task(h2_mplx *m)
                 m->max_stream_started = sid;
             }
 
-            if (stream->input) {
-                h2_beam_timeout_set(stream->input, m->stream_timeout);
-                h2_beam_on_consumed(stream->input, stream_input_consumed, m);
-                h2_beam_on_file_beam(stream->input, can_beam_file, m);
-                h2_beam_mutex_set(stream->input, beam_enter, task->cond, m);
-            }
-
+            h2_beam_timeout_set(stream->input, m->stream_timeout);
+            h2_beam_on_consumed(stream->input, stream_input_consumed, m);
+            h2_beam_on_file_beam(stream->input, can_beam_file, m);
+            h2_beam_mutex_set(stream->input, beam_enter, task->cond, m);
+            
+            h2_beam_buffer_size_set(stream->output, m->stream_max_mem);
+            h2_beam_timeout_set(stream->output, m->stream_timeout);
             ++m->workers_busy;
         }
     }
@@ -916,13 +928,12 @@ h2_task *h2_mplx_pop_task(h2_mplx *m, in
     apr_status_t status;
     int acquired;
     
-    AP_DEBUG_ASSERT(m);
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
         if (m->aborted) {
             *has_more = 0;
         }
         else {
-            task = pop_task(m);
+            task = next_stream_task(m);
             *has_more = !h2_iq_empty(m->q);
         }
         
@@ -941,9 +952,6 @@ static void task_done(h2_mplx *m, h2_tas
          * and the original worker has finished. That means the 
          * engine may start processing now. */
         h2_task_thaw(task);
-        /* we do not want the task to block on writing response
-         * bodies into the mplx. */
-        h2_task_set_io_blocking(task, 0);
         apr_thread_cond_broadcast(m->task_thawed);
         return;
     }
@@ -953,14 +961,11 @@ static void task_done(h2_mplx *m, h2_tas
         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                       "h2_mplx(%ld): task(%s) done", m->id, task->id);
         out_close(m, task);
-        stream = h2_ihash_get(m->streams, task->stream_id);
         
         if (ngn) {
             apr_off_t bytes = 0;
-            if (task->output.beam) {
-                h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ);
-                bytes += h2_beam_get_buffered(task->output.beam);
-            }
+            h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ);
+            bytes += h2_beam_get_buffered(task->output.beam);
             if (bytes > 0) {
                 /* we need to report consumed and current buffered output
                  * to the engine. The request will be streamed out or cancelled,
@@ -971,7 +976,8 @@ static void task_done(h2_mplx *m, h2_tas
         }
         
         if (task->engine) {
-            if (!h2_req_engine_is_shutdown(task->engine)) {
+            if (!m->aborted && !task->c->aborted 
+                && !h2_req_engine_is_shutdown(task->engine)) {
                 ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
                               "h2_mplx(%ld): task(%s) has not-shutdown "
                               "engine(%s)", m->id, task->id, 
@@ -980,7 +986,8 @@ static void task_done(h2_mplx *m, h2_tas
             h2_ngn_shed_done_ngn(m->ngn_shed, task->engine);
         }
         
-        if (!m->aborted && stream && m->redo_tasks
+        stream = h2_ihash_get(m->streams, task->stream_id);
+        if (!m->aborted && stream 
             && h2_ihash_get(m->redo_tasks, task->stream_id)) {
             /* reset and schedule again */
             h2_task_redo(task);
@@ -991,10 +998,6 @@ static void task_done(h2_mplx *m, h2_tas
         
         task->worker_done = 1;
         task->done_at = apr_time_now();
-        if (task->output.beam) {
-            h2_beam_on_consumed(task->output.beam, NULL, NULL);
-            h2_beam_mutex_set(task->output.beam, NULL, NULL, NULL);
-        }
         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
                       "h2_mplx(%s): request done, %f ms elapsed", task->id, 
                       (task->done_at - task->started_at) / 1000.0);
@@ -1020,23 +1023,24 @@ static void task_done(h2_mplx *m, h2_tas
             ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
                           "h2_mplx(%s): task_done, stream still open", 
                           task->id);
-            if (h2_stream_is_suspended(stream)) {
-                /* more data will not arrive, resume the stream */
-                h2_ihash_add(m->sresume, stream);
-                have_out_data_for(m, stream->id);
-            }
+            /* more data will not arrive, resume the stream */
+            have_out_data_for(m, stream, 0);
+            h2_beam_on_consumed(stream->output, NULL, NULL);
+            h2_beam_mutex_set(stream->output, NULL, NULL, NULL);
         }
         else {
             /* stream no longer active, was it placed in hold? */
             stream = h2_ihash_get(m->shold, task->stream_id);
             if (stream) {
                 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
-                              "h2_mplx(%s): task_done, stream in hold", 
-                              task->id);
+                              "h2_mplx(%s): task_done, stream %d in hold", 
+                              task->id, stream->id);
                 /* We cannot destroy the stream here since this is 
                  * called from a worker thread and freeing memory pools
                  * is only safe in the only thread using it (and its
                  * parent pool / allocator) */
+                h2_beam_on_consumed(stream->output, NULL, NULL);
+                h2_beam_mutex_set(stream->output, NULL, NULL, NULL);
                 h2_ihash_remove(m->shold, stream->id);
                 h2_ihash_add(m->spurge, stream);
             }
@@ -1046,14 +1050,16 @@ static void task_done(h2_mplx *m, h2_tas
                               task->id);
                 task_destroy(m, task, 0);
             }
-            
-            if (m->join_wait) {
-                apr_thread_cond_signal(m->join_wait);
-            }
         }
     }
 }
 
+static int task_done_iter(void *ctx, void *val)
+{
+    task_done((h2_mplx*)ctx, val, 0);
+    return 0;
+}
+
 void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask)
 {
     int acquired;
@@ -1061,9 +1067,12 @@ void h2_mplx_task_done(h2_mplx *m, h2_ta
     if (enter_mutex(m, &acquired) == APR_SUCCESS) {
         task_done(m, task, NULL);
         --m->workers_busy;
+        if (m->join_wait) {
+            apr_thread_cond_signal(m->join_wait);
+        }
         if (ptask) {
             /* caller wants another task */
-            *ptask = pop_task(m);
+            *ptask = next_stream_task(m);
         }
         leave_mutex(m, acquired);
     }
@@ -1076,15 +1085,19 @@ void h2_mplx_task_done(h2_mplx *m, h2_ta
 static int latest_repeatable_unsubmitted_iter(void *data, void *val)
 {
     task_iter_ctx *ctx = data;
+    h2_stream *stream;
     h2_task *task = val;
     if (!task->worker_done && h2_task_can_redo(task) 
         && !h2_ihash_get(ctx->m->redo_tasks, task->stream_id)) {
-        /* this task occupies a worker, the response has not been submitted yet,
-         * not been cancelled and it is a repeatable request
-         * -> it can be re-scheduled later */
-        if (!ctx->task || ctx->task->started_at < task->started_at) {
-            /* we did not have one or this one was started later */
-            ctx->task = task;
+        stream = h2_ihash_get(ctx->m->streams, task->stream_id);
+        if (stream && !h2_stream_is_ready(stream)) {
+            /* this task occupies a worker, the response has not been submitted 
+             * yet, not been cancelled and it is a repeatable request
+             * -> it can be re-scheduled later */
+            if (!ctx->task || ctx->task->started_at < task->started_at) {
+                /* we did not have one or this one was started later */
+                ctx->task = task;
+            }
         }
     }
     return 1;
@@ -1127,13 +1140,10 @@ static apr_status_t unschedule_slow_task
     h2_task *task;
     int n;
     
-    if (!m->redo_tasks) {
-        m->redo_tasks = h2_ihash_create(m->pool, offsetof(h2_task, stream_id));
-    }
     /* Try to get rid of streams that occupy workers. Look for safe requests
      * that are repeatable. If none found, fail the connection.
      */
-    n = (m->workers_busy - m->workers_limit - h2_ihash_count(m->redo_tasks));
+    n = (m->workers_busy - m->workers_limit - (int)h2_ihash_count(m->redo_tasks));
     while (n > 0 && (task = get_latest_repeatable_unsubmitted_task(m))) {
         h2_task_rst(task, H2_ERR_CANCEL);
         h2_ihash_add(m->redo_tasks, task);
@@ -1251,13 +1261,12 @@ apr_status_t h2_mplx_req_engine_push(con
         return APR_ECONNABORTED;
     }
     m = task->mplx;
-    task->r = r;
     
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
         h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
         
         if (stream) {
-            status = h2_ngn_shed_push_task(m->ngn_shed, ngn_type, task, einit);
+            status = h2_ngn_shed_push_request(m->ngn_shed, ngn_type, r, einit);
         }
         else {
             status = APR_ECONNABORTED;
@@ -1269,13 +1278,12 @@ apr_status_t h2_mplx_req_engine_push(con
 
 apr_status_t h2_mplx_req_engine_pull(h2_req_engine *ngn, 
                                      apr_read_type_e block, 
-                                     apr_uint32_t capacity, 
+                                     int capacity, 
                                      request_rec **pr)
 {   
     h2_ngn_shed *shed = h2_ngn_shed_get_shed(ngn);
     h2_mplx *m = h2_ngn_shed_get_ctx(shed);
     apr_status_t status;
-    h2_task *task = NULL;
     int acquired;
     
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
@@ -1290,26 +1298,26 @@ apr_status_t h2_mplx_req_engine_pull(h2_
              * had and, if not, wait a short while before doing the
              * blocking, and if unsuccessful, terminating read.
              */
-            status = h2_ngn_shed_pull_task(shed, ngn, capacity, 1, &task);
+            status = h2_ngn_shed_pull_request(shed, ngn, capacity, 1, pr);
             if (APR_STATUS_IS_EAGAIN(status)) {
                 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                               "h2_mplx(%ld): start block engine pull", m->id);
                 apr_thread_cond_timedwait(m->task_thawed, m->lock, 
                                           apr_time_from_msec(20));
-                status = h2_ngn_shed_pull_task(shed, ngn, capacity, 1, &task);
+                status = h2_ngn_shed_pull_request(shed, ngn, capacity, 1, pr);
             }
         }
         else {
-            status = h2_ngn_shed_pull_task(shed, ngn, capacity,
-                                           want_shutdown, &task);
+            status = h2_ngn_shed_pull_request(shed, ngn, capacity,
+                                              want_shutdown, pr);
         }
         leave_mutex(m, acquired);
     }
-    *pr = task? task->r : NULL;
     return status;
 }
  
-void h2_mplx_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn)
+void h2_mplx_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn,
+                             apr_status_t status)
 {
     h2_task *task = h2_ctx_cget_task(r_conn);
     
@@ -1320,6 +1328,10 @@ void h2_mplx_req_engine_done(h2_req_engi
         if (enter_mutex(m, &acquired) == APR_SUCCESS) {
             ngn_out_update_windows(m, ngn);
             h2_ngn_shed_done_task(m->ngn_shed, ngn, task);
+            if (status != APR_SUCCESS && h2_task_can_redo(task) 
+                && !h2_ihash_get(m->redo_tasks, task->stream_id)) {
+                h2_ihash_add(m->redo_tasks, task);
+            }
             if (task->engine) { 
                 /* cannot report that as done until engine returns */
             }
@@ -1345,67 +1357,30 @@ static int update_window(void *ctx, void
 
 apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m, 
                                             stream_ev_callback *on_resume, 
-                                            stream_ev_callback *on_response, 
                                             void *on_ctx)
 {
     apr_status_t status;
     int acquired;
-    int streams[32];
+    int ids[100];
     h2_stream *stream;
-    h2_task *task;
     size_t i, n;
     
-    AP_DEBUG_ASSERT(m);
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
         ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, 
                       "h2_mplx(%ld): dispatch events", m->id);
                       
         /* update input windows for streams */
         h2_ihash_iter(m->streams, update_window, m);
-
-        if (on_response && !h2_ihash_empty(m->sready)) {
-            n = h2_ihash_ishift(m->sready, streams, H2_ALEN(streams));
+        if (on_resume && !h2_iq_empty(m->readyq)) {
+            n = h2_iq_mshift(m->readyq, ids, H2_ALEN(ids));
             for (i = 0; i < n; ++i) {
-                stream = h2_ihash_get(m->streams, streams[i]);
-                if (!stream) {
-                    continue;
+                stream = h2_ihash_get(m->streams, ids[i]);
+                if (stream) {
+                    ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, 
+                                  "h2_mplx(%ld-%d): on_resume", 
+                                  m->id, stream->id);
+                    on_resume(on_ctx, stream);
                 }
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, 
-                              "h2_mplx(%ld-%d): on_response", 
-                              m->id, stream->id);
-                task = h2_ihash_get(m->tasks, stream->id);
-                if (task) {
-                    task->submitted = 1;
-                    if (task->rst_error) {
-                        h2_stream_rst(stream, task->rst_error);
-                    }
-                    else {
-                        AP_DEBUG_ASSERT(task->response);
-                        h2_stream_set_response(stream, task->response, task->output.beam);
-                    }
-                }
-                else {
-                    /* We have the stream ready without a task. This happens
-                     * when we fail streams early. A response should already
-                     * be present.  */
-                    AP_DEBUG_ASSERT(stream->response || stream->rst_error);
-                }
-                status = on_response(on_ctx, stream->id);
-            }
-        }
-
-        if (on_resume && !h2_ihash_empty(m->sresume)) {
-            n = h2_ihash_ishift(m->sresume, streams, H2_ALEN(streams));
-            for (i = 0; i < n; ++i) {
-                stream = h2_ihash_get(m->streams, streams[i]);
-                if (!stream) {
-                    continue;
-                }
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, 
-                              "h2_mplx(%ld-%d): on_resume", 
-                              m->id, stream->id);
-                h2_stream_set_suspended(stream, 0);
-                status = on_resume(on_ctx, stream->id);
             }
         }
         
@@ -1414,47 +1389,34 @@ apr_status_t h2_mplx_dispatch_master_eve
     return status;
 }
 
-static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes)
+apr_status_t h2_mplx_keep_active(h2_mplx *m, int stream_id)
 {
-    h2_mplx *m = ctx;
     apr_status_t status;
-    h2_stream *stream;
     int acquired;
     
-    AP_DEBUG_ASSERT(m);
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        stream = h2_ihash_get(m->streams, beam->id);
-        if (stream && h2_stream_is_suspended(stream)) {
-            h2_ihash_add(m->sresume, stream);
-            h2_beam_on_produced(beam, NULL, NULL);
-            have_out_data_for(m, beam->id);
+        h2_stream *s = h2_ihash_get(m->streams, stream_id);
+        if (s) {
+            h2_iq_append(m->readyq, stream_id);
         }
         leave_mutex(m, acquired);
     }
+    return status;
 }
 
-apr_status_t h2_mplx_suspend_stream(h2_mplx *m, int stream_id)
+int h2_mplx_awaits_data(h2_mplx *m)
 {
     apr_status_t status;
-    h2_stream *stream;
-    h2_task *task;
-    int acquired;
-    
-    AP_DEBUG_ASSERT(m);
+    int acquired, waiting = 1;
+     
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        stream = h2_ihash_get(m->streams, stream_id);
-        if (stream) {
-            h2_stream_set_suspended(stream, 1);
-            task = h2_ihash_get(m->tasks, stream->id);
-            if (stream->started && (!task || task->worker_done)) {
-                h2_ihash_add(m->sresume, stream);
-            }
-            else {
-                /* register callback so that we can resume on new output */
-                h2_beam_on_produced(task->output.beam, output_produced, m);
-            }
+        if (h2_ihash_empty(m->streams)) {
+            waiting = 0;
+        }
+        if (h2_iq_empty(m->q) && h2_ihash_empty(m->tasks)) {
+            waiting = 0;
         }
         leave_mutex(m, acquired);
     }
-    return status;
+    return waiting;
 }



Mime
View raw message