httpd-cvs mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ic...@apache.org
Subject svn commit: r1725301 [2/6] - in /httpd/httpd/branches/2.4.x: ./ docs/manual/mod/ modules/http2/
Date Mon, 18 Jan 2016 16:22:58 GMT
Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_h2.c
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_h2.c?rev=1725301&r1=1725300&r2=1725301&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_h2.c (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_h2.c Mon Jan 18 16:22:57 2016
@@ -24,8 +24,10 @@
 #include <http_config.h>
 #include <http_connection.h>
 #include <http_protocol.h>
+#include <http_request.h>
 #include <http_log.h>
 
+#include "mod_http2.h"
 #include "h2_private.h"
 
 #include "h2_stream.h"
@@ -33,7 +35,11 @@
 #include "h2_config.h"
 #include "h2_ctx.h"
 #include "h2_conn.h"
+#include "h2_request.h"
+#include "h2_session.h"
+#include "h2_util.h"
 #include "h2_h2.h"
+#include "mod_http2.h"
 
 const char *h2_tls_protos[] = {
     "h2", NULL
@@ -432,21 +438,18 @@ static int cipher_is_blacklisted(const c
 
 /*******************************************************************************
  * Hooks for processing incoming connections:
- * - pre_conn_before_tls switches SSL off for stream connections
  * - process_conn take over connection in case of h2
  */
 static int h2_h2_process_conn(conn_rec* c);
-static int h2_h2_remove_timeout(conn_rec* c);
 static int h2_h2_post_read_req(request_rec *r);
 
-
 /*******************************************************************************
  * Once per lifetime init, retrieve optional functions
  */
 apr_status_t h2_h2_init(apr_pool_t *pool, server_rec *s)
 {
     (void)pool;
-    ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, s, "h2_h2, child_init");
+    ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, s, "h2_h2, child_init");
     opt_ssl_engine_disable = APR_RETRIEVE_OPTIONAL_FN(ssl_engine_disable);
     opt_ssl_is_https = APR_RETRIEVE_OPTIONAL_FN(ssl_is_https);
     opt_ssl_var_lookup = APR_RETRIEVE_OPTIONAL_FN(ssl_var_lookup);
@@ -527,19 +530,15 @@ int h2_is_acceptable_connection(conn_rec
 int h2_allows_h2_direct(conn_rec *c)
 {
     const h2_config *cfg = h2_config_get(c);
+    int is_tls = h2_h2_is_tls(c);
+    const char *needed_protocol = is_tls? "h2" : "h2c";
     int h2_direct = h2_config_geti(cfg, H2_CONF_DIRECT);
     
     if (h2_direct < 0) {
-        if (h2_h2_is_tls(c)) {
-            /* disabled by default on TLS */
-            h2_direct = 0;
-        }
-        else {
-            /* enabled if "Protocols h2c" is configured */
-            h2_direct = ap_is_allowed_protocol(c, NULL, NULL, "h2c");
-        }
+        h2_direct = is_tls? 0 : 1;
     }
-    return !!h2_direct;
+    return (h2_direct 
+            && ap_is_allowed_protocol(c, NULL, NULL, needed_protocol));
 }
 
 int h2_allows_h2_upgrade(conn_rec *c)
@@ -553,22 +552,20 @@ int h2_allows_h2_upgrade(conn_rec *c)
 /*******************************************************************************
  * Register various hooks
  */
-static const char *const mod_reqtimeout[] = { "reqtimeout.c", NULL};
-static const char* const mod_ssl[]        = {"mod_ssl.c", NULL};
+static const char* const mod_ssl[]        = { "mod_ssl.c", NULL};
+static const char* const mod_reqtimeout[] = { "mod_reqtimeout.c", NULL};
 
 void h2_h2_register_hooks(void)
 {
-    /* When the connection processing actually starts, we might to
-     * take over, if h2* was selected as protocol.
+    /* Our main processing needs to run quite late. Definitely after mod_ssl,
+     * as we need its connection filters, but also before reqtimeout as its
+     * method of timeouts is specific to HTTP/1.1 (as of now).
+     * The core HTTP/1 processing run as REALLY_LAST, so we will have
+     * a chance to take over before it.
      */
     ap_hook_process_connection(h2_h2_process_conn, 
-                               mod_ssl, NULL, APR_HOOK_MIDDLE);
+                               mod_ssl, mod_reqtimeout, APR_HOOK_LAST);
                                
-    /* Perform connection cleanup before the actual processing happens.
-     */
-    ap_hook_process_connection(h2_h2_remove_timeout, 
-                               mod_reqtimeout, NULL, APR_HOOK_LAST);
-    
     /* With "H2SerializeHeaders On", we install the filter in this hook
      * that parses the response. This needs to happen before any other post
      * read function terminates the request with an error. Otherwise we will
@@ -577,81 +574,84 @@ void h2_h2_register_hooks(void)
     ap_hook_post_read_request(h2_h2_post_read_req, NULL, NULL, APR_HOOK_REALLY_FIRST);
 }
 
-static int h2_h2_remove_timeout(conn_rec* c)
+int h2_h2_process_conn(conn_rec* c)
 {
-    h2_ctx *ctx = h2_ctx_get(c);
+    apr_status_t status;
+    h2_ctx *ctx;
     
-    if (h2_ctx_is_active(ctx) && !h2_ctx_is_task(ctx)) {
-        /* cleanup on master h2 connections */
-        ap_remove_input_filter_byhandle(c->input_filters, "reqtimeout");
+    if (c->master) {
+        return DECLINED;
     }
     
-    return DECLINED;
-}
-
-int h2_h2_process_conn(conn_rec* c)
-{
-    h2_ctx *ctx = h2_ctx_get(c);
-    
+    ctx = h2_ctx_get(c, 0);
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, "h2_h2, process_conn");
     if (h2_ctx_is_task(ctx)) {
         /* our stream pseudo connection */
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c, "h2_h2, task, declined");
         return DECLINED;
     }
-
-    if (h2_ctx_protocol_get(c)) {
-        /* Something has been negotiated */
-    }
-    else if (!strcmp(AP_PROTOCOL_HTTP1, ap_get_protocol(c))
-             && h2_allows_h2_direct(c) 
-             && h2_is_acceptable_connection(c, 1)) {
-        /* connection still is on http/1.1 and H2Direct is enabled. 
-         * Otherwise connection is in a fully acceptable state.
-         * -> peek at the first 24 incoming bytes
-         */
-        apr_bucket_brigade *temp;
-        apr_status_t status;
-        char *s = NULL;
-        apr_size_t slen;
-        
-        temp = apr_brigade_create(c->pool, c->bucket_alloc);
-        status = ap_get_brigade(c->input_filters, temp,
-                                AP_MODE_SPECULATIVE, APR_BLOCK_READ, 24);
+    
+    if (!ctx && c->keepalives == 0) {
+        const char *proto = ap_get_protocol(c);
         
-        if (status != APR_SUCCESS) {
-            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, c,
-                          "h2_h2, error reading 24 bytes speculative");
-            apr_brigade_destroy(temp);
-            return DECLINED;
+        if (APLOGctrace1(c)) {
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, "h2_h2, process_conn, "
+                          "new connection using protocol '%s', direct=%d, "
+                          "tls acceptable=%d", proto, h2_allows_h2_direct(c), 
+                          h2_is_acceptable_connection(c, 1));
         }
         
-        apr_brigade_pflatten(temp, &s, &slen, c->pool);
-        if ((slen >= 24) && !memcmp(H2_MAGIC_TOKEN, s, 24)) {
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
-                          "h2_h2, direct mode detected");
-            h2_ctx_protocol_set(ctx, h2_h2_is_tls(c)? "h2" : "h2c");
-        }
-        else {
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c,
-                          "h2_h2, not detected in %d bytes: %s", 
-                          (int)slen, s);
+        if (!strcmp(AP_PROTOCOL_HTTP1, proto)
+            && h2_allows_h2_direct(c) 
+            && h2_is_acceptable_connection(c, 1)) {
+            /* Fresh connection still is on http/1.1 and H2Direct is enabled. 
+             * Otherwise connection is in a fully acceptable state.
+             * -> peek at the first 24 incoming bytes
+             */
+            apr_bucket_brigade *temp;
+            char *s = NULL;
+            apr_size_t slen;
+            
+            temp = apr_brigade_create(c->pool, c->bucket_alloc);
+            status = ap_get_brigade(c->input_filters, temp,
+                                    AP_MODE_SPECULATIVE, APR_BLOCK_READ, 24);
+            
+            if (status != APR_SUCCESS) {
+                ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, c,
+                              "h2_h2, error reading 24 bytes speculative");
+                apr_brigade_destroy(temp);
+                return DECLINED;
+            }
+            
+            apr_brigade_pflatten(temp, &s, &slen, c->pool);
+            if ((slen >= 24) && !memcmp(H2_MAGIC_TOKEN, s, 24)) {
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
+                              "h2_h2, direct mode detected");
+                if (!ctx) {
+                    ctx = h2_ctx_get(c, 1);
+                }
+                h2_ctx_protocol_set(ctx, h2_h2_is_tls(c)? "h2" : "h2c");
+            }
+            else {
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c,
+                              "h2_h2, not detected in %d bytes: %s", 
+                              (int)slen, s);
+            }
+            
+            apr_brigade_destroy(temp);
         }
-        
-        apr_brigade_destroy(temp);
-    }
-    else {
-        /* the connection is not HTTP/1.1 or not for us, don't touch it */
-        return DECLINED;
     }
 
-    /* If "h2" was selected as protocol (by whatever mechanism), take over
-     * the connection.
-     */
-    if (h2_ctx_is_active(ctx)) {
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
-                      "h2_h2, connection, h2 active");
-        
-        return h2_conn_process(c, NULL, ctx->server);
+    if (ctx) {
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, "process_conn");
+        if (!h2_ctx_session_get(ctx)) {
+            status = h2_conn_setup(ctx, c, NULL);
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c, "conn_setup");
+            if (status != APR_SUCCESS) {
+                return status;
+            }
+        }
+        return h2_conn_run(ctx, c);
     }
     
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, "h2_h2, declined");
@@ -660,28 +660,42 @@ int h2_h2_process_conn(conn_rec* c)
 
 static int h2_h2_post_read_req(request_rec *r)
 {
-    h2_ctx *ctx = h2_ctx_rget(r);
-    struct h2_task *task = h2_ctx_get_task(ctx);
-    if (task) {
-        /* FIXME: sometimes, this hook gets called twice for a single request.
-         * This should not be, right? */
-        /* h2_task connection for a stream, not for h2c */
-        ap_log_rerror(APLOG_MARK, APLOG_TRACE3, 0, r,
-                      "adding h1_to_h2_resp output filter");
-        if (task->serialize_headers) {
-            ap_remove_output_filter_byhandle(r->output_filters, "H1_TO_H2_RESP");
-            ap_add_output_filter("H1_TO_H2_RESP", task, r, r->connection);
-        }
-        else {
-            /* replace the core http filter that formats response headers
-             * in HTTP/1 with our own that collects status and headers */
-            ap_remove_output_filter_byhandle(r->output_filters, "HTTP_HEADER");
-            ap_remove_output_filter_byhandle(r->output_filters, "H2_RESPONSE");
-            ap_add_output_filter("H2_RESPONSE", task, r, r->connection);
+    /* slave connection? */
+    if (r->connection->master) {
+        h2_ctx *ctx = h2_ctx_rget(r);
+        struct h2_task *task = h2_ctx_get_task(ctx);
+        /* This hook will get called twice on internal redirects. Take care
+         * that we manipulate filters only once. */
+        /* our slave connection? */
+        if (task && !task->filters_set) {
+            ap_filter_t *f;
+            
+            /* setup the correct output filters to process the response
+             * on the proper mod_http2 way. */
+            ap_log_rerror(APLOG_MARK, APLOG_TRACE3, 0, r, "adding task output filter");
+            if (task->ser_headers) {
+                ap_add_output_filter("H1_TO_H2_RESP", task, r, r->connection);
+            }
+            else {
+                /* replace the core http filter that formats response headers
+                 * in HTTP/1 with our own that collects status and headers */
+                ap_remove_output_filter_byhandle(r->output_filters, "HTTP_HEADER");
+                ap_add_output_filter("H2_RESPONSE", task, r, r->connection);
+            }
+            
+            /* trailers processing. Incoming trailers are added to this
+             * request via our h2 input filter, outgoing trailers
+             * in a special h2 out filter. */
+            for (f = r->input_filters; f; f = f->next) {
+                if (!strcmp("H2_TO_H1", f->frec->name)) {
+                    f->r = r;
+                    break;
+                }
+            }
+            ap_add_output_filter("H2_TRAILERS", task, r, r->connection);
+            task->filters_set = 1;
         }
-        ap_add_output_filter("H2_TRAILERS", task, r, r->connection);
     }
     return DECLINED;
 }
 
-

Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_io.c
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_io.c?rev=1725301&r1=1725300&r2=1725301&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_io.c (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_io.c Mon Jan 18 16:22:57 2016
@@ -15,26 +15,31 @@
 
 #include <assert.h>
 
+#include <apr_pools.h>
+#include <apr_thread_mutex.h>
+#include <apr_thread_cond.h>
+
 #include <httpd.h>
 #include <http_core.h>
 #include <http_log.h>
 #include <http_connection.h>
 
 #include "h2_private.h"
+#include "h2_h2.h"
 #include "h2_io.h"
+#include "h2_mplx.h"
 #include "h2_response.h"
+#include "h2_request.h"
 #include "h2_task.h"
 #include "h2_util.h"
 
-h2_io *h2_io_create(int id, apr_pool_t *pool, apr_bucket_alloc_t *bucket_alloc)
+h2_io *h2_io_create(int id, apr_pool_t *pool)
 {
     h2_io *io = apr_pcalloc(pool, sizeof(*io));
     if (io) {
         io->id = id;
         io->pool = pool;
-        io->bucket_alloc = bucket_alloc;
-        io->bbin = NULL;
-        io->bbout = NULL;
+        io->bucket_alloc = apr_bucket_alloc_create(pool);
     }
     return io;
 }
@@ -96,11 +101,107 @@ apr_status_t h2_io_in_shutdown(h2_io *io
     return h2_io_in_close(io);
 }
 
+
+void h2_io_signal_init(h2_io *io, h2_io_op op, int timeout_secs, apr_thread_cond_t *cond)
+{
+    io->timed_op = op;
+    io->timed_cond = cond;
+    if (timeout_secs > 0) {
+        io->timeout_at = apr_time_now() + apr_time_from_sec(timeout_secs);
+    }
+    else {
+        io->timeout_at = 0; 
+    }
+}
+
+void h2_io_signal_exit(h2_io *io)
+{
+    io->timed_cond = NULL;
+    io->timeout_at = 0; 
+}
+
+apr_status_t h2_io_signal_wait(h2_mplx *m, h2_io *io)
+{
+    apr_status_t status;
+    
+    if (io->timeout_at != 0) {
+        status = apr_thread_cond_timedwait(io->timed_cond, m->lock, io->timeout_at);
+        if (APR_STATUS_IS_TIMEUP(status)) {
+            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, m->c,  
+                          "h2_mplx(%ld-%d): stream timeout expired: %s",
+                          m->id, io->id, 
+                          (io->timed_op == H2_IO_READ)? "read" : "write");
+            h2_io_rst(io, H2_ERR_CANCEL);
+        }
+    }
+    else {
+        apr_thread_cond_wait(io->timed_cond, m->lock);
+        status = APR_SUCCESS;
+    }
+    if (io->orphaned && status == APR_SUCCESS) {
+        return APR_ECONNABORTED;
+    }
+    return status;
+}
+
+void h2_io_signal(h2_io *io, h2_io_op op)
+{
+    if (io->timed_cond && (io->timed_op == op || H2_IO_ANY == op)) {
+        apr_thread_cond_signal(io->timed_cond);
+    }
+}
+
+void h2_io_make_orphaned(h2_io *io, int error)
+{
+    io->orphaned = 1;
+    if (error) {
+        h2_io_rst(io, error);
+    }
+    /* if someone is waiting, wake him up */
+    h2_io_signal(io, H2_IO_ANY);
+}
+
+static int add_trailer(void *ctx, const char *key, const char *value)
+{
+    apr_bucket_brigade *bb = ctx;
+    apr_status_t status;
+    
+    status = apr_brigade_printf(bb, NULL, NULL, "%s: %s\r\n", 
+                                key, value);
+    return (status == APR_SUCCESS);
+}
+
+static apr_status_t append_eos(h2_io *io, apr_bucket_brigade *bb, 
+                               apr_table_t *trailers)
+{
+    apr_status_t status = APR_SUCCESS;
+    apr_table_t *t = io->request->trailers;
+
+    if (trailers && t && !apr_is_empty_table(trailers)) {
+        /* trailers passed in, transfer directly. */
+        apr_table_overlap(trailers, t, APR_OVERLAP_TABLES_SET);
+        t = NULL;
+    }
+    
+    if (io->request->chunked) {
+        if (t && !apr_is_empty_table(t)) {
+            /* no trailers passed in, transfer via chunked */
+            status = apr_brigade_puts(bb, NULL, NULL, "0\r\n");
+            apr_table_do(add_trailer, bb, t, NULL);
+            status = apr_brigade_puts(bb, NULL, NULL, "\r\n");
+        }
+        else {
+            status = apr_brigade_puts(bb, NULL, NULL, "0\r\n\r\n");
+        }
+    }
+    APR_BRIGADE_INSERT_TAIL(bb, apr_bucket_eos_create(io->bucket_alloc));
+    return status;
+}
+
 apr_status_t h2_io_in_read(h2_io *io, apr_bucket_brigade *bb, 
-                           apr_size_t maxlen)
+                           apr_size_t maxlen, apr_table_t *trailers)
 {
     apr_off_t start_len = 0;
-    apr_bucket *last;
     apr_status_t status;
 
     if (io->rst_error) {
@@ -108,21 +209,52 @@ apr_status_t h2_io_in_read(h2_io *io, ap
     }
     
     if (!io->bbin || APR_BRIGADE_EMPTY(io->bbin)) {
-        return io->eos_in? APR_EOF : APR_EAGAIN;
+        if (io->eos_in) {
+            if (!io->eos_in_written) {
+                status = append_eos(io, bb, trailers);
+                io->eos_in_written = 1;
+                return status;
+            }
+            return APR_EOF;
+        }
+        return APR_EAGAIN;
     }
     
-    apr_brigade_length(bb, 1, &start_len);
-    last = APR_BRIGADE_LAST(bb);
-    status = h2_util_move(bb, io->bbin, maxlen, NULL, "h2_io_in_read");
-    if (status == APR_SUCCESS) {
-        apr_bucket *nlast = APR_BRIGADE_LAST(bb);
-        apr_off_t end_len = 0;
-        apr_brigade_length(bb, 1, &end_len);
-        if (last == nlast) {
-            return APR_EAGAIN;
+    if (io->request->chunked) {
+        /* the reader expects HTTP/1.1 chunked encoding */
+        status = h2_util_move(io->tmp, io->bbin, maxlen, NULL, "h2_io_in_read_chunk");
+        if (status == APR_SUCCESS) {
+            apr_off_t tmp_len = 0;
+            
+            apr_brigade_length(io->tmp, 1, &tmp_len);
+            if (tmp_len > 0) {
+                io->input_consumed += tmp_len;
+                status = apr_brigade_printf(bb, NULL, NULL, "%lx\r\n", 
+                                            (unsigned long)tmp_len);
+                if (status == APR_SUCCESS) {
+                    status = h2_util_move(bb, io->tmp, -1, NULL, "h2_io_in_read_tmp1");
+                    if (status == APR_SUCCESS) {
+                        status = apr_brigade_puts(bb, NULL, NULL, "\r\n");
+                    }
+                }
+            }
+            else {
+                status = h2_util_move(bb, io->tmp, -1, NULL, "h2_io_in_read_tmp2");
+            }
+            apr_brigade_cleanup(io->tmp);
         }
-        io->input_consumed += (end_len - start_len);
     }
+    else {
+        apr_brigade_length(bb, 1, &start_len);
+        
+        status = h2_util_move(bb, io->bbin, maxlen, NULL, "h2_io_in_read");
+        if (status == APR_SUCCESS) {
+            apr_off_t end_len = 0;
+            apr_brigade_length(bb, 1, &end_len);
+            io->input_consumed += (end_len - start_len);
+        }
+    }
+    
     return status;
 }
 
@@ -139,6 +271,7 @@ apr_status_t h2_io_in_write(h2_io *io, a
     if (!APR_BRIGADE_EMPTY(bb)) {
         if (!io->bbin) {
             io->bbin = apr_brigade_create(io->pool, io->bucket_alloc);
+            io->tmp = apr_brigade_create(io->pool, io->bucket_alloc);
         }
         return h2_util_move(io->bbin, bb, -1, NULL, "h2_io_in_write");
     }
@@ -151,10 +284,6 @@ apr_status_t h2_io_in_close(h2_io *io)
         return APR_ECONNABORTED;
     }
     
-    if (io->bbin) {
-        APR_BRIGADE_INSERT_TAIL(io->bbin, 
-                                apr_bucket_eos_create(io->bbin->bucket_alloc));
-    }
     io->eos_in = 1;
     return APR_SUCCESS;
 }
@@ -226,7 +355,7 @@ static void process_trailers(h2_io *io,
 
 apr_status_t h2_io_out_write(h2_io *io, apr_bucket_brigade *bb, 
                              apr_size_t maxlen, apr_table_t *trailers,
-                             int *pfile_handles_allowed)
+                             apr_size_t *pfile_buckets_allowed)
 {
     apr_status_t status;
     int start_allowed;
@@ -268,12 +397,12 @@ apr_status_t h2_io_out_write(h2_io *io,
      * many open files already buffered. Otherwise we will run out of
      * file handles.
      */
-    start_allowed = *pfile_handles_allowed;
-    status = h2_util_move(io->bbout, bb, maxlen, pfile_handles_allowed, 
+    start_allowed = *pfile_buckets_allowed;
+    status = h2_util_move(io->bbout, bb, maxlen, pfile_buckets_allowed, 
                           "h2_io_out_write");
     /* track # file buckets moved into our pool */
-    if (start_allowed != *pfile_handles_allowed) {
-        io->files_handles_owned += (start_allowed - *pfile_handles_allowed);
+    if (start_allowed != *pfile_buckets_allowed) {
+        io->files_handles_owned += (start_allowed - *pfile_buckets_allowed);
     }
     return status;
 }
@@ -291,7 +420,7 @@ apr_status_t h2_io_out_close(h2_io *io,
         }
         if (!h2_util_has_eos(io->bbout, -1)) {
             APR_BRIGADE_INSERT_TAIL(io->bbout, 
-                                    apr_bucket_eos_create(io->bbout->bucket_alloc));
+                                    apr_bucket_eos_create(io->bucket_alloc));
         }
     }
     return APR_SUCCESS;

Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_io.h
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_io.h?rev=1725301&r1=1725300&r2=1725301&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_io.h (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_io.h Mon Jan 18 16:22:57 2016
@@ -18,6 +18,7 @@
 
 struct h2_response;
 struct apr_thread_cond_t;
+struct h2_mplx;
 struct h2_request;
 
 
@@ -25,30 +26,42 @@ typedef apr_status_t h2_io_data_cb(void
 
 typedef int h2_stream_pri_cmp(int stream_id1, int stream_id2, void *ctx);
 
+typedef enum {
+    H2_IO_READ,
+    H2_IO_WRITE,
+    H2_IO_ANY,
+}
+h2_io_op;
 
 typedef struct h2_io h2_io;
 
 struct h2_io {
-    int id;                      /* stream identifier */
-    apr_pool_t *pool;            /* stream pool */
-    int orphaned;                /* h2_stream is gone for this io */
+    int id;                          /* stream identifier */
+    apr_pool_t *pool;                /* stream pool */
+    apr_bucket_alloc_t *bucket_alloc;
     
-    int task_done;
-    const struct h2_request *request;  /* request on this io */
-    int request_body;            /* == 0 iff request has no body */
-    struct h2_response *response;/* response for submit, once created */
-    int rst_error;
-
-    int eos_in;
-    apr_bucket_brigade *bbin;    /* input data for stream */
-    struct apr_thread_cond_t *input_arrived; /* block on reading */
-    apr_size_t input_consumed;   /* how many bytes have been read */
+    const struct h2_request *request;/* request on this io */
+    struct h2_response *response;    /* response to request */
+    int rst_error;                   /* h2 related stream abort error */
+
+    apr_bucket_brigade *bbin;        /* input data for stream */
+    apr_bucket_brigade *bbout;       /* output data from stream */
+    apr_bucket_brigade *tmp;         /* temporary data for chunking */
+
+    unsigned int orphaned       : 1; /* h2_stream is gone for this io */    
+    unsigned int worker_started : 1; /* h2_worker started processing for this io */
+    unsigned int worker_done    : 1; /* h2_worker finished for this io */
+    unsigned int request_body   : 1; /* iff request has body */
+    unsigned int eos_in         : 1; /* input eos has been seen */
+    unsigned int eos_in_written : 1; /* input eos has been forwarded */
+    unsigned int eos_out        : 1; /* output eos has been seen */
     
-    int eos_out;
-    apr_bucket_brigade *bbout;   /* output data from stream */
-    apr_bucket_alloc_t *bucket_alloc;
-    struct apr_thread_cond_t *output_drained; /* block on writing */
+    h2_io_op timed_op;               /* which operation is waited on, if any */
+    struct apr_thread_cond_t *timed_cond; /* condition to wait on, maybe NULL */
+    apr_time_t timeout_at;           /* when IO wait will time out */
     
+    apr_size_t input_consumed;       /* how many bytes have been read */
+        
     int files_handles_owned;
 };
 
@@ -59,7 +72,7 @@ struct h2_io {
 /**
  * Creates a new h2_io for the given stream id. 
  */
-h2_io *h2_io_create(int id, apr_pool_t *pool, apr_bucket_alloc_t *bucket_alloc);
+h2_io *h2_io_create(int id, apr_pool_t *pool);
 
 /**
  * Frees any resources hold by the h2_io instance. 
@@ -86,6 +99,14 @@ int h2_io_in_has_eos_for(h2_io *io);
  */
 int h2_io_out_has_data(h2_io *io);
 
+void h2_io_signal(h2_io *io, h2_io_op op);
+void h2_io_signal_init(h2_io *io, h2_io_op op, int timeout_secs, 
+                       struct apr_thread_cond_t *cond);
+void h2_io_signal_exit(h2_io *io);
+apr_status_t h2_io_signal_wait(struct h2_mplx *m, h2_io *io);
+
+void h2_io_make_orphaned(h2_io *io, int error);
+
 /*******************************************************************************
  * Input handling of streams.
  ******************************************************************************/
@@ -94,7 +115,7 @@ int h2_io_out_has_data(h2_io *io);
  * is currently available, APR_EOF if end of input has been reached.
  */
 apr_status_t h2_io_in_read(h2_io *io, apr_bucket_brigade *bb, 
-                           apr_size_t maxlen);
+                           apr_size_t maxlen, apr_table_t *trailers);
 
 /**
  * Appends given bucket to the input.
@@ -137,7 +158,7 @@ apr_status_t h2_io_out_read_to(h2_io *io
 
 apr_status_t h2_io_out_write(h2_io *io, apr_bucket_brigade *bb, 
                              apr_size_t maxlen, apr_table_t *trailers,
-                             int *pfile_buckets_allowed);
+                             apr_size_t *pfile_buckets_allowed);
 
 /**
  * Closes the input. After existing data has been read, APR_EOF will

Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.c
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.c?rev=1725301&r1=1725300&r2=1725301&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.c (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.c Mon Jan 18 16:22:57 2016
@@ -15,6 +15,7 @@
 
 #include <assert.h>
 #include <stddef.h>
+#include <stdlib.h>
 
 #include <apr_atomic.h>
 #include <apr_thread_mutex.h>
@@ -59,7 +60,8 @@
     } while(0)
 
 
-static int is_aborted(h2_mplx *m, apr_status_t *pstatus) {
+static int is_aborted(h2_mplx *m, apr_status_t *pstatus)
+{
     AP_DEBUG_ASSERT(m);
     if (m->aborted) {
         *pstatus = APR_ECONNABORTED;
@@ -70,12 +72,34 @@ static int is_aborted(h2_mplx *m, apr_st
 
 static void have_out_data_for(h2_mplx *m, int stream_id);
 
+static void check_tx_reservation(h2_mplx *m) 
+{
+    if (m->tx_handles_reserved == 0) {
+        m->tx_handles_reserved += h2_workers_tx_reserve(m->workers, 
+            H2MIN(m->tx_chunk_size, h2_io_set_size(m->stream_ios)));
+    }
+}
+
+static void check_tx_free(h2_mplx *m) 
+{
+    if (m->tx_handles_reserved > m->tx_chunk_size) {
+        apr_size_t count = m->tx_handles_reserved - m->tx_chunk_size;
+        m->tx_handles_reserved = m->tx_chunk_size;
+        h2_workers_tx_free(m->workers, count);
+    }
+    else if (m->tx_handles_reserved 
+             && (!m->stream_ios || h2_io_set_is_empty(m->stream_ios))) {
+        h2_workers_tx_free(m->workers, m->tx_handles_reserved);
+        m->tx_handles_reserved = 0;
+    }
+}
+
 static void h2_mplx_destroy(h2_mplx *m)
 {
     AP_DEBUG_ASSERT(m);
-    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c,
-                  "h2_mplx(%ld): destroy, refs=%d", 
-                  m->id, m->refs);
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+                  "h2_mplx(%ld): destroy, ios=%d", 
+                  m->id, (int)h2_io_set_size(m->stream_ios));
     m->aborted = 1;
     if (m->ready_ios) {
         h2_io_set_destroy(m->ready_ios);
@@ -86,6 +110,8 @@ static void h2_mplx_destroy(h2_mplx *m)
         m->stream_ios = NULL;
     }
     
+    check_tx_free(m);
+    
     if (m->pool) {
         apr_pool_destroy(m->pool);
     }
@@ -120,7 +146,6 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr
     if (m) {
         m->id = c->id;
         APR_RING_ELEM_INIT(m, link);
-        m->refs = 1;
         m->c = c;
         apr_pool_create_ex(&m->pool, parent, NULL, allocator);
         if (!m->pool) {
@@ -135,47 +160,33 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr
             return NULL;
         }
         
-        m->bucket_alloc = apr_bucket_alloc_create(m->pool);
-        
         m->q = h2_tq_create(m->pool, h2_config_geti(conf, H2_CONF_MAX_STREAMS));
         m->stream_ios = h2_io_set_create(m->pool);
         m->ready_ios = h2_io_set_create(m->pool);
         m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM);
         m->workers = workers;
         
-        m->file_handles_allowed = h2_config_geti(conf, H2_CONF_SESSION_FILES);
+        m->tx_handles_reserved = 0;
+        m->tx_chunk_size = 4;
+        
+        m->stream_timeout_secs = h2_config_geti(conf, H2_CONF_STREAM_TIMEOUT_SECS);
     }
     return m;
 }
 
-static void release(h2_mplx *m, int lock)
-{
-    if (lock) {
-        apr_thread_mutex_lock(m->lock);
-        --m->refs;
-        if (m->join_wait) {
-            apr_thread_cond_signal(m->join_wait);
-        }
-        apr_thread_mutex_unlock(m->lock);
-    }
-    else {
-        --m->refs;
-    }
-}
-
-void h2_mplx_reference(h2_mplx *m)
+int h2_mplx_get_max_stream_started(h2_mplx *m)
 {
+    int stream_id = 0;
+    
     apr_thread_mutex_lock(m->lock);
-    ++m->refs;
+    stream_id = m->max_stream_started;
     apr_thread_mutex_unlock(m->lock);
+    
+    return stream_id;
 }
 
-void h2_mplx_release(h2_mplx *m)
+static void workers_register(h2_mplx *m)
 {
-    release(m, 1);
-}
-
-static void workers_register(h2_mplx *m) {
     /* Initially, there was ref count increase for this as well, but
      * this is not needed, even harmful.
      * h2_workers is only a hub for all the h2_worker instances.
@@ -190,11 +201,8 @@ static void workers_register(h2_mplx *m)
     h2_workers_register(m->workers, m);
 }
 
-static void workers_unregister(h2_mplx *m) {
-    h2_workers_unregister(m->workers, m);
-}
-
-static int io_process_events(h2_mplx *m, h2_io *io) {
+static int io_process_events(h2_mplx *m, h2_io *io)
+{
     if (io->input_consumed && m->input_consumed) {
         m->input_consumed(m->input_consumed_ctx, 
                           io->id, io->input_consumed);
@@ -204,7 +212,6 @@ static int io_process_events(h2_mplx *m,
     return 0;
 }
 
-
 static void io_destroy(h2_mplx *m, h2_io *io, int events)
 {
     apr_pool_t *pool = io->pool;
@@ -220,7 +227,8 @@ static void io_destroy(h2_mplx *m, h2_io
     /* The pool is cleared/destroyed which also closes all
      * allocated file handles. Give this count back to our
      * file handle pool. */
-    m->file_handles_allowed += io->files_handles_owned;
+    m->tx_handles_reserved += io->files_handles_owned;
+
     h2_io_set_remove(m->stream_ios, io);
     h2_io_set_remove(m->ready_ios, io);
     h2_io_destroy(io);
@@ -232,28 +240,29 @@ static void io_destroy(h2_mplx *m, h2_io
         }
         m->spare_pool = pool;
     }
+
+    check_tx_free(m);
 }
 
 static int io_stream_done(h2_mplx *m, h2_io *io, int rst_error) 
 {
     /* Remove io from ready set, we will never submit it */
     h2_io_set_remove(m->ready_ios, io);
-    if (io->task_done || h2_tq_remove(m->q, io->id)) {
+    if (!io->worker_started || io->worker_done) {
         /* already finished or not even started yet */
+        h2_tq_remove(m->q, io->id);
         io_destroy(m, io, 1);
         return 0;
     }
     else {
         /* cleanup once task is done */
-        io->orphaned = 1;
-        if (rst_error) {
-            h2_io_rst(io, rst_error);
-        }
+        h2_io_make_orphaned(io, rst_error);
         return 1;
     }
 }
 
-static int stream_done_iter(void *ctx, h2_io *io) {
+static int stream_done_iter(void *ctx, h2_io *io)
+{
     return io_stream_done((h2_mplx*)ctx, io, 0);
 }
 
@@ -261,27 +270,48 @@ apr_status_t h2_mplx_release_and_join(h2
 {
     apr_status_t status;
     
-    workers_unregister(m);
+    h2_workers_unregister(m->workers, m);
     status = apr_thread_mutex_lock(m->lock);
     if (APR_SUCCESS == status) {
+        int i, wait_secs = 5;
+        
+        /* disable WINDOW_UPDATE callbacks */
+        h2_mplx_set_consumed_cb(m, NULL, NULL);
+        
         while (!h2_io_set_iter(m->stream_ios, stream_done_iter, m)) {
-            /* iterator until all h2_io have been orphaned or destroyed */
+            /* iterate until all ios have been orphaned or destroyed */
         }
     
-        release(m, 0);
-        while (m->refs > 0) {
+        /* Any remaining ios have handed out requests to workers that are
+         * not done yet. Any operation they do on their assigned stream ios will
+         * be errored ECONNRESET/ABORTED, so that should find out pretty soon.
+         */
+        for (i = 0; h2_io_set_size(m->stream_ios) > 0; ++i) {
             m->join_wait = wait;
-            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c,
-                          "h2_mplx(%ld): release_join, refs=%d, waiting...", 
-                          m->id, m->refs);
-            apr_thread_cond_wait(wait, m->lock);
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+                          "h2_mplx(%ld): release_join, waiting on %d worker to report back", 
+                          m->id, (int)h2_io_set_size(m->stream_ios));
+                          
+            status = apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(wait_secs));
+            if (APR_STATUS_IS_TIMEUP(status)) {
+                if (i > 0) {
+                    /* Oh, oh. Still we wait for assigned  workers to report that 
+                     * they are done. Unless we have a bug, a worker seems to be hanging. 
+                     * If we exit now, all will be deallocated and the worker, once 
+                     * it does return, will walk all over freed memory...
+                     */
+                    ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
+                                  "h2_mplx(%ld): release, waiting for %d seconds now for "
+                                  "all h2_workers to return, have still %d requests outstanding", 
+                                  m->id, i*wait_secs, (int)h2_io_set_size(m->stream_ios));
+                }
+            }
         }
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c,
-                      "h2_mplx(%ld): release_join -> destroy, (#ios=%ld)", 
-                      m->id, (long)h2_io_set_size(m->stream_ios));
+                      "h2_mplx(%ld): release_join -> destroy", m->id);
+        apr_thread_mutex_unlock(m->lock);
         h2_mplx_destroy(m);
         /* all gone */
-        /*apr_thread_mutex_unlock(m->lock);*/
     }
     return status;
 }
@@ -289,11 +319,14 @@ apr_status_t h2_mplx_release_and_join(h2
 void h2_mplx_abort(h2_mplx *m)
 {
     apr_status_t status;
+    
     AP_DEBUG_ASSERT(m);
-    status = apr_thread_mutex_lock(m->lock);
-    if (APR_SUCCESS == status) {
-        m->aborted = 1;
-        apr_thread_mutex_unlock(m->lock);
+    if (!m->aborted) {
+        status = apr_thread_mutex_lock(m->lock);
+        if (APR_SUCCESS == status) {
+            m->aborted = 1;
+            apr_thread_mutex_unlock(m->lock);
+        }
     }
 }
 
@@ -318,50 +351,86 @@ apr_status_t h2_mplx_stream_done(h2_mplx
     return status;
 }
 
-void h2_mplx_task_done(h2_mplx *m, int stream_id)
+static const h2_request *pop_request(h2_mplx *m)
 {
+    const h2_request *req = NULL;
+    int sid;
+    while (!m->aborted && !req && (sid = h2_tq_shift(m->q)) > 0) {
+        h2_io *io = h2_io_set_get(m->stream_ios, sid);
+        if (io) {
+            req = io->request;
+            io->worker_started = 1;
+            if (sid > m->max_stream_started) {
+                m->max_stream_started = sid;
+            }
+        }
+    }
+    return req;
+}
+
+void h2_mplx_request_done(h2_mplx **pm, int stream_id, const h2_request **preq)
+{
+    h2_mplx *m = *pm;
+    
     apr_status_t status = apr_thread_mutex_lock(m->lock);
     if (APR_SUCCESS == status) {
         h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
-                      "h2_mplx(%ld): task(%d) done", m->id, stream_id);
+                      "h2_mplx(%ld): request(%d) done", m->id, stream_id);
         if (io) {
-            io->task_done = 1;
+            io->worker_done = 1;
             if (io->orphaned) {
                 io_destroy(m, io, 0);
+                if (m->join_wait) {
+                    apr_thread_cond_signal(m->join_wait);
+                }
             }
             else {
                 /* hang around until the stream deregisteres */
             }
         }
+        
+        if (preq) {
+            /* someone wants another request, if we have */
+            *preq = pop_request(m);
+        }
+        if (!preq || !*preq) {
+            /* No request to hand back to the worker, NULLify reference
+             * and decrement count */
+            *pm = NULL;
+        }
         apr_thread_mutex_unlock(m->lock);
     }
 }
 
 apr_status_t h2_mplx_in_read(h2_mplx *m, apr_read_type_e block,
-                             int stream_id, apr_bucket_brigade *bb,
+                             int stream_id, apr_bucket_brigade *bb, 
+                             apr_table_t *trailers,
                              struct apr_thread_cond_t *iowait)
 {
     apr_status_t status; 
     AP_DEBUG_ASSERT(m);
-    if (m->aborted) {
-        return APR_ECONNABORTED;
-    }
     status = apr_thread_mutex_lock(m->lock);
     if (APR_SUCCESS == status) {
         h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
         if (io && !io->orphaned) {
-            io->input_arrived = iowait;
             H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_read_pre");
-            status = h2_io_in_read(io, bb, -1);
+            
+            h2_io_signal_init(io, H2_IO_READ, m->stream_timeout_secs, iowait);
+            status = h2_io_in_read(io, bb, -1, trailers);
             while (APR_STATUS_IS_EAGAIN(status) 
                    && !is_aborted(m, &status)
                    && block == APR_BLOCK_READ) {
-                apr_thread_cond_wait(io->input_arrived, m->lock);
-                status = h2_io_in_read(io, bb, -1);
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
+                              "h2_mplx(%ld-%d): wait on in data (BLOCK_READ)", 
+                              m->id, stream_id);
+                status = h2_io_signal_wait(m, io);
+                if (status == APR_SUCCESS) {
+                    status = h2_io_in_read(io, bb, -1, trailers);
+                }
             }
             H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_read_post");
-            io->input_arrived = NULL;
+            h2_io_signal_exit(io);
         }
         else {
             status = APR_EOF;
@@ -376,9 +445,6 @@ apr_status_t h2_mplx_in_write(h2_mplx *m
 {
     apr_status_t status;
     AP_DEBUG_ASSERT(m);
-    if (m->aborted) {
-        return APR_ECONNABORTED;
-    }
     status = apr_thread_mutex_lock(m->lock);
     if (APR_SUCCESS == status) {
         h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
@@ -386,13 +452,11 @@ apr_status_t h2_mplx_in_write(h2_mplx *m
             H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_write_pre");
             status = h2_io_in_write(io, bb);
             H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_write_post");
-            if (io->input_arrived) {
-                apr_thread_cond_signal(io->input_arrived);
-            }
+            h2_io_signal(io, H2_IO_READ);
             io_process_events(m, io);
         }
         else {
-            status = APR_EOF;
+            status = APR_ECONNABORTED;
         }
         apr_thread_mutex_unlock(m->lock);
     }
@@ -403,18 +467,13 @@ apr_status_t h2_mplx_in_close(h2_mplx *m
 {
     apr_status_t status;
     AP_DEBUG_ASSERT(m);
-    if (m->aborted) {
-        return APR_ECONNABORTED;
-    }
     status = apr_thread_mutex_lock(m->lock);
     if (APR_SUCCESS == status) {
         h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
         if (io && !io->orphaned) {
             status = h2_io_in_close(io);
             H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_close");
-            if (io->input_arrived) {
-                apr_thread_cond_signal(io->input_arrived);
-            }
+            h2_io_signal(io, H2_IO_READ);
             io_process_events(m, io);
         }
         else {
@@ -477,9 +536,6 @@ apr_status_t h2_mplx_out_readx(h2_mplx *
 {
     apr_status_t status;
     AP_DEBUG_ASSERT(m);
-    if (m->aborted) {
-        return APR_ECONNABORTED;
-    }
     status = apr_thread_mutex_lock(m->lock);
     if (APR_SUCCESS == status) {
         h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
@@ -488,8 +544,8 @@ apr_status_t h2_mplx_out_readx(h2_mplx *
             
             status = h2_io_out_readx(io, cb, ctx, plen, peos);
             H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_readx_post");
-            if (status == APR_SUCCESS && cb && io->output_drained) {
-                apr_thread_cond_signal(io->output_drained);
+            if (status == APR_SUCCESS && cb) {
+                h2_io_signal(io, H2_IO_WRITE);
             }
         }
         else {
@@ -509,9 +565,6 @@ apr_status_t h2_mplx_out_read_to(h2_mplx
 {
     apr_status_t status;
     AP_DEBUG_ASSERT(m);
-    if (m->aborted) {
-        return APR_ECONNABORTED;
-    }
     status = apr_thread_mutex_lock(m->lock);
     if (APR_SUCCESS == status) {
         h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
@@ -521,8 +574,8 @@ apr_status_t h2_mplx_out_read_to(h2_mplx
             status = h2_io_out_read_to(io, bb, plen, peos);
             
             H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_read_to_post");
-            if (status == APR_SUCCESS && io->output_drained) {
-                apr_thread_cond_signal(io->output_drained);
+            if (status == APR_SUCCESS) {
+                h2_io_signal(io, H2_IO_WRITE);
             }
         }
         else {
@@ -538,14 +591,12 @@ h2_stream *h2_mplx_next_submit(h2_mplx *
 {
     apr_status_t status;
     h2_stream *stream = NULL;
+
     AP_DEBUG_ASSERT(m);
-    if (m->aborted) {
-        return NULL;
-    }
     status = apr_thread_mutex_lock(m->lock);
     if (APR_SUCCESS == status) {
         h2_io *io = h2_io_set_pop_highest_prio(m->ready_ios);
-        if (io) {
+        if (io && !m->aborted) {
             stream = h2_stream_set_get(streams, io->id);
             if (stream) {
                 if (io->rst_error) {
@@ -557,19 +608,18 @@ h2_stream *h2_mplx_next_submit(h2_mplx *
                     h2_stream_set_response(stream, io->response, io->bbout);
                     H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_next_submit_post");
                 }
-                
             }
             else {
                 /* We have the io ready, but the stream has gone away, maybe
                  * reset by the client. Should no longer happen since such
                  * streams should clear io's from the ready queue.
                  */
-                ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, m->c, APLOGNO(02953) 
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,  
                               "h2_mplx(%ld): stream for response %d closed, "
                               "resetting io to close request processing",
                               m->id, io->id);
-                io->orphaned = 1;
-                if (io->task_done) {
+                h2_io_make_orphaned(io, H2_ERR_STREAM_CLOSED);
+                if (!io->worker_started || io->worker_done) {
                     io_destroy(m, io, 1);
                 }
                 else {
@@ -577,14 +627,11 @@ h2_stream *h2_mplx_next_submit(h2_mplx *
                      * shutdown input and send out any events (e.g. window
                      * updates) asap. */
                     h2_io_in_shutdown(io);
-                    h2_io_rst(io, H2_ERR_STREAM_CLOSED);
                     io_process_events(m, io);
                 }
             }
             
-            if (io->output_drained) {
-                apr_thread_cond_signal(io->output_drained);
-            }
+            h2_io_signal(io, H2_IO_WRITE);
         }
         apr_thread_mutex_unlock(m->lock);
     }
@@ -602,28 +649,29 @@ static apr_status_t out_write(h2_mplx *m
      * We will not split buckets to enforce the limit to the last
      * byte. After all, the bucket is already in memory.
      */
-    while (!APR_BRIGADE_EMPTY(bb) 
-           && (status == APR_SUCCESS)
+    while (status == APR_SUCCESS 
+           && !APR_BRIGADE_EMPTY(bb) 
            && !is_aborted(m, &status)) {
         
         status = h2_io_out_write(io, bb, m->stream_max_mem, trailers,
-                                 &m->file_handles_allowed);
-        /* Wait for data to drain until there is room again */
-        while (!APR_BRIGADE_EMPTY(bb) 
+                                 &m->tx_handles_reserved);
+        /* Wait for data to drain until there is room again or
+         * stream timeout expires */
+        h2_io_signal_init(io, H2_IO_WRITE, m->stream_timeout_secs, iowait);
+        while (status == APR_SUCCESS
+               && !APR_BRIGADE_EMPTY(bb) 
                && iowait
-               && status == APR_SUCCESS
                && (m->stream_max_mem <= h2_io_out_length(io))
                && !is_aborted(m, &status)) {
             trailers = NULL;
-            io->output_drained = iowait;
             if (f) {
                 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
                               "h2_mplx(%ld-%d): waiting for out drain", 
                               m->id, io->id);
             }
-            apr_thread_cond_wait(io->output_drained, m->lock);
-            io->output_drained = NULL;
+            status = h2_io_signal_wait(m, io);
         }
+        h2_io_signal_exit(io);
     }
     apr_brigade_cleanup(bb);
     
@@ -647,6 +695,11 @@ static apr_status_t out_open(h2_mplx *m,
         
         h2_io_set_response(io, response);
         h2_io_set_add(m->ready_ios, io);
+        if (response && response->http_status < 300) {
+            /* we might see some file buckets in the output, see
+             * if we have enough handles reserved. */
+            check_tx_reservation(m);
+        }
         if (bb) {
             status = out_write(m, io, f, bb, response->trailers, iowait);
         }
@@ -664,24 +717,22 @@ apr_status_t h2_mplx_out_open(h2_mplx *m
 {
     apr_status_t status;
     AP_DEBUG_ASSERT(m);
-    if (m->aborted) {
-        return APR_ECONNABORTED;
-    }
     status = apr_thread_mutex_lock(m->lock);
     if (APR_SUCCESS == status) {
-        status = out_open(m, stream_id, response, f, bb, iowait);
-        if (APLOGctrace1(m->c)) {
-            h2_util_bb_log(m->c, stream_id, APLOG_TRACE1, "h2_mplx_out_open", bb);
-        }
         if (m->aborted) {
-            return APR_ECONNABORTED;
+            status = APR_ECONNABORTED;
+        }
+        else {
+            status = out_open(m, stream_id, response, f, bb, iowait);
+            if (APLOGctrace1(m->c)) {
+                h2_util_bb_log(m->c, stream_id, APLOG_TRACE1, "h2_mplx_out_open", bb);
+            }
         }
         apr_thread_mutex_unlock(m->lock);
     }
     return status;
 }
 
-
 apr_status_t h2_mplx_out_write(h2_mplx *m, int stream_id, 
                                ap_filter_t* f, apr_bucket_brigade *bb,
                                apr_table_t *trailers,
@@ -689,33 +740,22 @@ apr_status_t h2_mplx_out_write(h2_mplx *
 {
     apr_status_t status;
     AP_DEBUG_ASSERT(m);
-    if (m->aborted) {
-        return APR_ECONNABORTED;
-    }
     status = apr_thread_mutex_lock(m->lock);
     if (APR_SUCCESS == status) {
-        if (!m->aborted) {
-            h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
-            if (io && !io->orphaned) {
-                status = out_write(m, io, f, bb, trailers, iowait);
-                ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, m->c,
-                              "h2_mplx(%ld-%d): write with trailers=%s", 
-                              m->id, io->id, trailers? "yes" : "no");
-                H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_write");
-                
-                have_out_data_for(m, stream_id);
-                if (m->aborted) {
-                    return APR_ECONNABORTED;
-                }
-            }
-            else {
-                status = APR_ECONNABORTED;
-            }
+        h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
+        if (io && !io->orphaned) {
+            status = out_write(m, io, f, bb, trailers, iowait);
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
+                          "h2_mplx(%ld-%d): write with trailers=%s", 
+                          m->id, io->id, trailers? "yes" : "no");
+            H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_write");
+            
+            have_out_data_for(m, stream_id);
         }
-        
-        if (m->lock) {
-            apr_thread_mutex_unlock(m->lock);
+        else {
+            status = APR_ECONNABORTED;
         }
+        apr_thread_mutex_unlock(m->lock);
     }
     return status;
 }
@@ -724,43 +764,32 @@ apr_status_t h2_mplx_out_close(h2_mplx *
 {
     apr_status_t status;
     AP_DEBUG_ASSERT(m);
-    if (m->aborted) {
-        return APR_ECONNABORTED;
-    }
     status = apr_thread_mutex_lock(m->lock);
     if (APR_SUCCESS == status) {
-        if (!m->aborted) {
-            h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
-            if (io && !io->orphaned) {
-                if (!io->response && !io->rst_error) {
-                    /* In case a close comes before a response was created,
-                     * insert an error one so that our streams can properly
-                     * reset.
-                     */
-                    h2_response *r = h2_response_die(stream_id, APR_EGENERAL, 
-                                                     io->request, m->pool);
-                    status = out_open(m, stream_id, r, NULL, NULL, NULL);
-                    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, m->c,
-                                  "h2_mplx(%ld-%d): close, no response, no rst", 
-                                  m->id, io->id);
-                }
-                ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, m->c,
-                              "h2_mplx(%ld-%d): close with trailers=%s", 
-                              m->id, io->id, trailers? "yes" : "no");
-                status = h2_io_out_close(io, trailers);
-                H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_close");
-                
-                have_out_data_for(m, stream_id);
-                if (m->aborted) {
-                    /* if we were the last output, the whole session might
-                     * have gone down in the meantime.
-                     */
-                    return APR_SUCCESS;
-                }
-            }
-            else {
-                status = APR_ECONNABORTED;
+        h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
+        if (io && !io->orphaned) {
+            if (!io->response && !io->rst_error) {
+                /* In case a close comes before a response was created,
+                 * insert an error one so that our streams can properly
+                 * reset.
+                 */
+                h2_response *r = h2_response_die(stream_id, APR_EGENERAL, 
+                                                 io->request, m->pool);
+                status = out_open(m, stream_id, r, NULL, NULL, NULL);
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
+                              "h2_mplx(%ld-%d): close, no response, no rst", 
+                              m->id, io->id);
             }
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
+                          "h2_mplx(%ld-%d): close with trailers=%s", 
+                          m->id, io->id, trailers? "yes" : "no");
+            status = h2_io_out_close(io, trailers);
+            H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_close");
+            
+            have_out_data_for(m, stream_id);
+        }
+        else {
+            status = APR_ECONNABORTED;
         }
         apr_thread_mutex_unlock(m->lock);
     }
@@ -771,28 +800,21 @@ apr_status_t h2_mplx_out_rst(h2_mplx *m,
 {
     apr_status_t status;
     AP_DEBUG_ASSERT(m);
-    if (m->aborted) {
-        return APR_ECONNABORTED;
-    }
     status = apr_thread_mutex_lock(m->lock);
     if (APR_SUCCESS == status) {
-        if (!m->aborted) {
-            h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
-            if (io && !io->rst_error && !io->orphaned) {
-                h2_io_rst(io, error);
-                if (!io->response) {
-                        h2_io_set_add(m->ready_ios, io);
-                }
-                H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_rst");
-                
-                have_out_data_for(m, stream_id);
-                if (io->output_drained) {
-                    apr_thread_cond_signal(io->output_drained);
-                }
-            }
-            else {
-                status = APR_ECONNABORTED;
+        h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
+        if (io && !io->rst_error && !io->orphaned) {
+            h2_io_rst(io, error);
+            if (!io->response) {
+                h2_io_set_add(m->ready_ios, io);
             }
+            H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_rst");
+            
+            have_out_data_for(m, stream_id);
+            h2_io_signal(io, H2_IO_WRITE);
+        }
+        else {
+            status = APR_ECONNABORTED;
         }
         apr_thread_mutex_unlock(m->lock);
     }
@@ -804,14 +826,14 @@ int h2_mplx_in_has_eos_for(h2_mplx *m, i
     int has_eos = 0;
     apr_status_t status;
     AP_DEBUG_ASSERT(m);
-    if (m->aborted) {
-        return 0;
-    }
     status = apr_thread_mutex_lock(m->lock);
     if (APR_SUCCESS == status) {
         h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
-        if (io) {
-            has_eos = io->orphaned || h2_io_in_has_eos_for(io);
+        if (io && !io->orphaned) {
+            has_eos = h2_io_in_has_eos_for(io);
+        }
+        else {
+            has_eos = 1;
         }
         apr_thread_mutex_unlock(m->lock);
     }
@@ -823,15 +845,15 @@ int h2_mplx_out_has_data_for(h2_mplx *m,
     apr_status_t status;
     int has_data = 0;
     AP_DEBUG_ASSERT(m);
-    if (m->aborted) {
-        return 0;
-    }
     status = apr_thread_mutex_lock(m->lock);
     if (APR_SUCCESS == status) {
         h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
-        if (io) {
+        if (io && !io->orphaned) {
             has_data = h2_io_out_has_data(io);
         }
+        else {
+            has_data = 0;
+        }
         apr_thread_mutex_unlock(m->lock);
     }
     return has_data;
@@ -842,19 +864,21 @@ apr_status_t h2_mplx_out_trywait(h2_mplx
 {
     apr_status_t status;
     AP_DEBUG_ASSERT(m);
-    if (m->aborted) {
-        return APR_ECONNABORTED;
-    }
     status = apr_thread_mutex_lock(m->lock);
     if (APR_SUCCESS == status) {
-        m->added_output = iowait;
-        status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout);
-        if (APLOGctrace2(m->c)) {
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
-                          "h2_mplx(%ld): trywait on data for %f ms)",
-                          m->id, timeout/1000.0);
+        if (m->aborted) {
+            status = APR_ECONNABORTED;
+        }
+        else {
+            m->added_output = iowait;
+            status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout);
+            if (APLOGctrace2(m->c)) {
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+                              "h2_mplx(%ld): trywait on data for %f ms)",
+                              m->id, timeout/1000.0);
+            }
+            m->added_output = NULL;
         }
-        m->added_output = NULL;
         apr_thread_mutex_unlock(m->lock);
     }
     return status;
@@ -874,18 +898,19 @@ apr_status_t h2_mplx_reprioritize(h2_mpl
     apr_status_t status;
     
     AP_DEBUG_ASSERT(m);
-    if (m->aborted) {
-        return APR_ECONNABORTED;
-    }
     status = apr_thread_mutex_lock(m->lock);
     if (APR_SUCCESS == status) {
-        h2_tq_sort(m->q, cmp, ctx);
-        
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                      "h2_mplx(%ld): reprioritize tasks", m->id);
+        if (m->aborted) {
+            status = APR_ECONNABORTED;
+        }
+        else {
+            h2_tq_sort(m->q, cmp, ctx);
+            
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+                          "h2_mplx(%ld): reprioritize tasks", m->id);
+        }
         apr_thread_mutex_unlock(m->lock);
     }
-    workers_register(m);
     return status;
 }
 
@@ -901,72 +926,66 @@ static h2_io *open_io(h2_mplx *m, int st
         m->spare_pool = NULL;
     }
     
-    io = h2_io_create(stream_id, io_pool, m->bucket_alloc);
+    io = h2_io_create(stream_id, io_pool);
     h2_io_set_add(m->stream_ios, io);
     
     return io;
 }
 
 
-apr_status_t h2_mplx_process(h2_mplx *m, int stream_id,
-                             const h2_request *req, int eos, 
+apr_status_t h2_mplx_process(h2_mplx *m, int stream_id, const h2_request *req, 
                              h2_stream_pri_cmp *cmp, void *ctx)
 {
     apr_status_t status;
+    int was_empty = 0;
     
     AP_DEBUG_ASSERT(m);
-    if (m->aborted) {
-        return APR_ECONNABORTED;
-    }
     status = apr_thread_mutex_lock(m->lock);
     if (APR_SUCCESS == status) {
-        h2_io *io = open_io(m, stream_id);
-        io->request = req;
-        io->request_body = !eos;
-
-        if (eos) {
-            status = h2_io_in_close(io);
+        if (m->aborted) {
+            status = APR_ECONNABORTED;
+        }
+        else {
+            h2_io *io = open_io(m, stream_id);
+            io->request = req;
+            
+            if (!io->request->body) {
+                status = h2_io_in_close(io);
+            }
+            
+            was_empty = h2_tq_empty(m->q);
+            h2_tq_add(m->q, io->id, cmp, ctx);
+            
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
+                          "h2_mplx(%ld-%d): process", m->c->id, stream_id);
+            H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_process");
         }
-        
-        h2_tq_add(m->q, io->id, cmp, ctx);
-
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
-                      "h2_mplx(%ld-%d): process", m->c->id, stream_id);
-        H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_process");
         apr_thread_mutex_unlock(m->lock);
     }
-    
-    if (status == APR_SUCCESS) {
+    if (status == APR_SUCCESS && was_empty) {
         workers_register(m);
     }
     return status;
 }
 
-h2_task *h2_mplx_pop_task(h2_mplx *m, h2_worker *w, int *has_more)
+const h2_request *h2_mplx_pop_request(h2_mplx *m, int *has_more)
 {
-    h2_task *task = NULL;
+    const h2_request *req = NULL;
     apr_status_t status;
     
     AP_DEBUG_ASSERT(m);
-    if (m->aborted) {
-        *has_more = 0;
-        return NULL;
-    }
     status = apr_thread_mutex_lock(m->lock);
     if (APR_SUCCESS == status) {
-        int sid;
-        while (!task && (sid = h2_tq_shift(m->q)) > 0) {
-            /* Anything not already setup correctly in the task
-             * needs to be so now, as task will be executed right about 
-             * when this method returns. */
-            h2_io *io = h2_io_set_get(m->stream_ios, sid);
-            if (io) {
-                task = h2_worker_create_task(w, m, io->request, !io->request_body);
-            }
+        if (m->aborted) {
+            req = NULL;
+            *has_more = 0;
+        }
+        else {
+            req = pop_request(m);
+            *has_more = !h2_tq_empty(m->q);
         }
-        *has_more = !h2_tq_empty(m->q);
         apr_thread_mutex_unlock(m->lock);
     }
-    return task;
+    return req;
 }
 

Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.h
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.h?rev=1725301&r1=1725300&r2=1725301&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.h (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.h Mon Jan 18 16:22:57 2016
@@ -18,7 +18,7 @@
 
 /**
  * The stream multiplexer. It pushes buckets from the connection
- * thread to the stream task threads and vice versa. It's thread-safe
+ * thread to the stream threads and vice versa. It's thread-safe
  * to use.
  *
  * There is one h2_mplx instance for each h2_session, which sits on top
@@ -44,7 +44,6 @@ struct h2_stream;
 struct h2_request;
 struct h2_io_set;
 struct apr_thread_cond_t;
-struct h2_worker;
 struct h2_workers;
 struct h2_stream_set;
 struct h2_task_queue;
@@ -65,22 +64,26 @@ struct h2_mplx {
     volatile int refs;
     conn_rec *c;
     apr_pool_t *pool;
-    apr_bucket_alloc_t *bucket_alloc;
+
+    unsigned int aborted : 1;
 
     struct h2_task_queue *q;
     struct h2_io_set *stream_ios;
     struct h2_io_set *ready_ios;
     
+    int max_stream_started;      /* highest stream id that started processing */
+
     apr_thread_mutex_t *lock;
     struct apr_thread_cond_t *added_output;
     struct apr_thread_cond_t *join_wait;
     
-    int aborted;
     apr_size_t stream_max_mem;
+    int stream_timeout_secs;
     
     apr_pool_t *spare_pool;           /* spare pool, ready for next io */
     struct h2_workers *workers;
-    int file_handles_allowed;
+    apr_size_t tx_handles_reserved;
+    apr_size_t tx_chunk_size;
     
     h2_mplx_consumed_cb *input_consumed;
     void *input_consumed_ctx;
@@ -101,16 +104,6 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr
                         struct h2_workers *workers);
 
 /**
- * Increase the reference counter of this mplx.
- */
-void h2_mplx_reference(h2_mplx *m);
-
-/**
- * Decreases the reference counter of this mplx.
- */
-void h2_mplx_release(h2_mplx *m);
-
-/**
  * Decreases the reference counter of this mplx and waits for it
  * to reached 0, destroy the mplx afterwards.
  * This is to be called from the thread that created the mplx in
@@ -122,11 +115,19 @@ apr_status_t h2_mplx_release_and_join(h2
 
 /**
  * Aborts the multiplexer. It will answer all future invocation with
- * APR_ECONNABORTED, leading to early termination of ongoing tasks.
+ * APR_ECONNABORTED, leading to early termination of ongoing streams.
  */
 void h2_mplx_abort(h2_mplx *mplx);
 
-void h2_mplx_task_done(h2_mplx *m, int stream_id);
+void h2_mplx_request_done(h2_mplx **pm, int stream_id, const struct h2_request **preq);
+
+/**
+ * Get the highest stream identifier that has been passed on to processing.
+ * Maybe 0 in case no stream has been processed yet.
+ * @param m the multiplexer
+ * @return highest stream identifier for which processing started
+ */
+int h2_mplx_get_max_stream_started(h2_mplx *m);
 
 /*******************************************************************************
  * IO lifetime of streams.
@@ -163,16 +164,14 @@ apr_status_t h2_mplx_out_trywait(h2_mplx
  * @param m the multiplexer
  * @param stream_id the identifier of the stream
  * @param r the request to be processed
- * @param eos if input is complete
  * @param cmp the stream priority compare function
  * @param ctx context data for the compare function
  */
-apr_status_t h2_mplx_process(h2_mplx *m, int stream_id,
-                             const struct h2_request *r, int eos, 
+apr_status_t h2_mplx_process(h2_mplx *m, int stream_id, const struct h2_request *r, 
                              h2_stream_pri_cmp *cmp, void *ctx);
 
 /**
- * Stream priorities have changed, reschedule pending tasks.
+ * Stream priorities have changed, reschedule pending requests.
  * 
  * @param m the multiplexer
  * @param cmp the stream priority compare function
@@ -180,7 +179,7 @@ apr_status_t h2_mplx_process(h2_mplx *m,
  */
 apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx);
 
-struct h2_task *h2_mplx_pop_task(h2_mplx *mplx, struct h2_worker *w, int *has_more);
+const struct h2_request *h2_mplx_pop_request(h2_mplx *mplx, int *has_more);
 
 /**
  * Register a callback for the amount of input data consumed per stream. The
@@ -206,6 +205,7 @@ void h2_mplx_set_consumed_cb(h2_mplx *m,
  */
 apr_status_t h2_mplx_in_read(h2_mplx *m, apr_read_type_e block,
                              int stream_id, apr_bucket_brigade *bb,
+                             apr_table_t *trailers, 
                              struct apr_thread_cond_t *iowait);
 
 /**

Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_private.h
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_private.h?rev=1725301&r1=1725300&r2=1725301&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_private.h (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_private.h Mon Jan 18 16:22:57 2016
@@ -33,4 +33,9 @@ APLOG_USE_MODULE(http2);
 #define H2_HEADER_PATH_LEN   5
 #define H2_CRLF             "\r\n"
 
+#define H2_ALEN(a)          (sizeof(a)/sizeof((a)[0]))
+
+#define H2MAX(x,y) ((x) > (y) ? (x) : (y))
+#define H2MIN(x,y) ((x) < (y) ? (x) : (y))
+
 #endif



Mime
View raw message