httpd-cvs mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ic...@apache.org
Subject svn commit: r1739303 [2/3] - in /httpd/httpd/trunk: ./ modules/http2/
Date Fri, 15 Apr 2016 13:50:46 GMT
Modified: httpd/httpd/trunk/modules/http2/h2_mplx.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_mplx.c?rev=1739303&r1=1739302&r2=1739303&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_mplx.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_mplx.c Fri Apr 15 13:50:46 2016
@@ -29,38 +29,40 @@
 #include "mod_http2.h"
 
 #include "h2_private.h"
+#include "h2_bucket_beam.h"
 #include "h2_config.h"
 #include "h2_conn.h"
 #include "h2_ctx.h"
 #include "h2_h2.h"
-#include "h2_int_queue.h"
 #include "h2_io.h"
-#include "h2_io_set.h"
 #include "h2_response.h"
 #include "h2_mplx.h"
 #include "h2_ngn_shed.h"
 #include "h2_request.h"
 #include "h2_stream.h"
 #include "h2_task.h"
-#include "h2_task_input.h"
-#include "h2_task_output.h"
 #include "h2_worker.h"
 #include "h2_workers.h"
 #include "h2_util.h"
 
 
-#define H2_MPLX_IO_OUT(lvl,m,io,msg) \
-    do { \
-        if (APLOG_C_IS_LEVEL((m)->c,lvl)) \
-        h2_util_bb_log((m)->c,(io)->id,lvl,msg,(io)->bbout); \
-    } while(0)
-    
-#define H2_MPLX_IO_IN(lvl,m,io,msg) \
-    do { \
-        if (APLOG_C_IS_LEVEL((m)->c,lvl)) \
-        h2_util_bb_log((m)->c,(io)->id,lvl,msg,(io)->bbin); \
-    } while(0)
+static void h2_beam_log(h2_bucket_beam *beam, int id, const char *msg, 
+                        conn_rec *c, int level)
+{
+    if (beam && APLOG_C_IS_LEVEL(c,level)) {
+        char buffer[2048];
+        apr_size_t off = 0;
+        
+        off += apr_snprintf(buffer+off, H2_ALEN(buffer)-off, "cl=%d, ", beam->closed);
+        off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "red", ", ", &beam->red);
+        off += h2_util_bb_print(buffer+off, H2_ALEN(buffer)-off, "green", ", ", beam->green);
+        off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "hold", ", ", &beam->hold);
+        off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "purge", "", &beam->purge);
 
+        ap_log_cerror(APLOG_MARK, level, 0, c, "beam(%ld-%d): %s %s", 
+                      c->id, id, msg, buffer);
+    }
+}
 
 /* NULL or the mutex hold by this thread, used for recursive calls
  */
@@ -104,13 +106,51 @@ static void leave_mutex(h2_mplx *m, int
     }
 }
 
-static int is_aborted(h2_mplx *m, apr_status_t *pstatus)
+static apr_status_t io_mutex_enter(void *ctx, 
+                                   apr_thread_mutex_t **plock, int *acquired)
 {
-    AP_DEBUG_ASSERT(m);
-    if (m->aborted) {
-        *pstatus = APR_ECONNABORTED;
+    h2_mplx *m = ctx;
+    *plock = m->lock;
+    return enter_mutex(m, acquired);
+}
+
+static void io_mutex_leave(void *ctx, apr_thread_mutex_t *lock, int acquired)
+{
+    h2_mplx *m = ctx;
+    leave_mutex(m, acquired);
+}
+
+static void stream_output_consumed(void *ctx, 
+                                   h2_bucket_beam *beam, apr_off_t length)
+{
+    h2_io *io = ctx;
+    if (length > 0 && io->task && io->task->assigned) {
+        h2_req_engine_out_consumed(io->task->assigned, io->task->c, length); 
+    }
+}
+
+static void stream_input_consumed(void *ctx, 
+                                  h2_bucket_beam *beam, apr_off_t length)
+{
+    h2_mplx *m = ctx;
+    if (m->input_consumed && length) {
+        m->input_consumed(m->input_consumed_ctx, beam->id, length);
+    }
+}
+
+static int can_beam_file(void *ctx, h2_bucket_beam *beam,  apr_file_t *file)
+{
+    h2_mplx *m = ctx;
+    if (m->tx_handles_reserved > 0) {
+        --m->tx_handles_reserved;
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
+                      "h2_mplx(%ld-%d): beaming file %s, tx_avail %d", 
+                      m->id, beam->id, beam->tag, m->tx_handles_reserved);
         return 1;
     }
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
+                  "h2_mplx(%ld-%d): can_beam_file denied on %s", 
+                  m->id, beam->id, beam->tag);
     return 0;
 }
 
@@ -118,9 +158,9 @@ static void have_out_data_for(h2_mplx *m
 
 static void check_tx_reservation(h2_mplx *m) 
 {
-    if (m->tx_handles_reserved == 0) {
+    if (m->tx_handles_reserved <= 0) {
         m->tx_handles_reserved += h2_workers_tx_reserve(m->workers, 
-            H2MIN(m->tx_chunk_size, h2_io_set_size(m->stream_ios)));
+            H2MIN(m->tx_chunk_size, h2_ilist_count(m->stream_ios)));
     }
 }
 
@@ -132,7 +172,7 @@ static void check_tx_free(h2_mplx *m)
         h2_workers_tx_free(m->workers, count);
     }
     else if (m->tx_handles_reserved 
-             && (!m->stream_ios || h2_io_set_is_empty(m->stream_ios))) {
+             && (!m->stream_ios || h2_ilist_empty(m->stream_ios))) {
         h2_workers_tx_free(m->workers, m->tx_handles_reserved);
         m->tx_handles_reserved = 0;
     }
@@ -143,7 +183,7 @@ static void h2_mplx_destroy(h2_mplx *m)
     AP_DEBUG_ASSERT(m);
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                   "h2_mplx(%ld): destroy, ios=%d", 
-                  m->id, (int)h2_io_set_size(m->stream_ios));
+                  m->id, (int)h2_ilist_count(m->stream_ios));
     check_tx_free(m);
     if (m->pool) {
         apr_pool_destroy(m->pool);
@@ -205,8 +245,8 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr
         m->max_streams = h2_config_geti(conf, H2_CONF_MAX_STREAMS);
         m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM);
         m->q = h2_iq_create(m->pool, m->max_streams);
-        m->stream_ios = h2_io_set_create(m->pool);
-        m->ready_ios = h2_io_set_create(m->pool);
+        m->stream_ios = h2_ilist_create(m->pool);
+        m->ready_ios = h2_ilist_create(m->pool);
         m->stream_timeout = stream_timeout;
         m->workers = workers;
         m->workers_max = workers->max_workers;
@@ -240,49 +280,29 @@ apr_uint32_t h2_mplx_shutdown(h2_mplx *m
     return max_stream_started;
 }
 
-static void workers_register(h2_mplx *m)
+static void io_in_consumed_signal(h2_mplx *m, h2_io *io)
 {
-    /* h2_workers is only a hub for all the h2_worker instances.
-     * At the end-of-life of this h2_mplx, we always unregister at
-     * the workers. The thing to manage are all the h2_worker instances
-     * out there. Those may hold a reference to this h2_mplx and we cannot
-     * call them to unregister.
-     * 
-     * Therefore: ref counting for h2_workers in not needed, ref counting
-     * for h2_worker using this is critical.
-     */
-    m->need_registration = 0;
-    h2_workers_register(m->workers, m);
-}
-
-static int io_in_consumed_signal(h2_mplx *m, h2_io *io)
-{
-    if (io->input_consumed && m->input_consumed) {
-        m->input_consumed(m->input_consumed_ctx, 
-                          io->id, io->input_consumed);
-        io->input_consumed = 0;
-        return 1;
+    if (io->beam_in && io->worker_started) {
+        h2_beam_send(io->beam_in, NULL, 0); /* trigger updates */
     }
-    return 0;
 }
 
 static int io_out_consumed_signal(h2_mplx *m, h2_io *io)
 {
-    if (io->output_consumed && io->task && io->task->assigned) {
-        h2_req_engine_out_consumed(io->task->assigned, io->task->c, 
-                                   io->output_consumed);
-        io->output_consumed = 0;
-        return 1;
+    if (io->beam_out && io->worker_started && io->task && io->task->assigned) {
+        h2_beam_send(io->beam_out, NULL, 0); /* trigger updates */
     }
     return 0;
 }
 
+
 static void io_destroy(h2_mplx *m, h2_io *io, int events)
 {
+    conn_rec *slave = NULL;
     int reuse_slave;
     
     /* cleanup any buffered input */
-    h2_io_in_shutdown(io);
+    h2_io_shutdown(io);
     if (events) {
         /* Process outstanding events before destruction */
         io_in_consumed_signal(m, io);
@@ -291,24 +311,37 @@ static void io_destroy(h2_mplx *m, h2_io
     /* The pool is cleared/destroyed which also closes all
      * allocated file handles. Give this count back to our
      * file handle pool. */
-    m->tx_handles_reserved += io->files_handles_owned;
+    if (io->beam_in) {
+        m->tx_handles_reserved += h2_beam_get_files_beamed(io->beam_in);
+    }
+    if (io->beam_out) {
+        m->tx_handles_reserved += h2_beam_get_files_beamed(io->beam_out);
+    }
 
-    h2_io_set_remove(m->stream_ios, io);
-    h2_io_set_remove(m->ready_ios, io);
+    h2_ilist_remove(m->stream_ios, io->id);
+    h2_ilist_remove(m->ready_ios, io->id);
     if (m->redo_ios) {
-        h2_io_set_remove(m->redo_ios, io);
+        h2_ilist_remove(m->redo_ios, io->id);
     }
 
     reuse_slave = ((m->spare_slaves->nelts < m->spare_slaves->nalloc)
-                    && !io->rst_error && io->eor);
+                    && !io->rst_error);
     if (io->task) {
-        conn_rec *slave = io->task->c;
+        slave = io->task->c;
         h2_task_destroy(io->task);
         io->task = NULL;
-        
+    }
+
+    if (io->pool) {
+        if (m->spare_io_pool) {
+            apr_pool_destroy(m->spare_io_pool);
+        }
+        apr_pool_clear(io->pool);
+        m->spare_io_pool = io->pool;
+    }
+
+    if (slave) {
         if (reuse_slave && slave->keepalive == AP_CONN_KEEPALIVE) {
-            apr_bucket_delete(io->eor);
-            io->eor = NULL;
             APR_ARRAY_PUSH(m->spare_slaves, conn_rec*) = slave;
         }
         else {
@@ -316,18 +349,14 @@ static void io_destroy(h2_mplx *m, h2_io
             h2_slave_destroy(slave, NULL);
         }
     }
-
-    if (io->pool) {
-        apr_pool_destroy(io->pool);
-    }
-
+    
     check_tx_free(m);
 }
 
 static int io_stream_done(h2_mplx *m, h2_io *io, int rst_error) 
 {
     /* Remove io from ready set, we will never submit it */
-    h2_io_set_remove(m->ready_ios, io);
+    h2_ilist_remove(m->ready_ios, io->id);
     if (!io->worker_started || io->worker_done) {
         /* already finished or not even started yet */
         h2_iq_remove(m->q, io->id);
@@ -336,39 +365,41 @@ static int io_stream_done(h2_mplx *m, h2
     }
     else {
         /* cleanup once task is done */
-        h2_io_make_orphaned(io, rst_error);
+        io->orphaned = 1;
+        if (rst_error) {
+            h2_io_rst(io, rst_error);
+        }
         return 1;
     }
 }
 
-static int stream_done_iter(void *ctx, h2_io *io)
+static int stream_done_iter(void *ctx, void *val)
 {
-    return io_stream_done((h2_mplx*)ctx, io, 0);
+    return io_stream_done((h2_mplx*)ctx, val, 0);
 }
 
-static int stream_print(void *ctx, h2_io *io)
+static int stream_print(void *ctx, void *val)
 {
     h2_mplx *m = ctx;
+    h2_io *io = val;
     if (io && io->request) {
         ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
                       "->03198: h2_stream(%ld-%d): %s %s %s -> %s %d"
-                      "[orph=%d/started=%d/done=%d/eos_in=%d/eos_out=%d]", 
+                      "[orph=%d/started=%d/done=%d]", 
                       m->id, io->id, 
                       io->request->method, io->request->authority, io->request->path,
                       io->response? "http" : (io->rst_error? "reset" : "?"),
                       io->response? io->response->http_status : io->rst_error,
-                      io->orphaned, io->worker_started, io->worker_done,
-                      io->eos_in, io->eos_out);
+                      io->orphaned, io->worker_started, io->worker_done);
     }
     else if (io) {
         ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
                       "->03198: h2_stream(%ld-%d): NULL -> %s %d"
-                      "[orph=%d/started=%d/done=%d/eos_in=%d/eos_out=%d]", 
+                      "[orph=%d/started=%d/done=%d]", 
                       m->id, io->id, 
                       io->response? "http" : (io->rst_error? "reset" : "?"),
                       io->response? io->response->http_status : io->rst_error,
-                      io->orphaned, io->worker_started, io->worker_done,
-                      io->eos_in, io->eos_out);
+                      io->orphaned, io->worker_started, io->worker_done);
     }
     else {
         ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
@@ -392,7 +423,7 @@ apr_status_t h2_mplx_release_and_join(h2
         
         h2_iq_clear(m->q);
         apr_thread_cond_broadcast(m->task_thawed);
-        while (!h2_io_set_iter(m->stream_ios, stream_done_iter, m)) {
+        while (!h2_ilist_iter(m->stream_ios, stream_done_iter, m)) {
             /* iterate until all ios have been orphaned or destroyed */
         }
     
@@ -407,9 +438,13 @@ apr_status_t h2_mplx_release_and_join(h2
             m->join_wait = wait;
             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                           "h2_mplx(%ld): release_join, waiting on %d worker to report back", 
-                          m->id, (int)h2_io_set_size(m->stream_ios));
+                          m->id, (int)h2_ilist_count(m->stream_ios));
                           
             status = apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(wait_secs));
+            
+            while (!h2_ilist_iter(m->stream_ios, stream_done_iter, m)) {
+                /* iterate until all ios have been orphaned or destroyed */
+            }
             if (APR_STATUS_IS_TIMEUP(status)) {
                 if (i > 0) {
                     /* Oh, oh. Still we wait for assigned  workers to report that 
@@ -421,9 +456,9 @@ apr_status_t h2_mplx_release_and_join(h2
                                   "h2_mplx(%ld): release, waiting for %d seconds now for "
                                   "%d h2_workers to return, have still %d requests outstanding", 
                                   m->id, i*wait_secs, m->workers_busy,
-                                  (int)h2_io_set_size(m->stream_ios));
+                                  (int)h2_ilist_count(m->stream_ios));
                     if (i == 1) {
-                        h2_io_set_iter(m->stream_ios, stream_print, m);
+                        h2_ilist_iter(m->stream_ios, stream_print, m);
                     }
                 }
                 h2_mplx_abort(m);
@@ -431,10 +466,10 @@ apr_status_t h2_mplx_release_and_join(h2
             }
         }
         
-        if (!h2_io_set_is_empty(m->stream_ios)) {
+        if (!h2_ilist_empty(m->stream_ios)) {
             ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, m->c, 
                           "h2_mplx(%ld): release_join, %d streams still open", 
-                          m->id, (int)h2_io_set_size(m->stream_ios));
+                          m->id, (int)h2_ilist_count(m->stream_ios));
         }
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056)
                       "h2_mplx(%ld): release_join -> destroy", m->id);
@@ -468,7 +503,7 @@ apr_status_t h2_mplx_stream_done(h2_mplx
      */
     AP_DEBUG_ASSERT(m);
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
+        h2_io *io = h2_ilist_get(m->stream_ios, stream_id);
 
         /* there should be an h2_io, once the stream has been scheduled
          * for processing, e.g. when we received all HEADERs. But when
@@ -484,107 +519,16 @@ apr_status_t h2_mplx_stream_done(h2_mplx
     return status;
 }
 
-apr_status_t h2_mplx_in_read(h2_mplx *m, apr_read_type_e block,
-                             int stream_id, apr_bucket_brigade *bb, 
-                             apr_table_t *trailers,
-                             struct apr_thread_cond_t *iowait)
-{
-    apr_status_t status; 
-    int acquired;
-    
-    AP_DEBUG_ASSERT(m);
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
-        if (io && !io->orphaned) {
-            H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_read_pre");
-            
-            h2_io_signal_init(io, H2_IO_READ, m->stream_timeout, iowait);
-            status = h2_io_in_read(io, bb, -1, trailers);
-            while (APR_STATUS_IS_EAGAIN(status) 
-                   && !is_aborted(m, &status)
-                   && block == APR_BLOCK_READ) {
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
-                              "h2_mplx(%ld-%d): wait on in data (BLOCK_READ)", 
-                              m->id, stream_id);
-                status = h2_io_signal_wait(m, io);
-                if (status == APR_SUCCESS) {
-                    status = h2_io_in_read(io, bb, -1, trailers);
-                }
-            }
-            H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_read_post");
-            h2_io_signal_exit(io);
-        }
-        else {
-            status = APR_EOF;
-        }
-        leave_mutex(m, acquired);
-    }
-    return status;
-}
-
-apr_status_t h2_mplx_in_write(h2_mplx *m, int stream_id, 
-                              const char *data, apr_size_t len, int eos)
-{
-    apr_status_t status;
-    int acquired;
-    
-    AP_DEBUG_ASSERT(m);
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
-        if (io && !io->orphaned) {
-            H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_write_pre");
-            status = h2_io_in_write(io, data, len, eos);
-            H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_write_post");
-            h2_io_signal(io, H2_IO_READ);
-            io_in_consumed_signal(m, io);
-        }
-        else {
-            status = APR_ECONNABORTED;
-        }
-        leave_mutex(m, acquired);
-    }
-    return status;
-}
-
-apr_status_t h2_mplx_in_close(h2_mplx *m, int stream_id)
-{
-    apr_status_t status;
-    int acquired;
-    
-    AP_DEBUG_ASSERT(m);
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
-        if (io && !io->orphaned) {
-            status = h2_io_in_close(io);
-            H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_close");
-            h2_io_signal(io, H2_IO_READ);
-            io_in_consumed_signal(m, io);
-        }
-        else {
-            status = APR_ECONNABORTED;
-        }
-        leave_mutex(m, acquired);
-    }
-    return status;
-}
-
 void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx)
 {
     m->input_consumed = cb;
     m->input_consumed_ctx = ctx;
 }
 
-typedef struct {
-    h2_mplx * m;
-    int streams_updated;
-} update_ctx;
-
-static int update_window(void *ctx, h2_io *io)
+static int update_window(void *ctx, void *val)
 {
-    update_ctx *uctx = (update_ctx*)ctx;
-    if (io_in_consumed_signal(uctx->m, io)) {
-        ++uctx->streams_updated;
-    }
+    h2_mplx *m = ctx;
+    io_in_consumed_signal(m, val);
     return 1;
 }
 
@@ -598,46 +542,11 @@ apr_status_t h2_mplx_in_update_windows(h
         return APR_ECONNABORTED;
     }
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        update_ctx ctx;
+        h2_ilist_iter(m->stream_ios, update_window, m);
         
-        ctx.m               = m;
-        ctx.streams_updated = 0;
-
-        status = APR_EAGAIN;
-        h2_io_set_iter(m->stream_ios, update_window, &ctx);
-        
-        if (ctx.streams_updated) {
-            status = APR_SUCCESS;
-        }
-        leave_mutex(m, acquired);
-    }
-    return status;
-}
-
-apr_status_t h2_mplx_out_get_brigade(h2_mplx *m, int stream_id, 
-                                     apr_bucket_brigade *bb, 
-                                     apr_off_t len, apr_table_t **ptrailers)
-{
-    apr_status_t status;
-    int acquired;
-
-    AP_DEBUG_ASSERT(m);
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
-        if (io && !io->orphaned) {
-            H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_get_brigade_pre");
-            
-            status = h2_io_out_get_brigade(io, bb, len);
-            
-            H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_get_brigade_post");
-            if (status == APR_SUCCESS) {
-                h2_io_signal(io, H2_IO_WRITE);
-            }
-        }
-        else {
-            status = APR_ECONNABORTED;
-        }
-        *ptrailers = io->response? io->response->trailers : NULL;
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, 
+                      "h2_session(%ld): windows updated", m->id);
+        status = APR_SUCCESS;
         leave_mutex(m, acquired);
     }
     return status;
@@ -651,7 +560,7 @@ h2_stream *h2_mplx_next_submit(h2_mplx *
 
     AP_DEBUG_ASSERT(m);
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        h2_io *io = h2_io_set_shift(m->ready_ios);
+        h2_io *io = h2_ilist_shift(m->ready_ios);
         if (io && !m->aborted) {
             stream = h2_ihash_get(streams, io->id);
             if (stream) {
@@ -661,9 +570,7 @@ h2_stream *h2_mplx_next_submit(h2_mplx *
                 }
                 else {
                     AP_DEBUG_ASSERT(io->response);
-                    H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_next_submit_pre");
-                    h2_stream_set_response(stream, io->response, io->bbout);
-                    H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_next_submit_post");
+                    h2_stream_set_response(stream, io->response, io->beam_out);
                 }
             }
             else {
@@ -675,114 +582,62 @@ h2_stream *h2_mplx_next_submit(h2_mplx *
                               "h2_mplx(%ld): stream for response %d closed, "
                               "resetting io to close request processing",
                               m->id, io->id);
-                h2_io_make_orphaned(io, H2_ERR_STREAM_CLOSED);
+                io->orphaned = 1;
+                h2_io_rst(io, H2_ERR_STREAM_CLOSED);
                 if (!io->worker_started || io->worker_done) {
                     io_destroy(m, io, 1);
                 }
                 else {
                     /* hang around until the h2_task is done, but
-                     * shutdown input and send out any events (e.g. window
-                     * updates) asap. */
-                    h2_io_in_shutdown(io);
+                     * shutdown input/output and send out any events asap. */
+                    h2_io_shutdown(io);
                     io_in_consumed_signal(m, io);
                 }
             }
-            
-            h2_io_signal(io, H2_IO_WRITE);
         }
         leave_mutex(m, acquired);
     }
     return stream;
 }
 
-static apr_status_t out_write(h2_mplx *m, h2_io *io, 
-                              ap_filter_t* f, int blocking,
-                              apr_bucket_brigade *bb,
-                              struct apr_thread_cond_t *iowait)
-{
-    apr_status_t status = APR_SUCCESS;
-    /* We check the memory footprint queued for this stream_id
-     * and block if it exceeds our configured limit.
-     * We will not split buckets to enforce the limit to the last
-     * byte. After all, the bucket is already in memory.
-     */
-    while (status == APR_SUCCESS 
-           && !APR_BRIGADE_EMPTY(bb) 
-           && !is_aborted(m, &status)) {
-        
-        status = h2_io_out_write(io, bb, blocking? m->stream_max_mem : INT_MAX, 
-                                 &m->tx_handles_reserved);
-        io_out_consumed_signal(m, io);
-        
-        /* Wait for data to drain until there is room again or
-         * stream timeout expires */
-        h2_io_signal_init(io, H2_IO_WRITE, m->stream_timeout, iowait);
-        while (status == APR_SUCCESS
-               && !APR_BRIGADE_EMPTY(bb) 
-               && iowait
-               && (m->stream_max_mem <= h2_io_out_length(io))
-               && !is_aborted(m, &status)) {
-            if (!blocking) {
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
-                              "h2_mplx(%ld-%d): incomplete write", 
-                              m->id, io->id);
-                return APR_INCOMPLETE;
-            }
-            if (f) {
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
-                              "h2_mplx(%ld-%d): waiting for out drain", 
-                              m->id, io->id);
-            }
-            status = h2_io_signal_wait(m, io);
-        }
-        h2_io_signal_exit(io);
-    }
-    apr_brigade_cleanup(bb);
-    
-    return status;
-}
-
 static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response,
-                             ap_filter_t* f, apr_bucket_brigade *bb,
-                             struct apr_thread_cond_t *iowait)
+                             h2_bucket_beam *output)
 {
     apr_status_t status = APR_SUCCESS;
     
-    h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
-    if (io && !io->orphaned) {
-        if (f) {
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
-                          "h2_mplx(%ld-%d): open response: %d, rst=%d",
-                          m->id, stream_id, response->http_status, 
-                          response->rst_error);
-        }
-        
-        h2_io_set_response(io, response);
-        h2_io_set_add(m->ready_ios, io);
-        if (response && response->http_status < 300) {
-            /* we might see some file buckets in the output, see
-             * if we have enough handles reserved. */
-            check_tx_reservation(m);
-        }
-        if (bb) {
-            status = out_write(m, io, f, 0, bb, iowait);
-            if (status == APR_INCOMPLETE) {
-                /* write will have transferred as much data as possible.
-                   caller has to deal with non-empty brigade */
-                status = APR_SUCCESS;
-            }
-        }
-        have_out_data_for(m, stream_id);
+    h2_io *io = h2_ilist_get(m->stream_ios, stream_id);
+    if (!io || io->orphaned) {
+        return APR_ECONNABORTED;
     }
-    else {
-        status = APR_ECONNABORTED;
+    
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+                  "h2_mplx(%ld-%d): open response: %d, rst=%d",
+                  m->id, stream_id, response->http_status, 
+                  response->rst_error);
+    
+    if (output) {
+        h2_beam_buffer_size_set(output, m->stream_max_mem);
+        h2_beam_timeout_set(output, m->stream_timeout);
+        h2_beam_on_consumed(output, stream_output_consumed, io);
+        m->tx_handles_reserved -= h2_beam_get_files_beamed(output);
+        h2_beam_on_file_beam(output, can_beam_file, m);
+        h2_beam_mutex_set(output, io_mutex_enter, io_mutex_leave, 
+                          io->task->cond, m);
+    }
+    h2_io_set_response(io, response, output);
+    
+    h2_ilist_add(m->ready_ios, io);
+    if (response && response->http_status < 300) {
+        /* we might see some file buckets in the output, see
+         * if we have enough handles reserved. */
+        check_tx_reservation(m);
     }
+    have_out_data_for(m, stream_id);
     return status;
 }
 
 apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response,
-                              ap_filter_t* f, apr_bucket_brigade *bb,
-                              struct apr_thread_cond_t *iowait)
+                              h2_bucket_beam *output)
 {
     apr_status_t status;
     int acquired;
@@ -793,37 +648,7 @@ apr_status_t h2_mplx_out_open(h2_mplx *m
             status = APR_ECONNABORTED;
         }
         else {
-            status = out_open(m, stream_id, response, f, bb, iowait);
-            if (APLOGctrace1(m->c)) {
-                h2_util_bb_log(m->c, stream_id, APLOG_TRACE1, "h2_mplx_out_open", bb);
-            }
-        }
-        leave_mutex(m, acquired);
-    }
-    return status;
-}
-
-apr_status_t h2_mplx_out_write(h2_mplx *m, int stream_id, 
-                               ap_filter_t* f, int blocking,
-                               apr_bucket_brigade *bb,
-                               struct apr_thread_cond_t *iowait)
-{
-    apr_status_t status;
-    int acquired;
-    
-    AP_DEBUG_ASSERT(m);
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
-        if (io && !io->orphaned) {
-            status = out_write(m, io, f, blocking, bb, iowait);
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
-                          "h2_mplx(%ld-%d): write", m->id, io->id);
-            H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_write");
-            
-            have_out_data_for(m, stream_id);
-        }
-        else {
-            status = APR_ECONNABORTED;
+            status = out_open(m, stream_id, response, output);
         }
         leave_mutex(m, acquired);
     }
@@ -837,7 +662,7 @@ apr_status_t h2_mplx_out_close(h2_mplx *
     
     AP_DEBUG_ASSERT(m);
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
+        h2_io *io = h2_ilist_get(m->stream_ios, stream_id);
         if (io && !io->orphaned) {
             if (!io->response && !io->rst_error) {
                 /* In case a close comes before a response was created,
@@ -846,45 +671,20 @@ apr_status_t h2_mplx_out_close(h2_mplx *
                  */
                 h2_response *r = h2_response_die(stream_id, APR_EGENERAL, 
                                                  io->request, m->pool);
-                status = out_open(m, stream_id, r, NULL, NULL, NULL);
+                status = out_open(m, stream_id, r, NULL);
                 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, m->c,
                               "h2_mplx(%ld-%d): close, no response, no rst", 
                               m->id, io->id);
             }
             ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
-                          "h2_mplx(%ld-%d): close with eor=%s", 
-                          m->id, io->id, io->eor? "yes" : "no");
-            status = h2_io_out_close(io);
-            H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_close");
-            io_out_consumed_signal(m, io);
-            
-            have_out_data_for(m, stream_id);
-        }
-        else {
-            status = APR_ECONNABORTED;
-        }
-        leave_mutex(m, acquired);
-    }
-    return status;
-}
-
-apr_status_t h2_mplx_out_rst(h2_mplx *m, int stream_id, int error)
-{
-    apr_status_t status;
-    int acquired;
-    
-    AP_DEBUG_ASSERT(m);
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
-        if (io && !io->rst_error && !io->orphaned) {
-            h2_io_rst(io, error);
-            if (!io->response) {
-                h2_io_set_add(m->ready_ios, io);
+                          "h2_mplx(%ld-%d): close", m->id, io->id);
+            if (io->beam_out) {
+                status = h2_beam_close(io->beam_out);
+                h2_beam_log(io->beam_out, stream_id, "out_close", m->c, 
+                            APLOG_TRACE2);
             }
-            H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_rst");
-            
+            io_out_consumed_signal(m, io);
             have_out_data_for(m, stream_id);
-            h2_io_signal(io, H2_IO_WRITE);
         }
         else {
             status = APR_ECONNABORTED;
@@ -894,26 +694,6 @@ apr_status_t h2_mplx_out_rst(h2_mplx *m,
     return status;
 }
 
-int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id)
-{
-    apr_status_t status;
-    int has_data = 0;
-    int acquired;
-    
-    AP_DEBUG_ASSERT(m);
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
-        if (io && !io->orphaned) {
-            has_data = h2_io_out_has_data(io);
-        }
-        else {
-            has_data = 0;
-        }
-        leave_mutex(m, acquired);
-    }
-    return has_data;
-}
-
 apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
                                  apr_thread_cond_t *iowait)
 {
@@ -969,22 +749,7 @@ apr_status_t h2_mplx_reprioritize(h2_mpl
     return status;
 }
 
-static h2_io *open_io(h2_mplx *m, int stream_id, const h2_request *request)
-{
-    apr_pool_t *io_pool;
-    h2_io *io;
-    
-    apr_pool_create(&io_pool, m->pool);
-    apr_pool_tag(io_pool, "h2_io");
-    io = h2_io_create(stream_id, io_pool, m->bucket_alloc, request);
-    h2_io_set_add(m->stream_ios, io);
-    
-    return io;
-}
-
-
-apr_status_t h2_mplx_process(h2_mplx *m, int stream_id, 
-                             const h2_request *req, 
+apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream, 
                              h2_stream_pri_cmp *cmp, void *ctx)
 {
     apr_status_t status;
@@ -997,24 +762,38 @@ apr_status_t h2_mplx_process(h2_mplx *m,
             status = APR_ECONNABORTED;
         }
         else {
-            h2_io *io = open_io(m, stream_id, req);
+            apr_pool_t *io_pool;
+            h2_io *io;
             
-            if (!io->request->body) {
-                status = h2_io_in_close(io);
+            if (!m->need_registration) {
+                m->need_registration = h2_iq_empty(m->q);
             }
-            
-            m->need_registration = m->need_registration || h2_iq_empty(m->q);
-            do_registration = (m->need_registration && m->workers_busy < m->workers_max);
+            if (m->workers_busy < m->workers_max) {
+                do_registration = m->need_registration;
+            }
+
+            io_pool = m->spare_io_pool;
+            if (io_pool) {
+                m->spare_io_pool = NULL;
+            }
+            else {
+                apr_pool_create(&io_pool, m->pool);
+                apr_pool_tag(io_pool, "h2_io");
+            }
+            io = h2_io_create(stream->id, io_pool, stream->request);
+            h2_ilist_add(m->stream_ios, io);            
             h2_iq_add(m->q, io->id, cmp, ctx);
             
+            stream->input = io->beam_in;
             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
-                          "h2_mplx(%ld-%d): process", m->c->id, stream_id);
-            H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_process");
+                          "h2_mplx(%ld-%d): process, body=%d", 
+                          m->c->id, stream->id, io->request->body);
         }
         leave_mutex(m, acquired);
     }
-    if (status == APR_SUCCESS && do_registration) {
-        workers_register(m);
+    if (do_registration) {
+        m->need_registration = 0;
+        h2_workers_register(m->workers, m);
     }
     return status;
 }
@@ -1022,20 +801,24 @@ apr_status_t h2_mplx_process(h2_mplx *m,
 static h2_task *pop_task(h2_mplx *m)
 {
     h2_task *task = NULL;
+    h2_io *io;
     int sid;
-    while (!m->aborted && !task 
-        && (m->workers_busy < m->workers_limit)
-        && (sid = h2_iq_shift(m->q)) > 0) {
-        h2_io *io = h2_io_set_get(m->stream_ios, sid);
-        if (io && io->orphaned) {
-            io_destroy(m, io, 0);
-            if (m->join_wait) {
-                apr_thread_cond_signal(m->join_wait);
-            }
-        }
-        else if (io) {
+    while (!m->aborted && !task  && (m->workers_busy < m->workers_limit)
+           && (sid = h2_iq_shift(m->q)) > 0) {
+        
+        io = h2_ilist_get(m->stream_ios, sid);
+        if (io) {
             conn_rec *slave, **pslave;
             
+            if (io->orphaned) {
+                /* TODO: add to purge list */
+                io_destroy(m, io, 0);
+                if (m->join_wait) {
+                    apr_thread_cond_signal(m->join_wait);
+                }
+                continue;
+            }
+            
             pslave = (conn_rec **)apr_array_pop(m->spare_slaves);
             if (pslave) {
                 slave = *pslave;
@@ -1046,12 +829,21 @@ static h2_task *pop_task(h2_mplx *m)
             }
             
             slave->sbh = m->c->sbh;
-            io->task = task = h2_task_create(m->id, io->request, slave, m);
+            io->task = task = h2_task_create(slave, io->request, 
+                                             io->beam_in, m);
             m->c->keepalives++;
             apr_table_setn(slave->notes, H2_TASK_ID_NOTE, task->id);
             
             io->worker_started = 1;
             io->started_at = apr_time_now();
+            
+            if (io->beam_in) {
+                h2_beam_timeout_set(io->beam_in, m->stream_timeout);
+                h2_beam_on_consumed(io->beam_in, stream_input_consumed, m);
+                h2_beam_on_file_beam(io->beam_in, can_beam_file, m);
+                h2_beam_mutex_set(io->beam_in, io_mutex_enter, 
+                                  io_mutex_leave, task->cond, m);
+            }
             if (sid > m->max_stream_started) {
                 m->max_stream_started = sid;
             }
@@ -1088,7 +880,7 @@ h2_task *h2_mplx_pop_task(h2_mplx *m, in
 static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
 {
     if (task) {
-        h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id);
+        h2_io *io = h2_ilist_get(m->stream_ios, task->stream_id);
         
         if (task->frozen) {
             /* this task was handed over to an engine for processing 
@@ -1112,14 +904,17 @@ static void task_done(h2_mplx *m, h2_tas
             h2_mplx_out_close(m, task->stream_id);
             
             if (ngn && io) {
-                apr_off_t bytes = io->output_consumed + h2_io_out_length(io);
+                apr_off_t bytes = 0;
+                if (io->beam_out) {
+                    h2_beam_send(io->beam_out, NULL, APR_NONBLOCK_READ);
+                    bytes += h2_beam_get_buffered(io->beam_out);
+                }
                 if (bytes > 0) {
                     /* we need to report consumed and current buffered output
                      * to the engine. The request will be streamed out or cancelled,
                      * no more data is coming from it and the engine should update
                      * its calculations before we destroy this information. */
                     h2_req_engine_out_consumed(ngn, task->c, bytes);
-                    io->output_consumed = 0;
                 }
             }
             
@@ -1136,10 +931,10 @@ static void task_done(h2_mplx *m, h2_tas
             if (io) {
                 apr_time_t now = apr_time_now();
                 if (!io->orphaned && m->redo_ios
-                    && h2_io_set_get(m->redo_ios, io->id)) {
+                    && h2_ilist_get(m->redo_ios, io->id)) {
                     /* reset and schedule again */
                     h2_io_redo(io);
-                    h2_io_set_remove(m->redo_ios, io);
+                    h2_ilist_remove(m->redo_ios, io->id);
                     h2_iq_add(m->q, io->id, NULL, NULL);
                 }
                 else {
@@ -1168,6 +963,7 @@ static void task_done(h2_mplx *m, h2_tas
                 }
                 
                 if (io->orphaned) {
+                    /* TODO: add to purge list */
                     io_destroy(m, io, 0);
                     if (m->join_wait) {
                         apr_thread_cond_signal(m->join_wait);
@@ -1211,12 +1007,12 @@ typedef struct {
     apr_time_t now;
 } io_iter_ctx;
 
-static int latest_repeatable_busy_unsubmitted_iter(void *data, h2_io *io)
+static int latest_repeatable_busy_unsubmitted_iter(void *data, void *val)
 {
     io_iter_ctx *ctx = data;
+    h2_io *io = val;
     if (io->worker_started && !io->worker_done
-        && h2_io_is_repeatable(io)
-        && !h2_io_set_get(ctx->m->redo_ios, io->id)) {
+        && h2_io_can_redo(io) && !h2_ilist_get(ctx->m->redo_ios, io->id)) {
         /* this io occupies a worker, the response has not been submitted yet,
          * not been cancelled and it is a repeatable request
          * -> it can be re-scheduled later */
@@ -1233,13 +1029,14 @@ static h2_io *get_latest_repeatable_busy
     io_iter_ctx ctx;
     ctx.m = m;
     ctx.io = NULL;
-    h2_io_set_iter(m->stream_ios, latest_repeatable_busy_unsubmitted_iter, &ctx);
+    h2_ilist_iter(m->stream_ios, latest_repeatable_busy_unsubmitted_iter, &ctx);
     return ctx.io;
 }
 
-static int timed_out_busy_iter(void *data, h2_io *io)
+static int timed_out_busy_iter(void *data, void *val)
 {
     io_iter_ctx *ctx = data;
+    h2_io *io = val;
     if (io->worker_started && !io->worker_done
         && (ctx->now - io->started_at) > ctx->m->stream_timeout) {
         /* timed out stream occupying a worker, found */
@@ -1254,7 +1051,7 @@ static h2_io *get_timed_out_busy_stream(
     ctx.m = m;
     ctx.io = NULL;
     ctx.now = apr_time_now();
-    h2_io_set_iter(m->stream_ios, timed_out_busy_iter, &ctx);
+    h2_ilist_iter(m->stream_ios, timed_out_busy_iter, &ctx);
     return ctx.io;
 }
 
@@ -1264,19 +1061,19 @@ static apr_status_t unschedule_slow_ios(
     int n;
     
     if (!m->redo_ios) {
-        m->redo_ios = h2_io_set_create(m->pool);
+        m->redo_ios = h2_ilist_create(m->pool);
     }
     /* Try to get rid of streams that occupy workers. Look for safe requests
      * that are repeatable. If none found, fail the connection.
      */
-    n = (m->workers_busy - m->workers_limit - h2_io_set_size(m->redo_ios));
+    n = (m->workers_busy - m->workers_limit - h2_ilist_count(m->redo_ios));
     while (n > 0 && (io = get_latest_repeatable_busy_unsubmitted_io(m))) {
-        h2_io_set_add(m->redo_ios, io);
+        h2_ilist_add(m->redo_ios, io);
         h2_io_rst(io, H2_ERR_CANCEL);
         --n;
     }
     
-    if ((m->workers_busy - h2_io_set_size(m->redo_ios)) > m->workers_limit) {
+    if ((m->workers_busy - h2_ilist_count(m->redo_ios)) > m->workers_limit) {
         io = get_timed_out_busy_stream(m);
         if (io) {
             /* Too many busy workers, unable to cancel enough streams
@@ -1295,7 +1092,7 @@ apr_status_t h2_mplx_idle(h2_mplx *m)
     int acquired;
     
     if (enter_mutex(m, &acquired) == APR_SUCCESS) {
-        apr_size_t scount = h2_io_set_size(m->stream_ios);
+        apr_size_t scount = h2_ilist_count(m->stream_ios);
         if (scount > 0 && m->workers_busy) {
             /* If we have streams in connection state 'IDLE', meaning
              * all streams are ready to sent data out, but lack
@@ -1350,9 +1147,10 @@ typedef struct {
     int streams_updated;
 } ngn_update_ctx;
 
-static int ngn_update_window(void *ctx, h2_io *io)
+static int ngn_update_window(void *ctx, void *val)
 {
     ngn_update_ctx *uctx = ctx;
+    h2_io *io = val;
     if (io && io->task && io->task->assigned == uctx->ngn
         && io_out_consumed_signal(uctx->m, io)) {
         ++uctx->streams_updated;
@@ -1367,7 +1165,7 @@ static apr_status_t ngn_out_update_windo
     ctx.m = m;
     ctx.ngn = ngn;
     ctx.streams_updated = 0;
-    h2_io_set_iter(m->stream_ios, ngn_update_window, &ctx);
+    h2_ilist_iter(m->stream_ios, ngn_update_window, &ctx);
     
     return ctx.streams_updated? APR_SUCCESS : APR_EAGAIN;
 }
@@ -1389,7 +1187,7 @@ apr_status_t h2_mplx_req_engine_push(con
     task->r = r;
     
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id);
+        h2_io *io = h2_ilist_get(m->stream_ios, task->stream_id);
         if (!io || io->orphaned) {
             status = APR_ECONNABORTED;
         }

Modified: httpd/httpd/trunk/modules/http2/h2_mplx.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_mplx.h?rev=1739303&r1=1739302&r2=1739303&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_mplx.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_mplx.h Fri Apr 15 13:50:46 2016
@@ -37,16 +37,17 @@
 struct apr_pool_t;
 struct apr_thread_mutex_t;
 struct apr_thread_cond_t;
+struct h2_bucket_beam;
 struct h2_config;
 struct h2_ihash_t;
+struct h2_ilist_t;
 struct h2_response;
 struct h2_task;
 struct h2_stream;
 struct h2_request;
-struct h2_io_set;
 struct apr_thread_cond_t;
 struct h2_workers;
-struct h2_int_queue;
+struct h2_iqueue;
 struct h2_ngn_shed;
 struct h2_req_engine;
 
@@ -72,10 +73,10 @@ struct h2_mplx {
     unsigned int aborted : 1;
     unsigned int need_registration : 1;
 
-    struct h2_int_queue *q;
-    struct h2_io_set *stream_ios;
-    struct h2_io_set *ready_ios;
-    struct h2_io_set *redo_ios;
+    struct h2_iqueue *q;
+    struct h2_ilist_t *stream_ios;
+    struct h2_ilist_t *ready_ios;
+    struct h2_ilist_t *redo_ios;
     
     apr_uint32_t max_streams;        /* max # of concurrent streams */
     apr_uint32_t max_stream_started; /* highest stream id that started processing */
@@ -96,10 +97,11 @@ struct h2_mplx {
     apr_size_t stream_max_mem;
     apr_interval_time_t stream_timeout;
     
+    apr_pool_t *spare_io_pool;
     apr_array_header_t *spare_slaves; /* spare slave connections */
     
     struct h2_workers *workers;
-    apr_size_t tx_handles_reserved;
+    int tx_handles_reserved;
     apr_size_t tx_chunk_size;
     
     h2_mplx_consumed_cb *input_consumed;
@@ -166,10 +168,6 @@ apr_uint32_t h2_mplx_shutdown(h2_mplx *m
  */
 apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error);
 
-/* Return != 0 iff the multiplexer has output data for the given stream. 
- */
-int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id);
-
 /**
  * Waits on output data from any stream in this session to become available. 
  * Returns APR_TIMEUP if no data arrived in the given time.
@@ -190,8 +188,7 @@ apr_status_t h2_mplx_out_trywait(h2_mplx
  * @param cmp the stream priority compare function
  * @param ctx context data for the compare function
  */
-apr_status_t h2_mplx_process(h2_mplx *m, int stream_id, 
-                             const struct h2_request *r, 
+apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream, 
                              h2_stream_pri_cmp *cmp, void *ctx);
 
 /**
@@ -219,37 +216,11 @@ void h2_mplx_set_consumed_cb(h2_mplx *m,
  ******************************************************************************/
 
 /**
- * Reads a buckets for the given stream_id. Will return ARP_EAGAIN when
- * called with APR_NONBLOCK_READ and no data present. Will return APR_EOF
- * when the end of the stream input has been reached.
- * The condition passed in will be used for blocking/signalling and will
- * be protected by the mplx's own mutex.
- */
-apr_status_t h2_mplx_in_read(h2_mplx *m, apr_read_type_e block,
-                             int stream_id, apr_bucket_brigade *bb,
-                             apr_table_t *trailers, 
-                             struct apr_thread_cond_t *iowait);
-
-/**
- * Appends data to the input of the given stream. Storage of input data is
- * not subject to flow control.
- */
-apr_status_t h2_mplx_in_write(h2_mplx *m, int stream_id, 
-                              const char *data, apr_size_t len, int eos);
-
-/**
- * Closes the input for the given stream_id.
- */
-apr_status_t h2_mplx_in_close(h2_mplx *m, int stream_id);
-
-/**
  * Invoke the consumed callback for all streams that had bytes read since the 
  * last call to this function. If no stream had input data consumed, the 
  * callback is not invoked.
  * The consumed callback may also be invoked at other times whenever
  * the need arises.
- * Returns APR_SUCCESS when an update happened, APR_EAGAIN if no update
- * happened.
  */
 apr_status_t h2_mplx_in_update_windows(h2_mplx *m);
 
@@ -267,44 +238,17 @@ struct h2_stream *h2_mplx_next_submit(h2
                                       struct h2_ihash_t *streams);
 
 /**
- * Reads output data into the given brigade. Will never block, but
- * return APR_EAGAIN until data arrives or the stream is closed.
- */
-apr_status_t h2_mplx_out_get_brigade(h2_mplx *mplx, int stream_id, 
-                                     apr_bucket_brigade *bb, 
-                                     apr_off_t len, apr_table_t **ptrailers);
-
-/**
  * Opens the output for the given stream with the specified response.
  */
 apr_status_t h2_mplx_out_open(h2_mplx *mplx, int stream_id,
                               struct h2_response *response,
-                              ap_filter_t* filter, apr_bucket_brigade *bb,
-                              struct apr_thread_cond_t *iowait);
-
-/**
- * Append the brigade to the stream output. Might block if amount
- * of bytes buffered reaches configured max.
- * @param stream_id the stream identifier
- * @param filter the apache filter context of the data
- * @param blocking == 0 iff call should return with APR_INCOMPLETE if
- *                 the full brigade cannot be written at once
- * @param bb the bucket brigade to append
- * @param iowait a conditional used for block/signalling in h2_mplx
- */
-apr_status_t h2_mplx_out_write(h2_mplx *mplx, int stream_id, 
-                               ap_filter_t* filter, 
-                               int blocking,
-                               apr_bucket_brigade *bb,
-                               struct apr_thread_cond_t *iowait);
+                              struct h2_bucket_beam *output);
 
 /**
  * Closes the output for stream stream_id. 
  */
 apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id);
 
-apr_status_t h2_mplx_out_rst(h2_mplx *m, int stream_id, int error);
-
 /*******************************************************************************
  * h2_mplx list Manipulation.
  ******************************************************************************/

Modified: httpd/httpd/trunk/modules/http2/h2_ngn_shed.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_ngn_shed.c?rev=1739303&r1=1739302&r2=1739303&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_ngn_shed.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_ngn_shed.c Fri Apr 15 13:50:46 2016
@@ -33,12 +33,10 @@
 #include "h2_conn.h"
 #include "h2_ctx.h"
 #include "h2_h2.h"
-#include "h2_int_queue.h"
 #include "h2_mplx.h"
 #include "h2_response.h"
 #include "h2_request.h"
 #include "h2_task.h"
-#include "h2_task_output.h"
 #include "h2_util.h"
 #include "h2_ngn_shed.h"
 

Modified: httpd/httpd/trunk/modules/http2/h2_proxy_session.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_proxy_session.c?rev=1739303&r1=1739302&r2=1739303&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_proxy_session.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_proxy_session.c Fri Apr 15 13:50:46 2016
@@ -23,7 +23,6 @@
 
 #include "mod_http2.h"
 #include "h2.h"
-#include "h2_int_queue.h"
 #include "h2_request.h"
 #include "h2_util.h"
 #include "h2_proxy_session.h"

Modified: httpd/httpd/trunk/modules/http2/h2_proxy_session.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_proxy_session.h?rev=1739303&r1=1739302&r2=1739303&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_proxy_session.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_proxy_session.h Fri Apr 15 13:50:46 2016
@@ -20,7 +20,7 @@
 
 #include <nghttp2/nghttp2.h>
 
-struct h2_int_queue;
+struct h2_iqueue;
 struct h2_ihash_t;
 
 typedef enum {
@@ -74,7 +74,7 @@ struct h2_proxy_session {
     apr_interval_time_t wait_timeout;
 
     struct h2_ihash_t *streams;
-    struct h2_int_queue *suspended;
+    struct h2_iqueue *suspended;
     apr_size_t remote_max_concurrent;
     int last_stream_id;     /* last stream id processed by backend, or 0 */
     

Modified: httpd/httpd/trunk/modules/http2/h2_session.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_session.c?rev=1739303&r1=1739302&r2=1739303&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_session.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_session.c Fri Apr 15 13:50:46 2016
@@ -112,7 +112,7 @@ static void cleanup_streams(h2_session *
     while (1) {
         h2_ihash_iter(session->streams, find_cleanup_stream, &ctx);
         if (ctx.candidate) {
-            h2_session_stream_destroy(session, ctx.candidate);
+            h2_session_stream_done(session, ctx.candidate);
             ctx.candidate = NULL;
         }
         else {
@@ -1277,8 +1277,9 @@ static apr_status_t submit_response(h2_s
         const h2_priority *prio;
         
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03073)
-                      "h2_stream(%ld-%d): submit response %d",
-                      session->id, stream->id, response->http_status);
+                      "h2_stream(%ld-%d): submit response %d, REMOTE_WINDOW_SIZE=%u",
+                      session->id, stream->id, response->http_status,
+                      (unsigned int)nghttp2_session_get_stream_remote_window_size(session->ngh2, stream->id));
         
         if (response->content_length != 0) {
             memset(&provider, 0, sizeof(provider));
@@ -1504,21 +1505,24 @@ apr_status_t h2_session_set_prio(h2_sess
     return status;
 }
 
-apr_status_t h2_session_stream_destroy(h2_session *session, h2_stream *stream)
+apr_status_t h2_session_stream_done(h2_session *session, h2_stream *stream)
 {
     apr_pool_t *pool = h2_stream_detach_pool(stream);
-
+    int stream_id = stream->id;
+    int rst_error = stream->rst_error;
+    
     ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
                   "h2_stream(%ld-%d): cleanup by EOS bucket destroy", 
-                  session->id, stream->id);
+                  session->id, stream_id);
+    if (session->streams) {
+        h2_ihash_remove(session->streams, stream_id);
+    }
+    
+    h2_stream_cleanup(stream);
     /* this may be called while the session has already freed
      * some internal structures or even when the mplx is locked. */
     if (session->mplx) {
-        h2_mplx_stream_done(session->mplx, stream->id, stream->rst_error);
-    }
-    
-    if (session->streams) {
-        h2_ihash_remove(session->streams, stream->id);
+        h2_mplx_stream_done(session->mplx, stream_id, rst_error);
     }
     h2_stream_destroy(stream);
     
@@ -1529,6 +1533,7 @@ apr_status_t h2_session_stream_destroy(h
         }
         session->spare = pool;
     }
+
     return APR_SUCCESS;
 }
 
@@ -2217,9 +2222,10 @@ apr_status_t h2_session_process(h2_sessi
                     }
                     /* send out window updates for our inputs */
                     status = h2_mplx_in_update_windows(session->mplx);
-                    if (status != APR_SUCCESS && status != APR_EAGAIN) {
+                    if (status != APR_SUCCESS) {
                         dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 
-                                         H2_ERR_INTERNAL_ERROR, "window update error");
+                                       H2_ERR_INTERNAL_ERROR, 
+                                       "window update error");
                         break;
                     }
                 }

Modified: httpd/httpd/trunk/modules/http2/h2_session.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_session.h?rev=1739303&r1=1739302&r2=1739303&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_session.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_session.h Fri Apr 15 13:50:46 2016
@@ -218,8 +218,8 @@ int h2_session_push_enabled(h2_session *
  * @param session the session to which the stream belongs
  * @param stream the stream to destroy
  */
-apr_status_t h2_session_stream_destroy(h2_session *session, 
-                                       struct h2_stream *stream);
+apr_status_t h2_session_stream_done(h2_session *session, 
+                                    struct h2_stream *stream);
 
 /**
  * Submit a push promise on the stream and schedule the new steam for

Modified: httpd/httpd/trunk/modules/http2/h2_stream.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_stream.c?rev=1739303&r1=1739302&r2=1739303&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_stream.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_stream.c Fri Apr 15 13:50:46 2016
@@ -24,6 +24,7 @@
 #include <nghttp2/nghttp2.h>
 
 #include "h2_private.h"
+#include "h2_bucket_beam.h"
 #include "h2_conn.h"
 #include "h2_config.h"
 #include "h2_h2.h"
@@ -36,7 +37,6 @@
 #include "h2_stream.h"
 #include "h2_task.h"
 #include "h2_ctx.h"
-#include "h2_task_input.h"
 #include "h2_task.h"
 #include "h2_util.h"
 
@@ -52,6 +52,13 @@ static int state_transition[][7] = {
 /*CL*/{  1, 1, 0, 0, 1, 1, 1 },
 };
 
+#define H2_STREAM_OUT_LOG(lvl,s,msg) \
+    do { \
+        if (APLOG_C_IS_LEVEL((s)->session->c,lvl)) \
+        h2_util_bb_log((s)->session->c,(s)->session->id,lvl,msg,(s)->buffer); \
+    } while(0)
+    
+
 static int set_state(h2_stream *stream, h2_stream_state_t state)
 {
     int allowed = state_transition[state][stream->state];
@@ -135,8 +142,6 @@ static int output_open(h2_stream *stream
     }
 }
 
-static h2_sos *h2_sos_mplx_create(h2_stream *stream, h2_response *response);
-
 h2_stream *h2_stream_open(int id, apr_pool_t *pool, h2_session *session,
                           int initiated_on, const h2_request *creq)
 {
@@ -166,18 +171,26 @@ h2_stream *h2_stream_open(int id, apr_po
     return stream;
 }
 
-apr_status_t h2_stream_destroy(h2_stream *stream)
+void h2_stream_cleanup(h2_stream *stream)
+{
+    AP_DEBUG_ASSERT(stream);
+    if (stream->buffer) {
+        apr_brigade_cleanup(stream->buffer);
+    }
+}
+
+void h2_stream_destroy(h2_stream *stream)
 {
     AP_DEBUG_ASSERT(stream);
+    h2_stream_cleanup(stream);
     if (stream->pool) {
         apr_pool_destroy(stream->pool);
     }
-    return APR_SUCCESS;
 }
 
-void h2_stream_cleanup(h2_stream *stream)
+void h2_stream_eos_destroy(h2_stream *stream)
 {
-    h2_session_stream_destroy(stream->session, stream);
+    h2_session_stream_done(stream->session, stream);
     /* stream is gone */
 }
 
@@ -200,33 +213,7 @@ void h2_stream_rst(h2_stream *stream, in
 
 struct h2_response *h2_stream_get_response(h2_stream *stream)
 {
-    return stream->sos? stream->sos->response : NULL;
-}
-
-apr_status_t h2_stream_set_response(h2_stream *stream, h2_response *response,
-                                    apr_bucket_brigade *bb)
-{
-    apr_status_t status = APR_SUCCESS;
-    h2_sos *sos;
-    
-    if (!output_open(stream)) {
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
-                      "h2_stream(%ld-%d): output closed", 
-                      stream->session->id, stream->id);
-        return APR_ECONNRESET;
-    }
-    
-    sos = h2_sos_mplx_create(stream, response);
-    if (sos->response->sos_filter) {
-        sos = h2_filter_sos_create(sos->response->sos_filter, sos); 
-    }
-    stream->sos = sos;
-    
-    status = stream->sos->buffer(stream->sos, bb);
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->session->c,
-                  "h2_stream(%ld-%d): set_response(%d)", 
-                  stream->session->id, stream->id, stream->sos->response->http_status);
-    return status;
+    return stream->response;
 }
 
 apr_status_t h2_stream_set_request(h2_stream *stream, request_rec *r)
@@ -286,15 +273,11 @@ apr_status_t h2_stream_schedule(h2_strea
     status = h2_request_end_headers(stream->request, stream->pool, 
                                     eos, push_enabled);
     if (status == APR_SUCCESS) {
-        if (!eos) {
-            stream->request->body = 1;
-        }
-        stream->input_remaining = stream->request->content_length;
-        
-        status = h2_mplx_process(stream->session->mplx, stream->id, 
-                                 stream->request, cmp, ctx);
+        stream->request->body = !eos;
         stream->scheduled = 1;
+        stream->input_remaining = stream->request->content_length;
         
+        status = h2_mplx_process(stream->session->mplx, stream, cmp, ctx);
         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
                       "h2_stream(%ld-%d): scheduled %s %s://%s%s",
                       stream->session->id, stream->id,
@@ -331,8 +314,8 @@ apr_status_t h2_stream_close_input(h2_st
         return APR_ECONNRESET;
     }
     
-    if (close_input(stream)) {
-        status = h2_mplx_in_close(stream->session->mplx, stream->id);
+    if (close_input(stream) && stream->input) {
+        status = h2_beam_close(stream->input);
     }
     return status;
 }
@@ -340,25 +323,29 @@ apr_status_t h2_stream_close_input(h2_st
 apr_status_t h2_stream_write_data(h2_stream *stream,
                                   const char *data, size_t len, int eos)
 {
+    conn_rec *c = stream->session->c;
     apr_status_t status = APR_SUCCESS;
     
     AP_DEBUG_ASSERT(stream);
+    if (!stream->input) {
+        return APR_EOF;
+    }
     if (input_closed(stream) || !stream->request->eoh) {
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
                       "h2_stream(%ld-%d): writing denied, closed=%d, eoh=%d", 
                       stream->session->id, stream->id, input_closed(stream),
                       stream->request->eoh);
         return APR_EINVAL;
     }
 
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
                   "h2_stream(%ld-%d): add %ld input bytes", 
                   stream->session->id, stream->id, (long)len);
 
     if (!stream->request->chunked) {
         stream->input_remaining -= len;
         if (stream->input_remaining < 0) {
-            ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, stream->session->c,
+            ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, c,
                           APLOGNO(02961) 
                           "h2_stream(%ld-%d): got %ld more content bytes than announced "
                           "in content-length header: %ld", 
@@ -370,10 +357,18 @@ apr_status_t h2_stream_write_data(h2_str
         }
     }
     
-    status = h2_mplx_in_write(stream->session->mplx, stream->id, data, len, eos);
+    if (!stream->tmp) {
+        stream->tmp = apr_brigade_create(stream->pool, c->bucket_alloc);
+    }
+    apr_brigade_write(stream->tmp, NULL, NULL, data, len);
     if (eos) {
+        APR_BRIGADE_INSERT_TAIL(stream->tmp, 
+                                apr_bucket_eos_create(c->bucket_alloc)); 
         close_input(stream);
     }
+    
+    status = h2_beam_send(stream->input, stream->tmp, APR_BLOCK_READ);
+    apr_brigade_cleanup(stream->tmp);
     return status;
 }
 
@@ -392,44 +387,122 @@ int h2_stream_is_suspended(const h2_stre
     return stream->suspended;
 }
 
-apr_status_t h2_stream_out_prepare(h2_stream *stream, 
+static apr_status_t fill_buffer(h2_stream *stream, apr_size_t amount)
+{
+    if (!stream->output) {
+        return APR_EOF;
+    }
+    return h2_beam_receive(stream->output, stream->buffer, 
+                           APR_NONBLOCK_READ, amount);
+}
+
+apr_status_t h2_stream_set_response(h2_stream *stream, h2_response *response,
+                                    h2_bucket_beam *output)
+{
+    apr_status_t status = APR_SUCCESS;
+    conn_rec *c = stream->session->c;
+    
+    if (!output_open(stream)) {
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
+                      "h2_stream(%ld-%d): output closed", 
+                      stream->session->id, stream->id);
+        return APR_ECONNRESET;
+    }
+    
+    stream->response = response;
+    stream->output = output;
+    stream->buffer = apr_brigade_create(stream->pool, c->bucket_alloc);
+    
+    h2_stream_filter(stream);
+    if (stream->output) {
+        status = fill_buffer(stream, 0);
+    }
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
+                  "h2_stream(%ld-%d): set_response(%d)", 
+                  stream->session->id, stream->id, 
+                  stream->response->http_status);
+    return status;
+}
+
+apr_status_t h2_stream_out_prepare(h2_stream *stream,
                                    apr_off_t *plen, int *peos)
 {
+    conn_rec *c = stream->session->c;
+    apr_status_t status = APR_SUCCESS;
+    apr_off_t requested = (*plen > 0)? *plen : 32*1024;
+
     if (stream->rst_error) {
         *plen = 0;
         *peos = 1;
         return APR_ECONNRESET;
     }
 
-    AP_DEBUG_ASSERT(stream->sos);
-    return stream->sos->prepare(stream->sos, plen, peos);
+    H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "h2_stream_out_prepare_pre");
+    h2_util_bb_avail(stream->buffer, plen, peos);
+    if (!*peos && !*plen) {
+        /* try to get more data */
+        status = fill_buffer(stream, H2MIN(requested, 32*1024));
+        if (APR_STATUS_IS_EOF(status)) {
+            apr_bucket *eos = apr_bucket_eos_create(c->bucket_alloc);
+            APR_BRIGADE_INSERT_TAIL(stream->buffer, eos);
+            status = APR_SUCCESS;
+        }
+        h2_util_bb_avail(stream->buffer, plen, peos);
+    }
+    H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "h2_stream_out_prepare_post");
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
+                  "h2_stream(%ld-%d): prepare, len=%ld eos=%d, trailers=%s",
+                  c->id, stream->id, (long)*plen, *peos,
+                  (stream->response && stream->response->trailers)? 
+                  "yes" : "no");
+    if (!*peos && !*plen && status == APR_SUCCESS) {
+        return APR_EAGAIN;
+    }
+    return status;
 }
 
+
 apr_status_t h2_stream_readx(h2_stream *stream, 
                              h2_io_data_cb *cb, void *ctx,
                              apr_off_t *plen, int *peos)
 {
+    conn_rec *c = stream->session->c;
+    apr_status_t status = APR_SUCCESS;
+
     if (stream->rst_error) {
         return APR_ECONNRESET;
     }
-    if (!stream->sos) {
-        return APR_EGENERAL;
+    status = h2_util_bb_readx(stream->buffer, cb, ctx, plen, peos);
+    if (status == APR_SUCCESS && !*peos && !*plen) {
+        status = APR_EAGAIN;
     }
-    return stream->sos->readx(stream->sos, cb, ctx, plen, peos);
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, c,
+                  "h2_stream(%ld-%d): readx, len=%ld eos=%d",
+                  c->id, stream->id, (long)*plen, *peos);
+    return status;
 }
 
+
 apr_status_t h2_stream_read_to(h2_stream *stream, apr_bucket_brigade *bb, 
                                apr_off_t *plen, int *peos)
 {
+    conn_rec *c = stream->session->c;
+    apr_status_t status = APR_SUCCESS;
+
     if (stream->rst_error) {
         return APR_ECONNRESET;
     }
-    if (!stream->sos) {
-        return APR_EGENERAL;
+    status = h2_append_brigade(bb, stream->buffer, plen, peos);
+    if (status == APR_SUCCESS && !*peos && !*plen) {
+        status = APR_EAGAIN;
     }
-    return stream->sos->read_to(stream->sos, bb, plen, peos);
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, c,
+                  "h2_stream(%ld-%d): read_to, len=%ld eos=%d",
+                  c->id, stream->id, (long)*plen, *peos);
+    return status;
 }
 
+
 int h2_stream_input_is_open(const h2_stream *stream) 
 {
     return input_open(stream);
@@ -474,7 +547,7 @@ apr_status_t h2_stream_submit_pushes(h2_
 
 apr_table_t *h2_stream_get_trailers(h2_stream *stream)
 {
-    return stream->sos? stream->sos->get_trailers(stream->sos) : NULL;
+    return stream->response? stream->response->trailers : NULL;
 }
 
 const h2_priority *h2_stream_get_priority(h2_stream *stream)
@@ -491,147 +564,3 @@ const h2_priority *h2_stream_get_priorit
     return NULL;
 }
 
-/*******************************************************************************
- * h2_sos_mplx
- ******************************************************************************/
-
-typedef struct h2_sos_mplx {
-    h2_mplx *m;
-    apr_bucket_brigade *bb;
-    apr_bucket_brigade *tmp;
-    apr_table_t *trailers;
-    apr_off_t  buffer_size;
-} h2_sos_mplx;
-
-#define H2_SOS_MPLX_OUT(lvl,msos,msg) \
-    do { \
-        if (APLOG_C_IS_LEVEL((msos)->m->c,lvl)) \
-        h2_util_bb_log((msos)->m->c,(msos)->m->id,lvl,msg,(msos)->bb); \
-    } while(0)
-    
-
-static apr_status_t mplx_transfer(h2_sos_mplx *msos, int stream_id, 
-                                  apr_pool_t *pool)
-{
-    apr_status_t status;
-    apr_table_t *trailers = NULL;
-    
-    if (!msos->tmp) {
-        msos->tmp = apr_brigade_create(msos->bb->p, msos->bb->bucket_alloc);
-    }
-    status = h2_mplx_out_get_brigade(msos->m, stream_id, msos->tmp, 
-                                     msos->buffer_size-1, &trailers);
-    if (!APR_BRIGADE_EMPTY(msos->tmp)) {
-        h2_transfer_brigade(msos->bb, msos->tmp, pool);
-    }
-    if (trailers) {
-        msos->trailers = trailers;
-    }
-    return status;
-}
- 
-static apr_status_t h2_sos_mplx_read_to(h2_sos *sos, apr_bucket_brigade *bb, 
-                                        apr_off_t *plen, int *peos)
-{
-    h2_sos_mplx *msos = sos->ctx;
-    apr_status_t status;
-
-    status = h2_append_brigade(bb, msos->bb, plen, peos);
-    if (status == APR_SUCCESS && !*peos && !*plen) {
-        status = APR_EAGAIN;
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, msos->m->c,
-                      "h2_stream(%ld-%d): read_to, len=%ld eos=%d",
-                      msos->m->id, sos->stream->id, (long)*plen, *peos);
-    }
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, msos->m->c,
-                  "h2_stream(%ld-%d): read_to, len=%ld eos=%d",
-                  msos->m->id, sos->stream->id, (long)*plen, *peos);
-    return status;
-}
-
-static apr_status_t h2_sos_mplx_readx(h2_sos *sos, h2_io_data_cb *cb, void *ctx,
-                                      apr_off_t *plen, int *peos)
-{
-    h2_sos_mplx *msos = sos->ctx;
-    apr_status_t status = APR_SUCCESS;
-    
-    status = h2_util_bb_readx(msos->bb, cb, ctx, plen, peos);
-    if (status == APR_SUCCESS && !*peos && !*plen) {
-        status = APR_EAGAIN;
-    }
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, msos->m->c,
-                  "h2_stream(%ld-%d): readx, len=%ld eos=%d",
-                  msos->m->id, sos->stream->id, (long)*plen, *peos);
-    return status;
-}
-
-static apr_status_t h2_sos_mplx_prepare(h2_sos *sos, apr_off_t *plen, int *peos)
-{
-    h2_sos_mplx *msos = sos->ctx;
-    apr_status_t status = APR_SUCCESS;
-    
-    H2_SOS_MPLX_OUT(APLOG_TRACE2, msos, "h2_sos_mplx prepare_pre");
-    
-    if (APR_BRIGADE_EMPTY(msos->bb)) {
-        status = mplx_transfer(msos, sos->stream->id, sos->stream->pool);
-    }
-    h2_util_bb_avail(msos->bb, plen, peos);
-    
-    H2_SOS_MPLX_OUT(APLOG_TRACE2, msos, "h2_sos_mplx prepare_post");
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, msos->m->c,
-                  "h2_stream(%ld-%d): prepare, len=%ld eos=%d, trailers=%s",
-                  msos->m->id, sos->stream->id, (long)*plen, *peos,
-                  msos->trailers? "yes" : "no");
-    if (!*peos && !*plen) {
-        status = APR_EAGAIN;
-    }
-    
-    return status;
-}
-
-static apr_table_t *h2_sos_mplx_get_trailers(h2_sos *sos)
-{
-    h2_sos_mplx *msos = sos->ctx;
-
-    return msos->trailers;
-}
-
-static apr_status_t h2_sos_mplx_buffer(h2_sos *sos, apr_bucket_brigade *bb) 
-{
-    h2_sos_mplx *msos = sos->ctx;
-    apr_status_t status = APR_SUCCESS;
-
-    if (bb && !APR_BRIGADE_EMPTY(bb)) {
-        H2_SOS_MPLX_OUT(APLOG_TRACE2, msos, "h2_sos_mplx set_response_pre");
-        status = mplx_transfer(msos, sos->stream->id, sos->stream->pool);
-        H2_SOS_MPLX_OUT(APLOG_TRACE2, msos, "h2_sos_mplx set_response_post");
-    }
-    return status;
-}
-
-static h2_sos *h2_sos_mplx_create(h2_stream *stream, h2_response *response)
-{
-    h2_sos *sos;
-    h2_sos_mplx *msos;
-    
-    msos = apr_pcalloc(stream->pool, sizeof(*msos));
-    msos->m = stream->session->mplx;
-    msos->bb = apr_brigade_create(stream->pool, msos->m->c->bucket_alloc);
-    msos->buffer_size = 32 * 1024;
-    
-    sos = apr_pcalloc(stream->pool, sizeof(*sos));
-    sos->stream = stream;
-    sos->response = response;
-    
-    sos->ctx = msos;
-    sos->buffer = h2_sos_mplx_buffer;
-    sos->prepare = h2_sos_mplx_prepare;
-    sos->readx = h2_sos_mplx_readx;
-    sos->read_to = h2_sos_mplx_read_to;
-    sos->get_trailers = h2_sos_mplx_get_trailers;
-    
-    sos->response = response;
-
-    return sos;
-}
-

Modified: httpd/httpd/trunk/modules/http2/h2_stream.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_stream.h?rev=1739303&r1=1739302&r2=1739303&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_stream.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_stream.h Fri Apr 15 13:50:46 2016
@@ -38,6 +38,7 @@ struct h2_request;
 struct h2_response;
 struct h2_session;
 struct h2_sos;
+struct h2_bucket_beam;
 
 typedef struct h2_stream h2_stream;
 
@@ -48,8 +49,14 @@ struct h2_stream {
     
     apr_pool_t *pool;           /* the memory pool for this stream */
     struct h2_request *request; /* the request made in this stream */
+    struct h2_bucket_beam *input;
+
+    struct h2_response *response;
+    struct h2_bucket_beam *output;
+    apr_bucket_brigade *buffer;
+    apr_bucket_brigade *tmp;
+
     int rst_error;              /* stream error for RST_STREAM */
-    
     unsigned int aborted   : 1; /* was aborted */
     unsigned int suspended : 1; /* DATA sending has been suspended */
     unsigned int scheduled : 1; /* stream has been scheduled */
@@ -57,7 +64,6 @@ struct h2_stream {
     
     apr_off_t input_remaining;  /* remaining bytes on input as advertised via content-length */
 
-    struct h2_sos *sos;         /* stream output source, e.g. to read output from */
     apr_off_t data_frames_sent; /* # of DATA frames sent out for this stream */
 };
 
@@ -75,12 +81,14 @@ h2_stream *h2_stream_open(int id, apr_po
                           int initiated_on, const struct h2_request *req);
 
 /**
- * Destroy any resources held by this stream. Will destroy memory pool
- * if still owned by the stream.
- *
- * @param stream the stream to destroy
+ * Cleanup any resources still held by the stream, called by last bucket.
+ */
+void h2_stream_eos_destroy(h2_stream *stream);
+
+/**
+ * Destroy memory pool if still owned by the stream.
  */
-apr_status_t h2_stream_destroy(h2_stream *stream);
+void h2_stream_destroy(h2_stream *stream);
 
 /**
  * Removes stream from h2_session and destroys it.
@@ -179,7 +187,7 @@ struct h2_response *h2_stream_get_respon
  */
 apr_status_t h2_stream_set_response(h2_stream *stream, 
                                     struct h2_response *response,
-                                    apr_bucket_brigade *bb);
+                                    struct h2_bucket_beam *output);
 
 /**
  * Do a speculative read on the stream output to determine the 



Mime
View raw message