httpd-cvs mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ic...@apache.org
Subject svn commit: r1782980 [2/4] - in /httpd/httpd/branches/2.4.x: ./ modules/http2/
Date Tue, 14 Feb 2017 15:53:50 GMT
Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_filter.c
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_filter.c?rev=1782980&r1=1782979&r2=1782980&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_filter.c (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_filter.c Tue Feb 14 15:53:50 2017
@@ -112,9 +112,9 @@ apr_status_t h2_filter_core_input(ap_fil
     apr_interval_time_t saved_timeout = UNSET;
     
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
-                  "core_input(%ld): read, %s, mode=%d, readbytes=%ld", 
-                  (long)f->c->id, (block == APR_BLOCK_READ)? "BLOCK_READ" : "NONBLOCK_READ", 
-                  mode, (long)readbytes);
+                  "h2_session(%ld): read, %s, mode=%d, readbytes=%ld", 
+                  (long)f->c->id, (block == APR_BLOCK_READ)? 
+                  "BLOCK_READ" : "NONBLOCK_READ", mode, (long)readbytes);
     
     if (mode == AP_MODE_INIT || mode == AP_MODE_SPECULATIVE) {
         return ap_get_brigade(f->next, brigade, mode, block, readbytes);
@@ -161,11 +161,11 @@ apr_status_t h2_filter_core_input(ap_fil
         case APR_EAGAIN:
         case APR_TIMEUP:
             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
-                          "core_input(%ld): read", (long)f->c->id);
+                          "h2_session(%ld): read", f->c->id);
             break;
         default:
             ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, f->c, APLOGNO(03046)
-                          "h2_conn_io: error reading");
+                          "h2_session(%ld): error reading", f->c->id);
             break;
     }
     return status;

Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.c
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.c?rev=1782980&r1=1782979&r2=1782980&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.c (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.c Tue Feb 14 15:53:50 2017
@@ -40,36 +40,19 @@
 #include "h2_ngn_shed.h"
 #include "h2_request.h"
 #include "h2_stream.h"
+#include "h2_session.h"
 #include "h2_task.h"
 #include "h2_worker.h"
 #include "h2_workers.h"
 #include "h2_util.h"
 
 
-static void h2_beam_log(h2_bucket_beam *beam, int id, const char *msg, 
-                        conn_rec *c, int level)
-{
-    if (beam && APLOG_C_IS_LEVEL(c,level)) {
-        char buffer[2048];
-        apr_size_t off = 0;
-        
-        off += apr_snprintf(buffer+off, H2_ALEN(buffer)-off, "cl=%d, ", beam->closed);
-        off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "to_send", ", ", &beam->send_list);
-        off += h2_util_bb_print(buffer+off, H2_ALEN(buffer)-off, "recv_buffer", ", ", beam->recv_buffer);
-        off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "hold", ", ", &beam->hold_list);
-        off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "purge", "", &beam->purge_list);
-
-        ap_log_cerror(APLOG_MARK, level, 0, c, "beam(%ld-%d): %s %s", 
-                      c->id, id, msg, buffer);
-    }
-}
-
-/* utility for iterating over ihash task sets */
+/* utility for iterating over ihash stream sets */
 typedef struct {
     h2_mplx *m;
-    h2_task *task;
+    h2_stream *stream;
     apr_time_t now;
-} task_iter_ctx;
+} stream_iter_ctx;
 
 /* NULL or the mutex hold by this thread, used for recursive calls
  */
@@ -165,24 +148,23 @@ static int can_beam_file(void *ctx, h2_b
     if (m->tx_handles_reserved > 0) {
         --m->tx_handles_reserved;
         ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
-                      "h2_mplx(%ld-%d): beaming file %s, tx_avail %d", 
+                      "h2_stream(%ld-%d,%s): beaming file, tx_avail %d", 
                       m->id, beam->id, beam->tag, m->tx_handles_reserved);
         return 1;
     }
     ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
-                  "h2_mplx(%ld-%d): can_beam_file denied on %s", 
+                  "h2_stream(%ld-%d,%s): can_beam_file denied", 
                   m->id, beam->id, beam->tag);
     return 0;
 }
 
-static void have_out_data_for(h2_mplx *m, h2_stream *stream, int response);
-static void task_destroy(h2_mplx *m, h2_task *task, int called_from_master);
+static void have_out_data_for(h2_mplx *m, h2_stream *stream);
 
 static void check_tx_reservation(h2_mplx *m) 
 {
     if (m->tx_handles_reserved <= 0) {
         m->tx_handles_reserved += h2_workers_tx_reserve(m->workers, 
-            H2MIN(m->tx_chunk_size, h2_ihash_count(m->tasks)));
+            H2MIN(m->tx_chunk_size, h2_ihash_count(m->streams)));
     }
 }
 
@@ -193,59 +175,44 @@ static void check_tx_free(h2_mplx *m)
         m->tx_handles_reserved = m->tx_chunk_size;
         h2_workers_tx_free(m->workers, count);
     }
-    else if (m->tx_handles_reserved && h2_ihash_empty(m->tasks)) {
+    else if (m->tx_handles_reserved && h2_ihash_empty(m->streams)) {
         h2_workers_tx_free(m->workers, m->tx_handles_reserved);
         m->tx_handles_reserved = 0;
     }
 }
 
-typedef struct {
-    h2_mplx *m;
-} purge_ctx;
-
-static int purge_stream(void *ctx, void *val) 
+static void stream_joined(h2_mplx *m, h2_stream *stream)
 {
-    purge_ctx *pctx = ctx;
-    h2_stream *stream = val;
-    int stream_id = stream->id;
-    h2_task *task;
-
-    /* stream_cleanup clears all buffers and destroys any buckets
-     * that might hold references into task space. Needs to be done
-     * before task destruction, otherwise it will complain. */
-    h2_stream_cleanup(stream);
-    
-    task = h2_ihash_get(pctx->m->tasks, stream_id);    
-    if (task) {
-        task_destroy(pctx->m, task, 1);
-    }
+    ap_assert(!stream->task || stream->task->worker_done);
     
-    h2_stream_destroy(stream);
-    h2_ihash_remove(pctx->m->spurge, stream_id);
-    return 0;
+    h2_ihash_remove(m->shold, stream->id);
+    h2_ihash_add(m->spurge, stream);
+    m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input);
+    m->tx_handles_reserved += h2_beam_get_files_beamed(stream->output);
 }
 
-static void purge_streams(h2_mplx *m)
+static void stream_cleanup(h2_mplx *m, h2_stream *stream)
 {
-    if (!h2_ihash_empty(m->spurge)) {
-        purge_ctx ctx;
-        ctx.m = m;
-        while(!h2_ihash_iter(m->spurge, purge_stream, &ctx)) {
-            /* repeat until empty */
-        }
-        h2_ihash_clear(m->spurge);
-    }
-}
+    ap_assert(stream->state == H2_SS_CLEANUP);
+    
+    h2_beam_on_produced(stream->output, NULL, NULL);
+    h2_beam_on_consumed(stream->input, NULL, NULL, NULL);
+    h2_beam_abort(stream->input);
+    h2_beam_leave(stream->output);
+    
+    h2_stream_cleanup(stream);
 
-static void h2_mplx_destroy(h2_mplx *m)
-{
-    ap_assert(m);
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                  "h2_mplx(%ld): destroy, tasks=%d", 
-                  m->id, (int)h2_ihash_count(m->tasks));
-    check_tx_free(m);
-    /* pool will be destroyed as child of h2_session->pool,
-       slave connection pools are children of m->pool */
+    h2_iq_remove(m->q, stream->id);
+    h2_ihash_remove(m->streams, stream->id);
+    h2_ihash_add(m->shold, stream);
+    
+    if (!stream->task || stream->task->worker_done) {
+        stream_joined(m, stream);
+    }
+    else if (stream->task) {
+        stream->task->c->aborted = 1;
+        apr_thread_cond_broadcast(m->task_thawed);
+    }
 }
 
 /**
@@ -289,13 +256,11 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr
         status = apr_thread_mutex_create(&m->lock, APR_THREAD_MUTEX_DEFAULT,
                                          m->pool);
         if (status != APR_SUCCESS) {
-            h2_mplx_destroy(m);
             return NULL;
         }
         
         status = apr_thread_cond_create(&m->task_thawed, m->pool);
         if (status != APR_SUCCESS) {
-            h2_mplx_destroy(m);
             return NULL;
         }
     
@@ -304,12 +269,11 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr
         m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM);
 
         m->streams = h2_ihash_create(m->pool, offsetof(h2_stream,id));
+        m->sredo = h2_ihash_create(m->pool, offsetof(h2_stream,id));
         m->shold = h2_ihash_create(m->pool, offsetof(h2_stream,id));
         m->spurge = h2_ihash_create(m->pool, offsetof(h2_stream,id));
         m->q = h2_iq_create(m->pool, m->max_streams);
         m->readyq = h2_iq_create(m->pool, m->max_streams);
-        m->tasks = h2_ihash_create(m->pool, offsetof(h2_task,stream_id));
-        m->redo_tasks = h2_ihash_create(m->pool, offsetof(h2_task, stream_id));
 
         m->stream_timeout = stream_timeout;
         m->workers = workers;
@@ -359,121 +323,69 @@ static int output_consumed_signal(h2_mpl
     return 0;
 }
 
-
-static void task_destroy(h2_mplx *m, h2_task *task, int called_from_master)
+static void task_destroy(h2_mplx *m, h2_task *task)
 {
     conn_rec *slave = NULL;
     int reuse_slave = 0;
     
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, 
-                  "h2_task(%s): destroy", task->id);
-    if (called_from_master) {
-        /* Process outstanding events before destruction */
-        h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
-        if (stream) {
-            input_consumed_signal(m, stream);
-        }
-    }
-    
-    h2_beam_on_produced(task->output.beam, NULL, NULL);
-    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, 
-                  APLOGNO(03385) "h2_task(%s): destroy "
-                  "output beam empty=%d, holds proxies=%d", 
-                  task->id,
-                  h2_beam_empty(task->output.beam),
-                  h2_beam_holds_proxies(task->output.beam));
-    
     slave = task->c;
     reuse_slave = ((m->spare_slaves->nelts < m->spare_slaves->nalloc)
                    && !task->rst_error);
     
-    h2_ihash_remove(m->tasks, task->stream_id);
-    h2_ihash_remove(m->redo_tasks, task->stream_id);
-    h2_task_destroy(task);
-
     if (slave) {
         if (reuse_slave && slave->keepalive == AP_CONN_KEEPALIVE) {
+            h2_beam_log(task->output.beam, m->c, APLOG_DEBUG, 
+                        APLOGNO(03385) "h2_task_destroy, reuse slave");    
+            h2_task_destroy(task);
             APR_ARRAY_PUSH(m->spare_slaves, conn_rec*) = slave;
         }
         else {
+            h2_beam_log(task->output.beam, m->c, APLOG_TRACE1, 
+                        "h2_task_destroy, destroy slave");    
             slave->sbh = NULL;
             h2_slave_destroy(slave);
         }
     }
-    
-    check_tx_free(m);
 }
 
-static void stream_done(h2_mplx *m, h2_stream *stream, int rst_error) 
-{
-    h2_task *task;
+static int stream_destroy_iter(void *ctx, void *val) 
+{   
+    h2_mplx *m = ctx;
+    h2_stream *stream = val;
+
+    h2_ihash_remove(m->spurge, stream->id);
+    ap_assert(stream->state == H2_SS_CLEANUP);
     
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, 
-                  "h2_stream(%ld-%d): done", m->c->id, stream->id);
-    /* Situation: we are, on the master connection, done with processing
-     * the stream. Either we have handled it successfully, or the stream
-     * was reset by the client or the connection is gone and we are 
-     * shutting down the whole session.
-     *
-     * We possibly have created a task for this stream to be processed
-     * on a slave connection. The processing might actually be ongoing
-     * right now or has already finished. A finished task waits for its
-     * stream to be done. This is the common case.
-     * 
-     * If the stream had input (e.g. the request had a body), a task
-     * may have read, or is still reading buckets from the input beam.
-     * This means that the task is referencing memory from the stream's
-     * pool (or the master connection bucket alloc). Before we can free
-     * the stream pool, we need to make sure that those references are
-     * gone. This is what h2_beam_shutdown() on the input waits for.
-     *
-     * With the input handled, we can tear down that beam and care
-     * about the output beam. The stream might still have buffered some
-     * buckets read from the output, so we need to get rid of those. That
-     * is done by h2_stream_cleanup().
-     *
-     * Now it is save to destroy the task (if it exists and is finished).
-     * 
-     * FIXME: we currently destroy the stream, even if the task is still
-     * ongoing. This is not ok, since task->request is coming from stream
-     * memory. We should either copy it on task creation or wait with the
-     * stream destruction until the task is done. 
-     */
-    h2_iq_remove(m->q, stream->id);
-    h2_ihash_remove(m->streams, stream->id);
-    h2_ihash_remove(m->shold, stream->id);
+    if (stream->input == NULL || stream->output == NULL) {
+        ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, m->c, 
+                      H2_STRM_MSG(stream, "already with beams==NULL"));
+        return 0;
+    }
+    /* Process outstanding events before destruction */
+    input_consumed_signal(m, stream);
     
-    h2_stream_cleanup(stream);
-    m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input);
-    h2_beam_on_consumed(stream->input, NULL, NULL, NULL);
-    /* Let anyone blocked reading know that there is no more to come */
-    h2_beam_abort(stream->input);
-    /* Remove mutex after, so that abort still finds cond to signal */
-    h2_beam_mutex_set(stream->input, NULL, NULL, NULL);
-    m->tx_handles_reserved += h2_beam_get_files_beamed(stream->output);
-
-    task = h2_ihash_get(m->tasks, stream->id);
-    if (task) {
-        if (!task->worker_done) {
-            /* task still running, cleanup once it is done */
-            if (rst_error) {
-                h2_task_rst(task, rst_error);
-            }
-            h2_ihash_add(m->shold, stream);
-            return;
-        }
-        else {
-            /* already finished */
-            task_destroy(m, task, 1);
-        }
+    h2_beam_log(stream->input, m->c, APLOG_TRACE2, "stream_destroy");
+    h2_beam_log(stream->output, m->c, APLOG_TRACE2, "stream_destroy");
+    h2_beam_destroy(stream->input);
+    stream->input = NULL;
+    h2_beam_destroy(stream->output);
+    stream->output = NULL;
+    if (stream->task) {
+        task_destroy(m, stream->task);
+        stream->task = NULL;
     }
-    h2_ihash_add(m->spurge, stream);
+    h2_stream_destroy(stream);
+    return 0;
 }
 
-static int stream_done_iter(void *ctx, void *val)
+static void purge_streams(h2_mplx *m)
 {
-    stream_done((h2_mplx*)ctx, val, 0);
-    return 0;
+    if (!h2_ihash_empty(m->spurge)) {
+        while (!h2_ihash_iter(m->spurge, stream_destroy_iter, m)) {
+            /* repeat until empty */
+        }
+        check_tx_free(m);
+    }
 }
 
 typedef struct {
@@ -503,157 +415,126 @@ apr_status_t h2_mplx_stream_do(h2_mplx *
     return status;
 }
 
-static int task_print(void *ctx, void *val)
-{
+static int report_stream_iter(void *ctx, void *val) {
     h2_mplx *m = ctx;
-    h2_task *task = val;
-
+    h2_stream *stream = val;
+    h2_task *task = stream->task;
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+                  H2_STRM_MSG(stream, "started=%d, scheduled=%d, ready=%d"), 
+                  !!stream->task, stream->scheduled, h2_stream_is_ready(stream));
     if (task) {
-        h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
-
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, /* NO APLOGNO */
-                      "->03198: h2_stream(%s): %s %s %s"
-                      "[orph=%d/started=%d/done=%d/frozen=%d]", 
-                      task->id, task->request->method, 
-                      task->request->authority, task->request->path,
-                      (stream? 0 : 1), task->worker_started, 
+                      H2_STRM_MSG(stream, "->03198: %s %s %s"
+                      "[started=%d/done=%d/frozen=%d]"), 
+                      task->request->method, task->request->authority, 
+                      task->request->path, task->worker_started, 
                       task->worker_done, task->frozen);
     }
     else {
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, /* NO APLOGNO */
-                      "->03198: h2_stream(%ld-NULL): NULL", m->id);
+                      H2_STRM_MSG(stream, "->03198: no task"));
     }
     return 1;
 }
 
-static int task_abort_connection(void *ctx, void *val)
-{
-    h2_task *task = val;
-    if (!task->worker_done) { 
-        if (task->c) {
-            task->c->aborted = 1;
-        }
-        h2_beam_abort(task->input.beam);
-        h2_beam_abort(task->output.beam);
-    }
+static int unexpected_stream_iter(void *ctx, void *val) {
+    h2_mplx *m = ctx;
+    h2_stream *stream = val;
+    ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
+                  H2_STRM_MSG(stream, "unexpected, started=%d, scheduled=%d, ready=%d"), 
+                  !!stream->task, stream->scheduled, h2_stream_is_ready(stream));
     return 1;
 }
 
-static int report_stream_iter(void *ctx, void *val) {
+static int stream_cancel_iter(void *ctx, void *val) {
     h2_mplx *m = ctx;
     h2_stream *stream = val;
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                  "h2_mplx(%ld-%d): exists, started=%d, scheduled=%d, ready=%d", 
-                  m->id, stream->id, stream->started, stream->scheduled,
-                  h2_stream_is_ready(stream));
-    return 1;
-}
 
-static int task_done_iter(void *ctx, void *val);
+    /* take over event monitoring */
+    h2_stream_set_monitor(stream, NULL);
+    /* Reset, should transit to CLOSED state */
+    h2_stream_rst(stream, H2_ERR_NO_ERROR);
+    /* All connection data has been sent, simulate cleanup */
+    h2_stream_dispatch(stream, H2_SEV_EOS_SENT);
+    stream_cleanup(m, stream);  
+    return 0;
+}
 
-apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
+void h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
 {
     apr_status_t status;
+    int i, wait_secs = 60;
     int acquired;
 
     /* How to shut down a h2 connection:
-     * 1. tell the workers that no more tasks will come from us */
+     * 0. tell the workers that no more tasks will come from us */
     h2_workers_unregister(m->workers, m);
     
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        int i, wait_secs = 60;
+    enter_mutex(m, &acquired);
 
-        /* 2. disable WINDOW_UPDATEs and set the mplx to aborted, clear
-         *    our TODO list and purge any streams we have collected */
-        h2_mplx_set_consumed_cb(m, NULL, NULL);
-        h2_mplx_abort(m);
-        h2_iq_clear(m->q);
-        h2_ihash_clear(m->spurge);
-
-        /* 3. wakeup all sleeping tasks. Mark all still active streams as 'done'. 
-         *    m->streams has to be empty afterwards with streams either in
-         *    a) m->shold because a task is still active
-         *    b) m->spurge because task is done, or was not started */
-        h2_ihash_iter(m->tasks, task_abort_connection, m);
-        apr_thread_cond_broadcast(m->task_thawed);
-        while (!h2_ihash_iter(m->streams, stream_done_iter, m)) {
-            /* iterate until all streams have been removed */
-        }
-        ap_assert(h2_ihash_empty(m->streams));
-
-        /* 4. while workers are busy on this connection, meaning they
-         *    are processing tasks from this connection, wait on them finishing
-         *    to wake us and check again. Eventually, this has to succeed. */    
-        m->join_wait = wait;
-        for (i = 0; m->workers_busy > 0; ++i) {
-            status = apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(wait_secs));
-            
-            if (APR_STATUS_IS_TIMEUP(status)) {
-                /* This can happen if we have very long running requests
-                 * that do not time out on IO. */
-                ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03198)
-                              "h2_mplx(%ld): release, waiting for %d seconds now for "
-                              "%d h2_workers to return, have still %d tasks outstanding", 
-                              m->id, i*wait_secs, m->workers_busy,
-                              (int)h2_ihash_count(m->tasks));
-                h2_ihash_iter(m->shold, report_stream_iter, m);
-                h2_ihash_iter(m->tasks, task_print, m);
-            }
-        }
-        m->join_wait = NULL;
-        
-        /* 5. All workers for this connection are done, we are in 
-         * single-threaded processing now effectively. */
-        leave_mutex(m, acquired);
-
-        if (!h2_ihash_empty(m->tasks)) {
-            /* when we are here, we lost track of the tasks still present.
-             * this currently happens with mod_proxy_http2 when we shut
-             * down a h2_req_engine with tasks assigned. Since no parallel
-             * processing is going on any more, we just clean them up. */ 
-            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c,  APLOGNO(03056)
-                          "h2_mplx(%ld): 3. release_join with %d tasks",
-                          m->id, (int)h2_ihash_count(m->tasks));
-            h2_ihash_iter(m->tasks, task_print, m);
-            
-            while (!h2_ihash_iter(m->tasks, task_done_iter, m)) {
-                /* iterate until all tasks have been removed */
-            }
-        }
-
-        /* 6. With all tasks done, the stream hold should be empty now. */
-        ap_assert(h2_ihash_empty(m->shold));
-        
-        /* 7. close the h2_req_enginge shed and self destruct */
-        h2_ngn_shed_destroy(m->ngn_shed);
-        m->ngn_shed = NULL;
-        h2_mplx_destroy(m);
+    /* How to shut down a h2 connection:
+     * 1. set aborted flag and cancel all streams still active */
+    m->aborted = 1;
+    while (!h2_ihash_iter(m->streams, stream_cancel_iter, m)) {
+        /* until empty */
+    }
+    
+    /* 2. terminate ngn_shed, no more streams
+     * should be scheduled or in the active set */
+    h2_ngn_shed_abort(m->ngn_shed);
+    ap_assert(h2_ihash_empty(m->streams));
+    ap_assert(h2_iq_empty(m->q));
+    
+    /* 3. while workers are busy on this connection, meaning they
+     *    are processing tasks from this connection, wait on them finishing
+     *    in order to wake us and let us check again. 
+     *    Eventually, this has to succeed. */    
+    m->join_wait = wait;
+    for (i = 0; h2_ihash_count(m->shold) > 0; ++i) {        
+        status = apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(wait_secs));
+        
+        if (APR_STATUS_IS_TIMEUP(status)) {
+            /* This can happen if we have very long running requests
+             * that do not time out on IO. */
+            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03198)
+                          "h2_mplx(%ld): waited %d sec for %d tasks", 
+                          m->id, i*wait_secs, (int)h2_ihash_count(m->shold));
+            h2_ihash_iter(m->shold, report_stream_iter, m);
+        }
+    }
+    m->join_wait = NULL;
+    
+    /* 4. close the h2_req_engine shed */
+    h2_ngn_shed_destroy(m->ngn_shed);
+    m->ngn_shed = NULL;
+    
+    /* 4. With all workers done, all streams should be in spurge */
+    if (!h2_ihash_empty(m->shold)) {
+        ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03516)
+                      "h2_mplx(%ld): unexpected %d streams in hold", 
+                      m->id, (int)h2_ihash_count(m->shold));
+        h2_ihash_iter(m->shold, unexpected_stream_iter, m);
     }
-    return status;
-}
-
-void h2_mplx_abort(h2_mplx *m)
-{
-    int acquired;
+    /*ap_assert(h2_ihash_empty(m->shold));*/
     
-    if (!m->aborted && enter_mutex(m, &acquired) == APR_SUCCESS) {
-        m->aborted = 1;
-        h2_ngn_shed_abort(m->ngn_shed);
-        leave_mutex(m, acquired);
-    }
+    /* 5. return any file resources allocated */
+    check_tx_free(m);
+    
+    leave_mutex(m, acquired);
+
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+                  "h2_mplx(%ld): released", m->id);
 }
 
-apr_status_t h2_mplx_stream_done(h2_mplx *m, h2_stream *stream)
+apr_status_t h2_mplx_stream_cleanup(h2_mplx *m, h2_stream *stream)
 {
     apr_status_t status = APR_SUCCESS;
     int acquired;
     
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, 
-                      "h2_mplx(%ld-%d): marking stream as done.", 
-                      m->id, stream->id);
-        stream_done(m, stream, stream->rst_error);
-        purge_streams(m);
+                      H2_STRM_MSG(stream, "cleanup"));
+        stream_cleanup(m, stream);        
         leave_mutex(m, acquired);
     }
     return status;
@@ -687,7 +568,9 @@ static void output_produced(void *ctx, h
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
         stream = h2_ihash_get(m->streams, beam->id);
         if (stream) {
-            have_out_data_for(m, stream, 0);
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
+                          "h2_mplx(%s): output_produced", stream->task->id);
+            have_out_data_for(m, stream);
         }
         leave_mutex(m, acquired);
     }
@@ -696,23 +579,22 @@ static void output_produced(void *ctx, h
 static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
 {
     apr_status_t status = APR_SUCCESS;
-    h2_task *task = h2_ihash_get(m->tasks, stream_id);
     h2_stream *stream = h2_ihash_get(m->streams, stream_id);
     apr_size_t beamed_count;
     
-    if (!task || !stream) {
+    if (!stream || !stream->task) {
         return APR_ECONNABORTED;
     }
     
     if (APLOGctrace2(m->c)) {
-        h2_beam_log(beam, stream_id, "out_open", m->c, APLOG_TRACE2);
+        h2_beam_log(beam, m->c, APLOG_TRACE2, "out_open");
     }
     else {
         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
-                      "h2_mplx(%s): out open", task->id);
+                      "h2_mplx(%s): out open", stream->task->id);
     }
     
-    h2_beam_on_consumed(stream->output, NULL, stream_output_consumed, task);
+    h2_beam_on_consumed(stream->output, NULL, stream_output_consumed, stream->task);
     h2_beam_on_produced(stream->output, output_produced, m);
     beamed_count = h2_beam_get_files_beamed(stream->output);
     if (m->tx_handles_reserved >= beamed_count) {
@@ -721,17 +603,17 @@ static apr_status_t out_open(h2_mplx *m,
     else {
         m->tx_handles_reserved = 0;
     }
-    if (!task->output.copy_files) {
+    if (!stream->task->output.copy_files) {
         h2_beam_on_file_beam(stream->output, can_beam_file, m);
     }
     
     /* time to protect the beam against multi-threaded use */
-    h2_beam_mutex_set(stream->output, beam_enter, task->cond, m);
+    h2_beam_mutex_set(stream->output, beam_enter, stream->task->cond, m);
     
     /* we might see some file buckets in the output, see
      * if we have enough handles reserved. */
     check_tx_reservation(m);
-    have_out_data_for(m, stream, 0);
+    have_out_data_for(m, stream);
     return status;
 }
 
@@ -769,10 +651,9 @@ static apr_status_t out_close(h2_mplx *m
     ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
                   "h2_mplx(%s): close", task->id);
     status = h2_beam_close(task->output.beam);
-    h2_beam_log(task->output.beam, task->stream_id, "out_close", m->c, 
-                APLOG_TRACE2);
+    h2_beam_log(task->output.beam, m->c, APLOG_TRACE2, "out_close");
     output_consumed_signal(m, task);
-    have_out_data_for(m, stream, 0);
+    have_out_data_for(m, stream);
     return status;
 }
 
@@ -805,7 +686,7 @@ apr_status_t h2_mplx_out_trywait(h2_mplx
     return status;
 }
 
-static void have_out_data_for(h2_mplx *m, h2_stream *stream, int response)
+static void have_out_data_for(h2_mplx *m, h2_stream *stream)
 {
     ap_assert(m);
     ap_assert(stream);
@@ -849,8 +730,11 @@ apr_status_t h2_mplx_process(h2_mplx *m,
         else {
             h2_ihash_add(m->streams, stream);
             if (h2_stream_is_ready(stream)) {
+                /* already have a response */
                 apr_atomic_set32(&m->event_pending, 1);
                 h2_iq_append(m->readyq, stream->id);
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
+                              H2_STRM_MSG(stream, "process, add to readyq")); 
             }
             else {
                 if (!m->need_registration) {
@@ -860,9 +744,9 @@ apr_status_t h2_mplx_process(h2_mplx *m,
                     do_registration = m->need_registration;
                 }
                 h2_iq_add(m->q, stream->id, cmp, ctx);                
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
+                              H2_STRM_MSG(stream, "process, add to q")); 
             }
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
-                          "h2_mplx(%ld-%d): process", m->c->id, stream->id);
         }
         leave_mutex(m, acquired);
     }
@@ -875,10 +759,9 @@ apr_status_t h2_mplx_process(h2_mplx *m,
 
 static h2_task *next_stream_task(h2_mplx *m)
 {
-    h2_task *task = NULL;
     h2_stream *stream;
     int sid;
-    while (!m->aborted && !task  && (m->workers_busy < m->workers_limit)
+    while (!m->aborted && (m->workers_busy < m->workers_limit)
            && (sid = h2_iq_shift(m->q)) > 0) {
         
         stream = h2_ihash_get(m->streams, sid);
@@ -897,34 +780,34 @@ static h2_task *next_stream_task(h2_mplx
             
             slave->sbh = m->c->sbh;
             slave->aborted = 0;
-            task = h2_task_create(slave, stream->id, stream->request, 
-                                  stream->input, stream->output, m);
-            h2_ihash_add(m->tasks, task);
-            
-            m->c->keepalives++;
-            apr_table_setn(slave->notes, H2_TASK_ID_NOTE, task->id);
-            if (new_conn) {
-                h2_slave_run_pre_connection(slave, ap_get_conn_socket(slave));
-            }
-            stream->started = 1;
-            task->worker_started = 1;
-            task->started_at = apr_time_now();
-            if (sid > m->max_stream_started) {
-                m->max_stream_started = sid;
+            if (!stream->task) {
+                stream->task = h2_task_create(stream, slave);
+                
+                m->c->keepalives++;
+                apr_table_setn(slave->notes, H2_TASK_ID_NOTE, stream->task->id);
+                if (new_conn) {
+                    h2_slave_run_pre_connection(slave, ap_get_conn_socket(slave));
+                }
+                if (sid > m->max_stream_started) {
+                    m->max_stream_started = sid;
+                }
+                
+                h2_beam_timeout_set(stream->input, m->stream_timeout);
+                h2_beam_on_consumed(stream->input, stream_input_ev, 
+                                    stream_input_consumed, m);
+                h2_beam_on_file_beam(stream->input, can_beam_file, m);
+                h2_beam_mutex_set(stream->input, beam_enter, stream->task->cond, m);
+                
+                h2_beam_buffer_size_set(stream->output, m->stream_max_mem);
+                h2_beam_timeout_set(stream->output, m->stream_timeout);
             }
-
-            h2_beam_timeout_set(stream->input, m->stream_timeout);
-            h2_beam_on_consumed(stream->input, stream_input_ev, 
-                                stream_input_consumed, m);
-            h2_beam_on_file_beam(stream->input, can_beam_file, m);
-            h2_beam_mutex_set(stream->input, beam_enter, task->cond, m);
-            
-            h2_beam_buffer_size_set(stream->output, m->stream_max_mem);
-            h2_beam_timeout_set(stream->output, m->stream_timeout);
+            stream->task->worker_started = 1;
+            stream->task->started_at = apr_time_now();
             ++m->workers_busy;
+            return stream->task;
         }
     }
-    return task;
+    return NULL;
 }
 
 h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more)
@@ -952,6 +835,8 @@ h2_task *h2_mplx_pop_task(h2_mplx *m, in
 
 static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
 {
+    h2_stream *stream;
+    
     if (task->frozen) {
         /* this task was handed over to an engine for processing 
          * and the original worker has finished. That means the 
@@ -960,109 +845,97 @@ static void task_done(h2_mplx *m, h2_tas
         apr_thread_cond_broadcast(m->task_thawed);
         return;
     }
-    else {
-        h2_stream *stream;
-        
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                      "h2_mplx(%ld): task(%s) done", m->id, task->id);
-        out_close(m, task);
-        
-        if (ngn) {
-            apr_off_t bytes = 0;
-            h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ);
-            bytes += h2_beam_get_buffered(task->output.beam);
-            if (bytes > 0) {
-                /* we need to report consumed and current buffered output
-                 * to the engine. The request will be streamed out or cancelled,
-                 * no more data is coming from it and the engine should update
-                 * its calculations before we destroy this information. */
-                h2_req_engine_out_consumed(ngn, task->c, bytes);
-            }
-        }
         
-        if (task->engine) {
-            if (!m->aborted && !task->c->aborted 
-                && !h2_req_engine_is_shutdown(task->engine)) {
-                ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
-                              "h2_mplx(%ld): task(%s) has not-shutdown "
-                              "engine(%s)", m->id, task->id, 
-                              h2_req_engine_get_id(task->engine));
-            }
-            h2_ngn_shed_done_ngn(m->ngn_shed, task->engine);
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+                  "h2_mplx(%ld): task(%s) done", m->id, task->id);
+    out_close(m, task);
+    
+    if (ngn) {
+        apr_off_t bytes = 0;
+        h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ);
+        bytes += h2_beam_get_buffered(task->output.beam);
+        if (bytes > 0) {
+            /* we need to report consumed and current buffered output
+             * to the engine. The request will be streamed out or cancelled,
+             * no more data is coming from it and the engine should update
+             * its calculations before we destroy this information. */
+            h2_req_engine_out_consumed(ngn, task->c, bytes);
         }
-        
-        stream = h2_ihash_get(m->streams, task->stream_id);
-        if (!m->aborted && stream 
-            && h2_ihash_get(m->redo_tasks, task->stream_id)) {
-            /* reset and schedule again */
-            h2_task_redo(task);
-            h2_ihash_remove(m->redo_tasks, task->stream_id);
-            h2_iq_add(m->q, task->stream_id, NULL, NULL);
-            return;
+    }
+    
+    if (task->engine) {
+        if (!m->aborted && !task->c->aborted 
+            && !h2_req_engine_is_shutdown(task->engine)) {
+            ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
+                          "h2_mplx(%ld): task(%s) has not-shutdown "
+                          "engine(%s)", m->id, task->id, 
+                          h2_req_engine_get_id(task->engine));
+        }
+        h2_ngn_shed_done_ngn(m->ngn_shed, task->engine);
+    }
+    
+    task->worker_done = 1;
+    task->done_at = apr_time_now();
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+                  "h2_mplx(%s): request done, %f ms elapsed", task->id, 
+                  (task->done_at - task->started_at) / 1000.0);
+    
+    if (task->started_at > m->last_idle_block) {
+        /* this task finished without causing an 'idle block', e.g.
+         * a block by flow control.
+         */
+        if (task->done_at- m->last_limit_change >= m->limit_change_interval
+            && m->workers_limit < m->workers_max) {
+            /* Well behaving stream, allow it more workers */
+            m->workers_limit = H2MIN(m->workers_limit * 2, 
+                                     m->workers_max);
+            m->last_limit_change = task->done_at;
+            m->need_registration = 1;
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+                          "h2_mplx(%ld): increase worker limit to %d",
+                          m->id, m->workers_limit);
         }
-        
-        task->worker_done = 1;
-        task->done_at = apr_time_now();
+    }
+    
+    stream = h2_ihash_get(m->streams, task->stream_id);
+    if (stream && !m->aborted && h2_ihash_get(m->sredo, stream->id)) {
+        /* reset and schedule again */
+        h2_task_redo(task);
+        h2_ihash_remove(m->sredo, stream->id);
+        h2_iq_add(m->q, stream->id, NULL, NULL);
+        return;
+    }
+    
+    if (stream) {
+        /* stream not cleaned up, stay around */
         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
-                      "h2_mplx(%s): request done, %f ms elapsed", task->id, 
-                      (task->done_at - task->started_at) / 1000.0);
-        if (task->started_at > m->last_idle_block) {
-            /* this task finished without causing an 'idle block', e.g.
-             * a block by flow control.
-             */
-            if (task->done_at- m->last_limit_change >= m->limit_change_interval
-                && m->workers_limit < m->workers_max) {
-                /* Well behaving stream, allow it more workers */
-                m->workers_limit = H2MIN(m->workers_limit * 2, 
-                                         m->workers_max);
-                m->last_limit_change = task->done_at;
-                m->need_registration = 1;
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                              "h2_mplx(%ld): increase worker limit to %d",
-                              m->id, m->workers_limit);
-            }
-        }
-        
-        if (stream) {
-            /* hang around until the stream deregisters */
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
-                          "h2_mplx(%s): task_done, stream still open", 
-                          task->id);
-            /* more data will not arrive, resume the stream */
-            have_out_data_for(m, stream, 0);
-            h2_beam_on_consumed(stream->output, NULL, NULL, NULL);
-            h2_beam_mutex_set(stream->output, NULL, NULL, NULL);
-        }
-        else {
-            /* stream no longer active, was it placed in hold? */
-            stream = h2_ihash_get(m->shold, task->stream_id);
-            if (stream) {
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
-                              "h2_mplx(%s): task_done, stream %d in hold", 
-                              task->id, stream->id);
-                /* We cannot destroy the stream here since this is 
-                 * called from a worker thread and freeing memory pools
-                 * is only safe in the only thread using it (and its
-                 * parent pool / allocator) */
-                h2_beam_on_consumed(stream->output, NULL, NULL, NULL);
-                h2_beam_mutex_set(stream->output, NULL, NULL, NULL);
-                h2_ihash_remove(m->shold, stream->id);
-                h2_ihash_add(m->spurge, stream);
-            }
-            else {
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
-                              "h2_mplx(%s): task_done, stream not found", 
-                              task->id);
-                task_destroy(m, task, 0);
-            }
-        }
+                      H2_STRM_MSG(stream, "task_done, stream open")); 
+        /* more data will not arrive, resume the stream */
+        h2_beam_mutex_set(stream->input, NULL, NULL, NULL);
+        h2_beam_mutex_set(stream->output, NULL, NULL, NULL);
+        h2_beam_leave(stream->input);
+        have_out_data_for(m, stream);
+    }
+    else if ((stream = h2_ihash_get(m->shold, task->stream_id)) != NULL) {
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+                      H2_STRM_MSG(stream, "task_done, in hold"));
+        /* stream was just waiting for us. */
+        h2_beam_mutex_set(stream->input, NULL, NULL, NULL);
+        h2_beam_mutex_set(stream->output, NULL, NULL, NULL);
+        h2_beam_leave(stream->input);
+        stream_joined(m, stream);
+    }
+    else if ((stream = h2_ihash_get(m->spurge, task->stream_id)) != NULL) {
+        ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,   
+                      H2_STRM_LOG(APLOGNO(03517), stream, "already in spurge"));
+        ap_assert("stream should not be in spurge" == NULL);
+    }
+    else {
+        ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03518)
+                      "h2_mplx(%s): task_done, stream not found", 
+                      task->id);
+        ap_assert("stream should still be available" == NULL);
     }
-}
-
-static int task_done_iter(void *ctx, void *val)
-{
-    task_done((h2_mplx*)ctx, val, 0);
-    return 0;
 }
 
 void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask)
@@ -1089,75 +962,76 @@ void h2_mplx_task_done(h2_mplx *m, h2_ta
 
 static int latest_repeatable_unsubmitted_iter(void *data, void *val)
 {
-    task_iter_ctx *ctx = data;
-    h2_stream *stream;
-    h2_task *task = val;
-    if (!task->worker_done && h2_task_can_redo(task) 
-        && !h2_ihash_get(ctx->m->redo_tasks, task->stream_id)) {
-        stream = h2_ihash_get(ctx->m->streams, task->stream_id);
-        if (stream && !h2_stream_is_ready(stream)) {
+    stream_iter_ctx *ctx = data;
+    h2_stream *stream = val;
+    
+    if (stream->task && !stream->task->worker_done 
+        && h2_task_can_redo(stream->task) 
+        && !h2_ihash_get(ctx->m->sredo, stream->id)) {
+        if (!h2_stream_is_ready(stream)) {
             /* this task occupies a worker, the response has not been submitted 
              * yet, not been cancelled and it is a repeatable request
              * -> it can be re-scheduled later */
-            if (!ctx->task || ctx->task->started_at < task->started_at) {
+            if (!ctx->stream 
+                || (ctx->stream->task->started_at < stream->task->started_at)) {
                 /* we did not have one or this one was started later */
-                ctx->task = task;
+                ctx->stream = stream;
             }
         }
     }
     return 1;
 }
 
-static h2_task *get_latest_repeatable_unsubmitted_task(h2_mplx *m) 
+static h2_stream *get_latest_repeatable_unsubmitted_stream(h2_mplx *m) 
 {
-    task_iter_ctx ctx;
+    stream_iter_ctx ctx;
     ctx.m = m;
-    ctx.task = NULL;
-    h2_ihash_iter(m->tasks, latest_repeatable_unsubmitted_iter, &ctx);
-    return ctx.task;
+    ctx.stream = NULL;
+    h2_ihash_iter(m->streams, latest_repeatable_unsubmitted_iter, &ctx);
+    return ctx.stream;
 }
 
 static int timed_out_busy_iter(void *data, void *val)
 {
-    task_iter_ctx *ctx = data;
-    h2_task *task = val;
-    if (!task->worker_done
-        && (ctx->now - task->started_at) > ctx->m->stream_timeout) {
+    stream_iter_ctx *ctx = data;
+    h2_stream *stream = val;
+    if (stream->task && !stream->task->worker_done
+        && (ctx->now - stream->task->started_at) > ctx->m->stream_timeout) {
         /* timed out stream occupying a worker, found */
-        ctx->task = task;
+        ctx->stream = stream;
         return 0;
     }
     return 1;
 }
 
-static h2_task *get_timed_out_busy_task(h2_mplx *m) 
+static h2_stream *get_timed_out_busy_stream(h2_mplx *m) 
 {
-    task_iter_ctx ctx;
+    stream_iter_ctx ctx;
     ctx.m = m;
-    ctx.task = NULL;
+    ctx.stream = NULL;
     ctx.now = apr_time_now();
-    h2_ihash_iter(m->tasks, timed_out_busy_iter, &ctx);
-    return ctx.task;
+    h2_ihash_iter(m->streams, timed_out_busy_iter, &ctx);
+    return ctx.stream;
 }
 
 static apr_status_t unschedule_slow_tasks(h2_mplx *m) 
 {
-    h2_task *task;
+    h2_stream *stream;
     int n;
     
     /* Try to get rid of streams that occupy workers. Look for safe requests
      * that are repeatable. If none found, fail the connection.
      */
-    n = (m->workers_busy - m->workers_limit - (int)h2_ihash_count(m->redo_tasks));
-    while (n > 0 && (task = get_latest_repeatable_unsubmitted_task(m))) {
-        h2_task_rst(task, H2_ERR_CANCEL);
-        h2_ihash_add(m->redo_tasks, task);
+    n = (m->workers_busy - m->workers_limit - (int)h2_ihash_count(m->sredo));
+    while (n > 0 && (stream = get_latest_repeatable_unsubmitted_stream(m))) {
+        h2_task_rst(stream->task, H2_ERR_CANCEL);
+        h2_ihash_add(m->sredo, stream);
         --n;
     }
     
-    if ((m->workers_busy - h2_ihash_count(m->redo_tasks)) > m->workers_limit) {
-        task = get_timed_out_busy_task(m);
-        if (task) {
+    if ((m->workers_busy - h2_ihash_count(m->sredo)) > m->workers_limit) {
+        h2_stream *stream = get_timed_out_busy_stream(m);
+        if (stream) {
             /* Too many busy workers, unable to cancel enough streams
              * and with a busy, timed out stream, we tell the client
              * to go away... */
@@ -1232,9 +1106,9 @@ typedef struct {
 static int ngn_update_window(void *ctx, void *val)
 {
     ngn_update_ctx *uctx = ctx;
-    h2_task *task = val;
-    if (task && task->assigned == uctx->ngn
-        && output_consumed_signal(uctx->m, task)) {
+    h2_stream *stream = val;
+    if (stream->task && stream->task->assigned == uctx->ngn
+        && output_consumed_signal(uctx->m, stream->task)) {
         ++uctx->streams_updated;
     }
     return 1;
@@ -1247,7 +1121,7 @@ static apr_status_t ngn_out_update_windo
     ctx.m = m;
     ctx.ngn = ngn;
     ctx.streams_updated = 0;
-    h2_ihash_iter(m->tasks, ngn_update_window, &ctx);
+    h2_ihash_iter(m->streams, ngn_update_window, &ctx);
     
     return ctx.streams_updated? APR_SUCCESS : APR_EAGAIN;
 }
@@ -1331,11 +1205,15 @@ void h2_mplx_req_engine_done(h2_req_engi
         int acquired;
 
         if (enter_mutex(m, &acquired) == APR_SUCCESS) {
+            h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
+            
             ngn_out_update_windows(m, ngn);
             h2_ngn_shed_done_task(m->ngn_shed, ngn, task);
-            if (status != APR_SUCCESS && h2_task_can_redo(task) 
-                && !h2_ihash_get(m->redo_tasks, task->stream_id)) {
-                h2_ihash_add(m->redo_tasks, task);
+            
+            if (status != APR_SUCCESS && stream 
+                && h2_task_can_redo(task) 
+                && !h2_ihash_get(m->sredo, stream->id)) {
+                h2_ihash_add(m->sredo, stream);
             }
             if (task->engine) { 
                 /* cannot report that as done until engine returns */
@@ -1380,7 +1258,7 @@ apr_status_t h2_mplx_dispatch_master_eve
     }
     
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, 
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, 
                       "h2_mplx(%ld): dispatch events", m->id);        
         apr_atomic_set32(&m->event_pending, 0);
         /* update input windows for streams */
@@ -1429,7 +1307,7 @@ int h2_mplx_awaits_data(h2_mplx *m)
         if (h2_ihash_empty(m->streams)) {
             waiting = 0;
         }
-        if (h2_iq_empty(m->q) && h2_ihash_empty(m->tasks)) {
+        if (h2_iq_empty(m->readyq) && h2_iq_empty(m->q) && !m->workers_busy) {
             waiting = 0;
         }
         leave_mutex(m, acquired);

Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.h
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.h?rev=1782980&r1=1782979&r2=1782980&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.h (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.h Tue Feb 14 15:53:50 2017
@@ -72,13 +72,13 @@ struct h2_mplx {
     unsigned int need_registration : 1;
 
     struct h2_ihash_t *streams;     /* all streams currently processing */
+    struct h2_ihash_t *sredo;       /* all streams that need to be re-started */
     struct h2_ihash_t *shold;       /* all streams done with task ongoing */
     struct h2_ihash_t *spurge;      /* all streams done, ready for destroy */
-
+    
     struct h2_iqueue *q;            /* all stream ids that need to be started */
     struct h2_iqueue *readyq;       /* all stream ids ready for output */
         
-    struct h2_ihash_t *tasks;       /* all tasks started and not destroyed */
     struct h2_ihash_t *redo_tasks;  /* all tasks that need to be redone */
     
     int max_streams;        /* max # of concurrent streams */
@@ -137,13 +137,7 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr
  * @param m the mplx to be released and destroyed
  * @param wait condition var to wait on for ref counter == 0
  */ 
-apr_status_t h2_mplx_release_and_join(h2_mplx *m, struct apr_thread_cond_t *wait);
-
-/**
- * Aborts the multiplexer. It will answer all future invocation with
- * APR_ECONNABORTED, leading to early termination of ongoing streams.
- */
-void h2_mplx_abort(h2_mplx *mplx);
+void h2_mplx_release_and_join(h2_mplx *m, struct apr_thread_cond_t *wait);
 
 struct h2_task *h2_mplx_pop_task(h2_mplx *mplx, int *has_more);
 
@@ -165,14 +159,13 @@ int h2_mplx_is_busy(h2_mplx *m);
 struct h2_stream *h2_mplx_stream_get(h2_mplx *m, int id);
 
 /**
- * Notifies mplx that a stream has finished processing.
+ * Notifies mplx that a stream has been completely handled on the main
+ * connection and is ready for cleanup.
  * 
  * @param m the mplx itself
- * @param stream the id of the stream being done
- * @param rst_error if != 0, the stream was reset with the error given
- *
+ * @param stream the stream ready for cleanup
  */
-apr_status_t h2_mplx_stream_done(h2_mplx *m, struct h2_stream *stream);
+apr_status_t h2_mplx_stream_cleanup(h2_mplx *m, struct h2_stream *stream);
 
 /**
  * Waits on output data from any stream in this session to become available. 

Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_proxy_session.c
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_proxy_session.c?rev=1782980&r1=1782979&r2=1782980&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_proxy_session.c (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_proxy_session.c Tue Feb 14 15:53:50 2017
@@ -40,7 +40,7 @@ typedef struct h2_proxy_stream {
     const char *p_server_uri;
     int standalone;
 
-    h2_stream_state_t state;
+    h2_proxy_stream_state_t state;
     unsigned int suspended : 1;
     unsigned int waiting_on_100 : 1;
     unsigned int waiting_on_ping : 1;
@@ -1470,7 +1470,7 @@ void h2_proxy_session_cleanup(h2_proxy_s
         cleanup_iter_ctx ctx;
         ctx.session = session;
         ctx.done = done;
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03366)
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03519)
                       "h2_proxy_session(%s): terminated, %d streams unfinished",
                       session->id, (int)h2_proxy_ihash_count(session->streams));
         h2_proxy_ihash_iter(session->streams, done_iter, &ctx);

Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_proxy_session.h
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_proxy_session.h?rev=1782980&r1=1782979&r2=1782980&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_proxy_session.h (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_proxy_session.h Tue Feb 14 15:53:50 2017
@@ -24,6 +24,16 @@ struct h2_proxy_iqueue;
 struct h2_proxy_ihash_t;
 
 typedef enum {
+    H2_STREAM_ST_IDLE,
+    H2_STREAM_ST_OPEN,
+    H2_STREAM_ST_RESV_LOCAL,
+    H2_STREAM_ST_RESV_REMOTE,
+    H2_STREAM_ST_CLOSED_INPUT,
+    H2_STREAM_ST_CLOSED_OUTPUT,
+    H2_STREAM_ST_CLOSED,
+} h2_proxy_stream_state_t;
+
+typedef enum {
     H2_PROXYS_ST_INIT,             /* send initial SETTINGS, etc. */
     H2_PROXYS_ST_DONE,             /* finished, connection close */
     H2_PROXYS_ST_IDLE,             /* no streams to process */

Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_push.c
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_push.c?rev=1782980&r1=1782979&r2=1782980&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_push.c (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_push.c Tue Feb 14 15:53:50 2017
@@ -700,9 +700,8 @@ apr_array_header_t *h2_push_collect_upda
                                             cache_digest, stream->pool);
         if (status != APR_SUCCESS) {
             ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c,
-                          APLOGNO(03057)
-                          "h2_session(%ld): push diary set from Cache-Digest: %s", 
-                          session->id, cache_digest);
+                          H2_SSSN_LOG(APLOGNO(03057), session,
+                          "push diary set from Cache-Digest: %s"), cache_digest);
         }
     }
     pushes = h2_push_collect(stream->pool, req, stream->push_policy, res);



Mime
View raw message