httpd-cvs mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ic...@apache.org
Subject svn commit: r1712570 [12/13] - in /httpd/httpd/branches/2.4-http2-alpha: ./ docs/conf/ docs/manual/mod/ include/ modules/cache/ modules/http2/ modules/ssl/ server/
Date Wed, 04 Nov 2015 15:45:25 GMT
Modified: httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_mplx.c
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_mplx.c?rev=1712570&r1=1712569&r2=1712570&view=diff
==============================================================================
--- httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_mplx.c (original)
+++ httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_mplx.c Wed Nov  4 15:44:24 2015
@@ -60,10 +60,6 @@ static void h2_mplx_destroy(h2_mplx *m)
 {
     AP_DEBUG_ASSERT(m);
     m->aborted = 1;
-    if (m->q) {
-        h2_tq_destroy(m->q);
-        m->q = NULL;
-    }
     if (m->ready_ios) {
         h2_io_set_destroy(m->ready_ios);
         m->ready_ios = NULL;
@@ -128,10 +124,9 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr
         
         m->bucket_alloc = apr_bucket_alloc_create(m->pool);
         
-        m->q = h2_tq_create(m->id, m->pool);
+        m->q = h2_tq_create(m->pool, h2_config_geti(conf, H2_CONF_MAX_STREAMS));
         m->stream_ios = h2_io_set_create(m->pool);
         m->ready_ios = h2_io_set_create(m->pool);
-        m->closed = h2_stream_set_create(m->pool);
         m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM);
         m->workers = workers;
         
@@ -226,52 +221,18 @@ void h2_mplx_abort(h2_mplx *m)
 }
 
 
-h2_stream *h2_mplx_open_io(h2_mplx *m, int stream_id)
+static void io_destroy(h2_mplx *m, h2_io *io)
 {
-    h2_stream *stream = NULL;
-    apr_status_t status; 
-    h2_io *io;
-
-    if (m->aborted) {
-        return NULL;
-    }
-    status = apr_thread_mutex_lock(m->lock);
-    if (APR_SUCCESS == status) {
-        apr_pool_t *stream_pool = m->spare_pool;
-        
-        if (!stream_pool) {
-            apr_pool_create(&stream_pool, m->pool);
-        }
-        else {
-            m->spare_pool = NULL;
-        }
-        
-        stream = h2_stream_create(stream_id, stream_pool, m);
-        stream->state = H2_STREAM_ST_OPEN;
-        
-        io = h2_io_set_get(m->stream_ios, stream_id);
-        if (!io) {
-            io = h2_io_create(stream_id, stream_pool, m->bucket_alloc);
-            h2_io_set_add(m->stream_ios, io);
-        }
-        status = io? APR_SUCCESS : APR_ENOMEM;
-        apr_thread_mutex_unlock(m->lock);
-    }
-    return stream;
-}
-
-static void stream_destroy(h2_mplx *m, h2_stream *stream, h2_io *io)
-{
-    apr_pool_t *pool = h2_stream_detach_pool(stream);
-    if (pool) {
-        apr_pool_clear(pool);
-        if (m->spare_pool) {
-            apr_pool_destroy(m->spare_pool);
-        }
-        m->spare_pool = pool;
-    }
-    h2_stream_destroy(stream);
     if (io) {
+        apr_pool_t *pool = io->pool;
+        if (pool) {
+            io->pool = NULL;
+            apr_pool_clear(pool);
+            if (m->spare_pool) {
+                apr_pool_destroy(m->spare_pool);
+            }
+            m->spare_pool = pool;
+        }
         /* The pool is cleared/destroyed which also closes all
          * allocated file handles. Give this count back to our
          * file handle pool. */
@@ -282,31 +243,36 @@ static void stream_destroy(h2_mplx *m, h
     }
 }
 
-apr_status_t h2_mplx_cleanup_stream(h2_mplx *m, h2_stream *stream)
+apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error)
 {
     apr_status_t status;
+    
     AP_DEBUG_ASSERT(m);
+    if (m->aborted) {
+        return APR_ECONNABORTED;
+    }
     status = apr_thread_mutex_lock(m->lock);
     if (APR_SUCCESS == status) {
-        h2_io *io = h2_io_set_get(m->stream_ios, stream->id);
+        h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
+        
         if (io) {
             /* Remove io from ready set, we will never submit it */
             h2_io_set_remove(m->ready_ios, io);
-            if (stream->rst_error) {
-                /* Forward error code to fail any further attempt to
-                 * write to io */
-                h2_io_rst(io, stream->rst_error);
+            
+            if (io->task_done) {
+                io_destroy(m, io);
+            }
+            else {
+                /* cleanup once task is done */
+                io->zombie = 1;
+                if (rst_error) {
+                    /* Forward error code to fail any further attempt to
+                     * write to io */
+                    h2_io_rst(io, rst_error);
+                }
             }
         }
         
-        if (!io || io->task_done) {
-            /* No more io or task already done -> cleanup immediately */
-            stream_destroy(m, stream, io);
-        }
-        else {
-            /* Add stream to closed set for cleanup when task is done */
-            h2_stream_set_add(m->closed, stream);
-        }
         apr_thread_mutex_unlock(m->lock);
     }
     return status;
@@ -316,21 +282,17 @@ void h2_mplx_task_done(h2_mplx *m, int s
 {
     apr_status_t status = apr_thread_mutex_lock(m->lock);
     if (APR_SUCCESS == status) {
-        h2_stream *stream = h2_stream_set_get(m->closed, stream_id);
         h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
                       "h2_mplx(%ld): task(%d) done", m->id, stream_id);
-        if (stream) {
-            /* stream was already closed by main connection and is in 
-             * zombie state. Now that the task is done with it, we
-             * can free its resources. */
-            h2_stream_set_remove(m->closed, stream);
-            stream_destroy(m, stream, io);
-        }
-        else if (io) {
-            /* main connection has not finished stream. Mark task as done
-             * so that eventual cleanup can start immediately. */
+        if (io) {
             io->task_done = 1;
+            if (io->zombie) {
+                io_destroy(m, io);
+            }
+            else {
+                /* hang around until the stream deregisters */
+            }
         }
         apr_thread_mutex_unlock(m->lock);
     }
@@ -497,6 +459,36 @@ apr_status_t h2_mplx_out_readx(h2_mplx *
     return status;
 }
 
+apr_status_t h2_mplx_out_read_to(h2_mplx *m, int stream_id, 
+                                 apr_bucket_brigade *bb, 
+                                 apr_size_t *plen, int *peos)
+{
+    apr_status_t status;
+    AP_DEBUG_ASSERT(m);
+    if (m->aborted) {
+        return APR_ECONNABORTED;
+    }
+    status = apr_thread_mutex_lock(m->lock);
+    if (APR_SUCCESS == status) {
+        h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
+        if (io) {
+            H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_read_to_pre");
+            
+            status = h2_io_out_read_to(io, bb, plen, peos);
+            
+            H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_read_to_post");
+            if (status == APR_SUCCESS && io->output_drained) {
+                apr_thread_cond_signal(io->output_drained);
+            }
+        }
+        else {
+            status = APR_ECONNABORTED;
+        }
+        apr_thread_mutex_unlock(m->lock);
+    }
+    return status;
+}
+
 h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_stream_set *streams)
 {
     apr_status_t status;
@@ -529,7 +521,7 @@ h2_stream *h2_mplx_next_submit(h2_mplx *
                               "h2_mplx(%ld): stream for response %d closed, "
                               "resetting io to close request processing",
                               m->id, io->id);
-                h2_io_rst(io, NGHTTP2_ERR_STREAM_CLOSED);
+                h2_io_rst(io, H2_ERR_STREAM_CLOSED);
             }
             
             if (io->output_drained) {
@@ -588,12 +580,12 @@ static apr_status_t out_open(h2_mplx *m,
     if (io) {
         if (f) {
             ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, f->c,
-                          "h2_mplx(%ld-%d): open response: %s",
-                          m->id, stream_id, response->status);
+                          "h2_mplx(%ld-%d): open response: %s, rst=%d",
+                          m->id, stream_id, response->status, 
+                          response->rst_error);
         }
         
-        io->response = h2_response_copy(io->pool, response);
-        AP_DEBUG_ASSERT(io->response);
+        h2_io_set_response(io, response);
         h2_io_set_add(m->ready_ios, io);
         if (bb) {
             status = out_write(m, io, f, bb, iowait);
@@ -681,7 +673,7 @@ apr_status_t h2_mplx_out_close(h2_mplx *
                      * insert an error one so that our streams can properly
                      * reset.
                      */
-                    h2_response *r = h2_response_create(stream_id, 
+                    h2_response *r = h2_response_create(stream_id, 0, 
                                                         "500", NULL, m->pool);
                     status = out_open(m, stream_id, r, NULL, NULL, NULL);
                     ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, m->c,
@@ -727,11 +719,8 @@ apr_status_t h2_mplx_out_rst(h2_mplx *m,
                 H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_rst");
                 
                 have_out_data_for(m, stream_id);
-                if (m->aborted) {
-                    /* if we were the last output, the whole session might
-                     * have gone down in the meantime.
-                     */
-                    return APR_SUCCESS;
+                if (io->output_drained) {
+                    apr_thread_cond_signal(io->output_drained);
                 }
             }
             else {
@@ -813,61 +802,116 @@ static void have_out_data_for(h2_mplx *m
     }
 }
 
-apr_status_t h2_mplx_do_task(h2_mplx *m, struct h2_task *task)
+typedef struct {
+    h2_stream_pri_cmp *cmp;
+    void *ctx;
+} cmp_ctx;
+
+static int task_cmp(h2_task *t1, h2_task *t2, void *ctx)
+{
+    cmp_ctx *x = ctx;
+    return x->cmp(t1->stream_id, t2->stream_id, x->ctx);
+}
+
+apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx)
 {
     apr_status_t status;
+    
     AP_DEBUG_ASSERT(m);
     if (m->aborted) {
         return APR_ECONNABORTED;
     }
     status = apr_thread_mutex_lock(m->lock);
     if (APR_SUCCESS == status) {
-        /* TODO: needs to sort queue by priority */
+        cmp_ctx x;
+        
+        x.cmp = cmp;
+        x.ctx = ctx;
+        h2_tq_sort(m->q, task_cmp, &x);
+        
         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                      "h2_mplx: do task(%s)", task->id);
-        h2_tq_append(m->q, task);
+                      "h2_mplx(%ld): reprioritize tasks", m->id);
         apr_thread_mutex_unlock(m->lock);
     }
     workers_register(m);
     return status;
 }
 
-h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more)
+static h2_io *open_io(h2_mplx *m, int stream_id)
+{
+    apr_pool_t *io_pool = m->spare_pool;
+    h2_io *io;
+    
+    if (!io_pool) {
+        apr_pool_create(&io_pool, m->pool);
+    }
+    else {
+        m->spare_pool = NULL;
+    }
+    
+    io = h2_io_create(stream_id, io_pool, m->bucket_alloc);
+    h2_io_set_add(m->stream_ios, io);
+    
+    return io;
+}
+
+
+apr_status_t h2_mplx_process(h2_mplx *m, int stream_id,
+                             struct h2_request *r, int eos, 
+                             h2_stream_pri_cmp *cmp, void *ctx)
 {
-    h2_task *task = NULL;
     apr_status_t status;
+    
     AP_DEBUG_ASSERT(m);
     if (m->aborted) {
-        *has_more = 0;
-        return NULL;
+        return APR_ECONNABORTED;
     }
     status = apr_thread_mutex_lock(m->lock);
     if (APR_SUCCESS == status) {
-        task = h2_tq_pop_first(m->q);
-        if (task) {
-            h2_task_set_started(task);
+        conn_rec *c;
+        h2_io *io;
+        cmp_ctx x;
+        
+        io = open_io(m, stream_id);
+        c = h2_conn_create(m->c, io->pool);
+        io->task = h2_task_create(m->id, stream_id, io->pool, m, c);
+            
+        status = h2_request_end_headers(r, m, io->task, eos);
+        if (status == APR_SUCCESS && eos) {
+            status = h2_io_in_close(io);
         }
-        *has_more = !h2_tq_empty(m->q);
+        
+        if (status == APR_SUCCESS) {
+            x.cmp = cmp;
+            x.ctx = ctx;
+            h2_tq_add(m->q, io->task, task_cmp, &x);
+        }
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
+                      "h2_mplx(%ld-%d): process", m->c->id, stream_id);
         apr_thread_mutex_unlock(m->lock);
     }
-    return task;
+    
+    if (status == APR_SUCCESS) {
+        workers_register(m);
+    }
+    return status;
 }
 
-apr_status_t h2_mplx_create_task(h2_mplx *m, struct h2_stream *stream)
+h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more)
 {
+    h2_task *task = NULL;
     apr_status_t status;
     AP_DEBUG_ASSERT(m);
     if (m->aborted) {
-        return APR_ECONNABORTED;
+        *has_more = 0;
+        return NULL;
     }
     status = apr_thread_mutex_lock(m->lock);
     if (APR_SUCCESS == status) {
-        conn_rec *c = h2_conn_create(m->c, stream->pool);
-        stream->task = h2_task_create(m->id, stream->id, 
-                                      stream->pool, m, c);
-        
+        task = h2_tq_shift(m->q);
+        *has_more = !h2_tq_empty(m->q);
         apr_thread_mutex_unlock(m->lock);
     }
-    return status;
+    return task;
 }
 

Modified: httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_mplx.h
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_mplx.h?rev=1712570&r1=1712569&r2=1712570&view=diff
==============================================================================
--- httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_mplx.h (original)
+++ httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_mplx.h Wed Nov  4 15:44:24 2015
@@ -41,6 +41,7 @@ struct h2_config;
 struct h2_response;
 struct h2_task;
 struct h2_stream;
+struct h2_request;
 struct h2_io_set;
 struct apr_thread_cond_t;
 struct h2_workers;
@@ -70,12 +71,13 @@ struct h2_mplx {
     int aborted;
     apr_size_t stream_max_mem;
     
-    apr_pool_t *spare_pool;           /* spare pool, ready for next stream */
-    struct h2_stream_set *closed;     /* streams closed, but task ongoing */
+    apr_pool_t *spare_pool;           /* spare pool, ready for next io */
     struct h2_workers *workers;
     int file_handles_allowed;
 };
 
+
+
 /*******************************************************************************
  * Object lifecycle and information.
  ******************************************************************************/
@@ -118,15 +120,16 @@ void h2_mplx_task_done(h2_mplx *m, int s
 /*******************************************************************************
  * IO lifetime of streams.
  ******************************************************************************/
-/**
- * Prepares the multiplexer to handle in-/output on the given stream id.
- */
-struct h2_stream *h2_mplx_open_io(h2_mplx *mplx, int stream_id);
 
 /**
- * Ends cleanup of a stream in sync with execution thread.
+ * Notifies mplx that a stream has finished processing.
+ * 
+ * @param m the mplx itself
+ * @param stream_id the id of the stream being done
+ * @param rst_error if != 0, the stream was reset with the error given
+ *
  */
-apr_status_t h2_mplx_cleanup_stream(h2_mplx *m, struct h2_stream *stream);
+apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error);
 
 /* Return != 0 iff the multiplexer has data for the given stream. 
  */
@@ -144,14 +147,30 @@ apr_status_t h2_mplx_out_trywait(h2_mplx
  ******************************************************************************/
 
 /**
- * Perform the task on the given stream.
+ * Process a stream request.
+ * 
+ * @param m the multiplexer
+ * @param stream_id the identifier of the stream
+ * @param r the request to be processed
+ * @param eos if input is complete
+ * @param cmp the stream priority compare function
+ * @param ctx context data for the compare function
+ */
+apr_status_t h2_mplx_process(h2_mplx *m, int stream_id,
+                             struct h2_request *r, int eos, 
+                             h2_stream_pri_cmp *cmp, void *ctx);
+
+/**
+ * Stream priorities have changed, reschedule pending tasks.
+ * 
+ * @param m the multiplexer
+ * @param cmp the stream priority compare function
+ * @param ctx context data for the compare function
  */
-apr_status_t h2_mplx_do_task(h2_mplx *mplx, struct h2_task *task);
+apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx);
 
 struct h2_task *h2_mplx_pop_task(h2_mplx *mplx, int *has_more);
 
-apr_status_t h2_mplx_create_task(h2_mplx *mplx, struct h2_stream *stream);
-
 /*******************************************************************************
  * Input handling of streams.
  ******************************************************************************/
@@ -223,6 +242,14 @@ apr_status_t h2_mplx_out_readx(h2_mplx *
                                apr_size_t *plen, int *peos);
 
 /**
+ * Reads output data into the given brigade. Will never block, but
+ * return APR_EAGAIN until data arrives or the stream is closed.
+ */
+apr_status_t h2_mplx_out_read_to(h2_mplx *mplx, int stream_id, 
+                                 apr_bucket_brigade *bb, 
+                                 apr_size_t *plen, int *peos);
+
+/**
  * Opens the output for the given stream with the specified response.
  */
 apr_status_t h2_mplx_out_open(h2_mplx *mplx, int stream_id,

Modified: httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_request.c
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_request.c?rev=1712570&r1=1712569&r2=1712570&view=diff
==============================================================================
--- httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_request.c (original)
+++ httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_request.c Wed Nov  4 15:44:24 2015
@@ -151,13 +151,21 @@ apr_status_t h2_request_write_data(h2_re
 apr_status_t h2_request_end_headers(h2_request *req, struct h2_mplx *m,
                                     h2_task *task, int eos)
 {
+    apr_status_t status;
+    
     if (!req->to_h1) {
-        apr_status_t status = insert_request_line(req, m);
+        status = insert_request_line(req, m);
         if (status != APR_SUCCESS) {
             return status;
         }
     }
-    return h2_to_h1_end_headers(req->to_h1, task, eos);
+    status = h2_to_h1_end_headers(req->to_h1, eos);
+    h2_task_set_request(task, req->to_h1->method, 
+                        req->to_h1->scheme, 
+                        req->to_h1->authority, 
+                        req->to_h1->path, 
+                        req->to_h1->headers, eos);
+    return status;
 }
 
 apr_status_t h2_request_close(h2_request *req)

Modified: httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_response.c
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_response.c?rev=1712570&r1=1712569&r2=1712570&view=diff
==============================================================================
--- httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_response.c (original)
+++ httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_response.c Wed Nov  4 15:44:24 2015
@@ -25,6 +25,7 @@
 #include <nghttp2/nghttp2.h>
 
 #include "h2_private.h"
+#include "h2_h2.h"
 #include "h2_util.h"
 #include "h2_response.h"
 
@@ -41,6 +42,7 @@ static int ignore_header(const char *nam
 }
 
 h2_response *h2_response_create(int stream_id,
+                                int rst_error,
                                 const char *http_status,
                                 apr_array_header_t *hlines,
                                 apr_pool_t *pool)
@@ -53,7 +55,8 @@ h2_response *h2_response_create(int stre
     }
     
     response->stream_id = stream_id;
-    response->status = http_status;
+    response->rst_error = rst_error;
+    response->status = http_status? http_status : "500";
     response->content_length = -1;
     
     if (hlines) {
@@ -112,6 +115,19 @@ h2_response *h2_response_rcreate(int str
     response->status = apr_psprintf(pool, "%d", r->status);
     response->content_length = -1;
     response->rheader = header;
+
+    if (r->status == HTTP_FORBIDDEN) {
+        const char *cause = apr_table_get(r->notes, "ssl-renegotiate-forbidden");
+        if (cause) {
+            /* This request triggered a TLS renegotiation that is not allowed 
+             * in HTTP/2. Tell the client that it should use HTTP/1.1 for this.
+             */
+            ap_log_rerror(APLOG_MARK, APLOG_DEBUG, r->status, r, 
+                          "h2_response(%ld-%d): renegotiate forbidden, cause: %s",
+                          (long)r->connection->id, stream_id, cause);
+            response->rst_error = H2_ERR_HTTP_1_1_REQUIRED;
+        }
+    }
     
     return response;
 }

Modified: httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_response.h
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_response.h?rev=1712570&r1=1712569&r2=1712570&view=diff
==============================================================================
--- httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_response.h (original)
+++ httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_response.h Wed Nov  4 15:44:24 2015
@@ -26,6 +26,7 @@ typedef struct h2_ngheader {
 
 typedef struct h2_response {
     int stream_id;
+    int rst_error;
     const char *status;
     apr_off_t content_length;
     apr_table_t *rheader;
@@ -33,9 +34,10 @@ typedef struct h2_response {
 } h2_response;
 
 h2_response *h2_response_create(int stream_id,
-                                  const char *http_status,
-                                  apr_array_header_t *hlines,
-                                  apr_pool_t *pool);
+                                int rst_error,
+                                const char *http_status,
+                                apr_array_header_t *hlines,
+                                apr_pool_t *pool);
 
 h2_response *h2_response_rcreate(int stream_id, request_rec *r,
                                  apr_table_t *header, apr_pool_t *pool);

Modified: httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_session.c
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_session.c?rev=1712570&r1=1712569&r2=1712570&view=diff
==============================================================================
--- httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_session.c (original)
+++ httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_session.c Wed Nov  4 15:44:24 2015
@@ -24,6 +24,8 @@
 #include <http_log.h>
 
 #include "h2_private.h"
+#include "h2_bucket_eoc.h"
+#include "h2_bucket_eos.h"
 #include "h2_config.h"
 #include "h2_h2.h"
 #include "h2_mplx.h"
@@ -56,41 +58,94 @@ static int h2_session_status_from_apr_st
 static int stream_open(h2_session *session, int stream_id)
 {
     h2_stream * stream;
+    apr_pool_t *stream_pool;
     if (session->aborted) {
         return NGHTTP2_ERR_CALLBACK_FAILURE;
     }
     
-    stream = h2_mplx_open_io(session->mplx, stream_id);
-    if (stream) {
-        h2_stream_set_add(session->streams, stream);
-        if (stream->id > session->max_stream_received) {
-            session->max_stream_received = stream->id;
-        }
-        
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
-                      "h2_session: stream(%ld-%d): opened",
-                      session->id, stream_id);
-        
-        return 0;
+    if (session->spare) {
+        stream_pool = session->spare;
+        session->spare = NULL;
+    }
+    else {
+        apr_pool_create(&stream_pool, session->pool);
+    }
+    
+    stream = h2_stream_create(stream_id, stream_pool, session);
+    stream->state = H2_STREAM_ST_OPEN;
+    
+    h2_stream_set_add(session->streams, stream);
+    if (stream->id > session->max_stream_received) {
+        session->max_stream_received = stream->id;
     }
     
-    ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_ENOMEM, session->c,
-                  APLOGNO(02918) 
-                  "h2_session: stream(%ld-%d): unable to create",
+    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+                  "h2_session: stream(%ld-%d): opened",
                   session->id, stream_id);
-    return NGHTTP2_ERR_INVALID_STREAM_ID;
+    
+    return 0;
+}
+
+/**
+ * Determine the importance of streams when scheduling tasks.
+ * - if both streams depend on the same one, compare weights
+ * - if one stream is closer to the root, prioritize that one
+ * - if both are on the same level, use the weight of their root
+ *   level ancestors
+ */
+static int spri_cmp(int sid1, nghttp2_stream *s1, 
+                    int sid2, nghttp2_stream *s2, h2_session *session)
+{
+    nghttp2_stream *p1, *p2;
+    
+    p1 = nghttp2_stream_get_parent(s1);
+    p2 = nghttp2_stream_get_parent(s2);
+    
+    if (p1 == p2) {
+        int32_t w1, w2;
+        
+        w1 = nghttp2_stream_get_weight(s1);
+        w2 = nghttp2_stream_get_weight(s2);
+        return w2 - w1;
+    }
+    else if (!p1) {
+        /* stream 1 closer to root */
+        return -1;
+    }
+    else if (!p2) {
+        /* stream 2 closer to root */
+        return 1;
+    }
+    return spri_cmp(sid1, p1, sid2, p2, session);
+}
+
+static int stream_pri_cmp(int sid1, int sid2, void *ctx)
+{
+    h2_session *session = ctx;
+    nghttp2_stream *s1, *s2;
+    
+    s1 = nghttp2_session_find_stream(session->ngh2, sid1);
+    s2 = nghttp2_session_find_stream(session->ngh2, sid2);
+
+    if (s1 == s2) {
+        return 0;
+    }
+    else if (!s1) {
+        return 1;
+    }
+    else if (!s2) {
+        return -1;
+    }
+    return spri_cmp(sid1, s1, sid2, s2, session);
 }
 
 static apr_status_t stream_end_headers(h2_session *session,
                                        h2_stream *stream, int eos)
 {
     (void)session;
-    return h2_stream_write_eoh(stream, eos);
+    return h2_stream_schedule(stream, eos, stream_pri_cmp, session);
 }
 
-static apr_status_t send_data(h2_session *session, const char *data, 
-                              apr_size_t length);
-
 /*
  * Callback when nghttp2 wants to send bytes back to the client.
  */
@@ -99,10 +154,11 @@ static ssize_t send_cb(nghttp2_session *
                        int flags, void *userp)
 {
     h2_session *session = (h2_session *)userp;
-    apr_status_t status = send_data(session, (const char *)data, length);
+    apr_status_t status;
     
     (void)ngh2;
     (void)flags;
+    status = h2_conn_io_write(&session->io, (const char *)data, length);
     if (status == APR_SUCCESS) {
         return length;
     }
@@ -168,7 +224,7 @@ static int on_data_chunk_recv_cb(nghttp2
                   session->id, stream_id, (int)len);
     if (status != APR_SUCCESS) {
         rv = nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE, stream_id,
-                                       H2_STREAM_RST(stream, NGHTTP2_INTERNAL_ERROR));
+                                       H2_STREAM_RST(stream, H2_ERR_INTERNAL_ERROR));
         if (nghttp2_is_fatal(rv)) {
             return NGHTTP2_ERR_CALLBACK_FAILURE;
         }
@@ -186,6 +242,20 @@ static int before_frame_send_cb(nghttp2_
     if (session->aborted) {
         return NGHTTP2_ERR_CALLBACK_FAILURE;
     }
+    /* Set the need to flush output when we have added one of the 
+     * following frame types */
+    switch (frame->hd.type) {
+        case NGHTTP2_RST_STREAM:
+        case NGHTTP2_WINDOW_UPDATE:
+        case NGHTTP2_PUSH_PROMISE:
+        case NGHTTP2_PING:
+        case NGHTTP2_GOAWAY:
+            session->flush = 1;
+            break;
+        default:
+            break;
+
+    }
     if (APLOGctrace2(session->c)) {
         char buffer[256];
         frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
@@ -201,9 +271,14 @@ static int on_frame_send_cb(nghttp2_sess
                             void *userp)
 {
     h2_session *session = (h2_session *)userp;
-    (void)ngh2; (void)frame;
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
-                  "h2_session(%ld): on_frame_send", session->id);
+    (void)ngh2;
+    if (APLOGctrace2(session->c)) {
+        char buffer[256];
+        frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+                      "h2_session(%ld): on_frame_send %s", 
+                      session->id, buffer);
+    }
     return 0;
 }
 
@@ -241,12 +316,13 @@ static apr_status_t stream_destroy(h2_se
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
                       "h2_stream(%ld-%d): closing with err=%d %s", 
                       session->id, (int)stream->id, (int)error_code,
-                      nghttp2_strerror(error_code));
+                      h2_h2_err_description(error_code));
         h2_stream_rst(stream, error_code);
     }
     
-    h2_stream_set_remove(session->streams, stream);
-    return h2_mplx_cleanup_stream(session->mplx, stream);
+    return h2_conn_io_writeb(&session->io,
+                             h2_bucket_eos_create(session->c->bucket_alloc, 
+                                                  stream));
 }
 
 static int on_stream_close_cb(nghttp2_session *ngh2, int32_t stream_id,
@@ -387,6 +463,7 @@ static int on_frame_recv_cb(nghttp2_sess
             break;
         }
         case NGHTTP2_PRIORITY: {
+            session->reprioritize = 1;
             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
                           "h2_session:  stream(%ld-%d): PRIORITY frame "
                           " weight=%d, dependsOn=%d, exclusive=%d", 
@@ -440,18 +517,15 @@ static int on_frame_recv_cb(nghttp2_sess
     return 0;
 }
 
-static apr_status_t send_data(h2_session *session, const char *data, 
-                              apr_size_t length)
-{
-    return h2_conn_io_write(&session->io, data, length);
-}
-
 static apr_status_t pass_data(void *ctx, 
                               const char *data, apr_size_t length)
 {
-    return send_data((h2_session*)ctx, data, length);
+    return h2_conn_io_write(&((h2_session*)ctx)->io, data, length);
 }
 
+
+static char immortal_zeros[H2_MAX_PADLEN];
+
 static int on_send_data_cb(nghttp2_session *ngh2, 
                            nghttp2_frame *frame, 
                            const uint8_t *framehd, 
@@ -481,31 +555,64 @@ static int on_send_data_cb(nghttp2_sessi
         return NGHTTP2_ERR_CALLBACK_FAILURE;
     }
     
-    status = send_data(session, (const char *)framehd, 9);
-    if (status == APR_SUCCESS) {
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+                  "h2_stream(%ld-%d): send_data_cb for %ld bytes",
+                  session->id, (int)stream_id, (long)length);
+                  
+    if (h2_conn_io_is_buffered(&session->io)) {
+        status = h2_conn_io_write(&session->io, (const char *)framehd, 9);
+        if (status == APR_SUCCESS) {
+            if (padlen) {
+                status = h2_conn_io_write(&session->io, (const char *)&padlen, 1);
+            }
+            
+            if (status == APR_SUCCESS) {
+                apr_size_t len = length;
+                status = h2_stream_readx(stream, pass_data, session, 
+                                         &len, &eos);
+                if (status == APR_SUCCESS && len != length) {
+                    status = APR_EINVAL;
+                }
+            }
+            
+            if (status == APR_SUCCESS && padlen) {
+                if (padlen) {
+                    status = h2_conn_io_write(&session->io, immortal_zeros, padlen);
+                }
+            }
+        }
+    }
+    else {
+        apr_bucket *b;
+        char *header = apr_pcalloc(stream->pool, 10);
+        memcpy(header, (const char *)framehd, 9);
         if (padlen) {
-            status = send_data(session, (const char *)&padlen, 1);
+            header[9] = (char)padlen;
         }
-
+        b = apr_bucket_pool_create(header, padlen? 10 : 9, 
+                                   stream->pool, session->c->bucket_alloc);
+        status = h2_conn_io_writeb(&session->io, b);
+        
         if (status == APR_SUCCESS) {
             apr_size_t len = length;
-            status = h2_stream_readx(stream, pass_data, session, 
-                                     &len, &eos);
+            status = h2_stream_read_to(stream, session->io.output, &len, &eos);
+            session->io.unflushed = 1;
             if (status == APR_SUCCESS && len != length) {
                 status = APR_EINVAL;
             }
         }
-        
+            
         if (status == APR_SUCCESS && padlen) {
-            if (padlen) {
-                char pad[256];
-                memset(pad, 0, padlen);
-                status = send_data(session, pad, padlen);
-            }
+            b = apr_bucket_immortal_create(immortal_zeros, padlen, 
+                                           session->c->bucket_alloc);
+            status = h2_conn_io_writeb(&session->io, b);
         }
     }
     
+    
     if (status == APR_SUCCESS) {
+        stream->data_frames_sent++;
+        h2_conn_io_consider_flush(&session->io);
         return 0;
     }
     else {
@@ -518,6 +625,19 @@ static int on_send_data_cb(nghttp2_sessi
     return h2_session_status_from_apr_status(status);
 }
 
+static ssize_t on_data_source_read_length_cb(nghttp2_session *session, 
+                                             uint8_t frame_type, int32_t stream_id, 
+                                             int32_t session_remote_window_size, 
+                                             int32_t stream_remote_window_size, 
+                                             uint32_t remote_max_frame_size, 
+                                             void *user_data)
+{
+    /* DATA frames add 9 bytes header plus 1 byte for padlen and additional 
+     * padlen bytes. Keep below TLS maximum record size.
+     * TODO: respect pad bytes when we have that feature.
+     */
+    return (16*1024 - 10);
+}
 
 #define NGH2_SET_CALLBACK(callbacks, name, fn)\
 nghttp2_session_callbacks_set_##name##_callback(callbacks, fn)
@@ -543,6 +663,7 @@ static apr_status_t init_callbacks(conn_
     NGH2_SET_CALLBACK(*pcb, on_begin_headers, on_begin_headers_cb);
     NGH2_SET_CALLBACK(*pcb, on_header, on_header_cb);
     NGH2_SET_CALLBACK(*pcb, send_data, on_send_data_cb);
+    NGH2_SET_CALLBACK(*pcb, data_source_read_length, on_data_source_read_length_cb);
     
     return APR_SUCCESS;
 }
@@ -660,13 +781,11 @@ void h2_session_destroy(h2_session *sess
         nghttp2_session_del(session->ngh2);
         session->ngh2 = NULL;
     }
-    h2_conn_io_destroy(&session->io);
-    
-    if (session->iowait) {
-        apr_thread_cond_destroy(session->iowait);
-        session->iowait = NULL;
-    }
-    
+}
+
+void h2_session_cleanup(h2_session *session)
+{
+    h2_session_destroy(session);
     if (session->pool) {
         apr_pool_destroy(session->pool);
     }
@@ -677,14 +796,18 @@ static apr_status_t h2_session_abort_int
     AP_DEBUG_ASSERT(session);
     if (!session->aborted) {
         session->aborted = 1;
-        if (session->ngh2) {
-            
-            if (!reason) {
+        
+        if (session->ngh2) {            
+            if (NGHTTP2_ERR_EOF == reason) {
+                /* This is our way of indicating that the connection is
+                 * gone. No use sending any GOAWAY frames. */
+                nghttp2_session_terminate_session(session->ngh2, reason);
+            }
+            else if (!reason) {
                 nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE, 
                                       session->max_stream_received, 
                                       reason, NULL, 0);
                 nghttp2_session_send(session->ngh2);
-                h2_conn_io_flush(&session->io);
             }
             else {
                 const char *err = nghttp2_strerror(reason);
@@ -693,22 +816,15 @@ static apr_status_t h2_session_abort_int
                               "session(%ld): aborting session, reason=%d %s",
                               session->id, reason, err);
                 
-                if (NGHTTP2_ERR_EOF == reason) {
-                    /* This is our way of indication that the connection is
-                     * gone. No use to send any GOAWAY frames. */
-                    nghttp2_session_terminate_session(session->ngh2, reason);
-                }
-                else {
-                    /* The connection might still be there and we shut down
-                     * with GOAWAY and reason information. */
-                     nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE, 
-                                           session->max_stream_received, 
-                                           reason, (const uint8_t *)err, 
-                                           strlen(err));
-                     nghttp2_session_send(session->ngh2);
-                     h2_conn_io_flush(&session->io);
-                }
+                /* The connection might still be there and we shut down
+                 * with GOAWAY and reason information. */
+                nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE, 
+                                      session->max_stream_received, 
+                                      reason, (const uint8_t *)err, 
+                                      strlen(err));
+                nghttp2_session_send(session->ngh2);
             }
+            h2_conn_io_flush(&session->io);
         }
         h2_mplx_abort(session->mplx);
     }
@@ -854,7 +970,7 @@ static int resume_on_data(void *ctx, h2_
     AP_DEBUG_ASSERT(stream);
     
     if (h2_stream_is_suspended(stream)) {
-        if (h2_mplx_out_has_data_for(stream->m, stream->id)) {
+        if (h2_mplx_out_has_data_for(stream->session->mplx, stream->id)) {
             int rv;
             h2_stream_set_suspended(stream, 0);
             ++rctx->resume_count;
@@ -893,6 +1009,12 @@ static void update_window(void *ctx, int
     nghttp2_session_consume(session->ngh2, stream_id, bytes_read);
 }
 
+static apr_status_t h2_session_flush(h2_session *session) 
+{
+    session->flush = 0;
+    return h2_conn_io_flush(&session->io);
+}
+
 static apr_status_t h2_session_update_windows(h2_session *session)
 {
     return h2_mplx_in_update_windows(session->mplx, update_window, session);
@@ -902,16 +1024,17 @@ apr_status_t h2_session_write(h2_session
 {
     apr_status_t status = APR_EAGAIN;
     h2_stream *stream = NULL;
-    int flush_output = 0;
     
     AP_DEBUG_ASSERT(session);
     
+    if (session->reprioritize) {
+        h2_mplx_reprioritize(session->mplx, stream_pri_cmp, session);
+        session->reprioritize = 0;
+    }
+    
     /* Check that any pending window updates are sent. */
     status = h2_session_update_windows(session);
-    if (status == APR_SUCCESS) {
-        flush_output = 1;
-    }
-    else if (!APR_STATUS_IS_EAGAIN(status)) {
+    if (status != APR_SUCCESS && !APR_STATUS_IS_EAGAIN(status)) {
         return status;
     }
     
@@ -927,27 +1050,24 @@ apr_status_t h2_session_write(h2_session
                 status = APR_ECONNABORTED;
             }
         }
-        flush_output = 1;
     }
     
     /* If we have responses ready, submit them now. */
     while (!session->aborted 
            && (stream = h2_mplx_next_submit(session->mplx, session->streams)) != NULL) {
         status = h2_session_handle_response(session, stream);
-        flush_output = 1;
     }
     
     if (!session->aborted && h2_session_resume_streams_with_data(session) > 0) {
-        flush_output = 1;
     }
     
-    if (!session->aborted && !flush_output 
-        && timeout > 0 && !h2_session_want_write(session)) {
+    if (!session->aborted && !session->flush && timeout > 0 
+        && !h2_session_want_write(session)) {
+        h2_session_flush(session);
         status = h2_mplx_out_trywait(session->mplx, timeout, session->iowait);
 
         if (status != APR_TIMEUP
             && h2_session_resume_streams_with_data(session) > 0) {
-            flush_output = 1;
         }
         else {
             /* nothing happened to ongoing streams, do some house-keeping */
@@ -966,11 +1086,10 @@ apr_status_t h2_session_write(h2_session
                 status = APR_ECONNABORTED;
             }
         }
-        flush_output = 1;
     }
     
-    if (flush_output) {
-        h2_conn_io_flush(&session->io);
+    if (session->flush) {
+        h2_session_flush(session);
     }
     
     return status;
@@ -1015,17 +1134,28 @@ static apr_status_t session_receive(cons
 apr_status_t h2_session_read(h2_session *session, apr_read_type_e block)
 {
     AP_DEBUG_ASSERT(session);
+    if (block == APR_BLOCK_READ) {
+        /* before we do a blocking read, make sure that all our output
+         * is sent out. Otherwise we might deadlock. */
+        h2_session_flush(session);
+    }
     return h2_conn_io_read(&session->io, block, session_receive, session);
 }
 
 apr_status_t h2_session_close(h2_session *session)
 {
     AP_DEBUG_ASSERT(session);
-    return session->aborted? APR_SUCCESS : h2_conn_io_flush(&session->io);
+    if (!session->aborted) {
+        h2_session_abort_int(session, 0);
+    }
+    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0,session->c,
+                  "h2_session: closing, writing eoc");
+    h2_conn_io_writeb(&session->io,
+                      h2_bucket_eoc_create(session->c->bucket_alloc, 
+                                           session));
+    return h2_conn_io_flush(&session->io);
 }
 
-/* The session wants to send more DATA for the given stream.
- */
 static ssize_t stream_data_cb(nghttp2_session *ng2s,
                               int32_t stream_id,
                               uint8_t *buf,
@@ -1041,12 +1171,21 @@ static ssize_t stream_data_cb(nghttp2_se
     h2_stream *stream;
     AP_DEBUG_ASSERT(session);
     
+    /* The session wants to send more DATA for the stream. We need
+     * to find out how much of the requested length we can send without
+     * blocking.
+     * Indicate EOS when we encounter it or DEFERRED if the stream
+     * should be suspended.
+     * TODO: for handling of TRAILERS,  the EOF indication needs
+     * to be aware of that.
+     */
+ 
     (void)ng2s;
     (void)buf;
     (void)source;
     stream = h2_stream_set_get(session->streams, stream_id);
     if (!stream) {
-        ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_NOTFOUND, session->c,
+        ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, session->c,
                       APLOGNO(02937) 
                       "h2_stream(%ld-%d): data requested but stream not found",
                       session->id, (int)stream_id);
@@ -1155,7 +1294,7 @@ apr_status_t h2_session_handle_response(
     else {
         rv = nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE,
                                        stream->id, 
-                                       H2_STREAM_RST(stream, NGHTTP2_PROTOCOL_ERROR));
+                                       H2_STREAM_RST(stream, H2_ERR_PROTOCOL_ERROR));
     }
     
     if (nghttp2_is_fatal(rv)) {
@@ -1168,6 +1307,24 @@ apr_status_t h2_session_handle_response(
     return status;
 }
 
+apr_status_t h2_session_stream_destroy(h2_session *session, h2_stream *stream)
+{
+    apr_pool_t *pool = h2_stream_detach_pool(stream);
+
+    h2_mplx_stream_done(session->mplx, stream->id, stream->rst_error);
+    h2_stream_set_remove(session->streams, stream->id);
+    h2_stream_destroy(stream);
+    
+    if (pool) {
+        apr_pool_clear(pool);
+        if (session->spare) {
+            apr_pool_destroy(session->spare);
+        }
+        session->spare = pool;
+    }
+    return APR_SUCCESS;
+}
+
 int h2_session_is_done(h2_session *session)
 {
     AP_DEBUG_ASSERT(session);

Modified: httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_session.h
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_session.h?rev=1712570&r1=1712569&r2=1712570&view=diff
==============================================================================
--- httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_session.h (original)
+++ httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_session.h Wed Nov  4 15:44:24 2015
@@ -58,6 +58,9 @@ struct h2_session {
     request_rec *r;                 /* the request that started this in case
                                      * of 'h2c', NULL otherwise */
     int aborted;                    /* this session is being aborted */
+    int flush;                      /* if != 0, flush output on next occasion */
+    int reprioritize;               /* scheduled streams priority needs to 
+                                     * be re-evaluated */
     apr_size_t frames_received;     /* number of http/2 frames received */
     apr_size_t max_stream_count;    /* max number of open streams */
     apr_size_t max_stream_mem;      /* max buffer memory for a single stream */
@@ -74,6 +77,8 @@ struct h2_session {
     int max_stream_received;        /* highest stream id created */
     int max_stream_handled;         /* highest stream id handled successfully */
     
+    apr_pool_t *spare;              /* spare stream pool */
+    
     struct nghttp2_session *ngh2;   /* the nghttp2 session (internal use) */
     struct h2_workers *workers;     /* for executing stream tasks */
 };
@@ -109,6 +114,13 @@ h2_session *h2_session_rcreate(request_r
 void h2_session_destroy(h2_session *session);
 
 /**
+ * Clean up the session and all objects it still contains. This will not
+ * destroy h2_task instances that have not finished yet. 
+ * @param session the session to destroy
+ */
+void h2_session_cleanup(h2_session *session);
+
+/**
  * Called once at start of session. 
  * Sets up the session and sends the initial SETTINGS frame.
  * @param session the session to start
@@ -147,8 +159,7 @@ apr_status_t h2_session_read(h2_session
  * a maximum of timeout micro-seconds and return to the caller. If timeout
  * occurred, APR_TIMEUP will be returned.
  */
-apr_status_t h2_session_write(h2_session *session,
-                              apr_interval_time_t timeout);
+apr_status_t h2_session_write(h2_session *session, apr_interval_time_t timeout);
 
 /* Start submitting the response to a stream request. This is possible
  * once we have all the response headers. */
@@ -158,4 +169,12 @@ apr_status_t h2_session_handle_response(
 /* Get the h2_stream for the given stream idenrtifier. */
 struct h2_stream *h2_session_get_stream(h2_session *session, int stream_id);
 
+/**
+ * Destroy the stream and release it everywhere. Reclaim all resources.
+ * @param session the session to which the stream belongs
+ * @param stream the stream to destroy
+ */
+apr_status_t h2_session_stream_destroy(h2_session *session, 
+                                       struct h2_stream *stream);
+
 #endif /* defined(__mod_h2__h2_session__) */

Modified: httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_stream.c
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_stream.c?rev=1712570&r1=1712569&r2=1712570&view=diff
==============================================================================
--- httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_stream.c (original)
+++ httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_stream.c Wed Nov  4 15:44:24 2015
@@ -16,8 +16,6 @@
 #include <assert.h>
 #include <stddef.h>
 
-#define APR_POOL_DEBUG  7
-
 #include <httpd.h>
 #include <http_core.h>
 #include <http_connection.h>
@@ -30,6 +28,7 @@
 #include "h2_mplx.h"
 #include "h2_request.h"
 #include "h2_response.h"
+#include "h2_session.h"
 #include "h2_stream.h"
 #include "h2_task.h"
 #include "h2_ctx.h"
@@ -46,46 +45,47 @@ static void set_state(h2_stream *stream,
     }
 }
 
-h2_stream *h2_stream_create(int id, apr_pool_t *pool, struct h2_mplx *m)
+h2_stream *h2_stream_create(int id, apr_pool_t *pool, h2_session *session)
 {
     h2_stream *stream = apr_pcalloc(pool, sizeof(h2_stream));
     if (stream != NULL) {
         stream->id = id;
         stream->state = H2_STREAM_ST_IDLE;
         stream->pool = pool;
-        stream->m = m;
-        stream->request = h2_request_create(id, pool, m->c->bucket_alloc);
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c,
-                      "h2_stream(%ld-%d): created", m->id, stream->id);
+        stream->session = session;
+        stream->bbout = apr_brigade_create(stream->pool, 
+                                           stream->session->c->bucket_alloc);
+        stream->request = h2_request_create(id, pool, session->c->bucket_alloc);
+        
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+                      "h2_stream(%ld-%d): created", session->id, stream->id);
     }
     return stream;
 }
 
-static void h2_stream_cleanup(h2_stream *stream)
+apr_status_t h2_stream_destroy(h2_stream *stream)
 {
+    AP_DEBUG_ASSERT(stream);
+    
+    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->session->c,
+                  "h2_stream(%ld-%d): destroy", stream->session->id, stream->id);
     if (stream->request) {
         h2_request_destroy(stream->request);
         stream->request = NULL;
     }
-}
-
-apr_status_t h2_stream_destroy(h2_stream *stream)
-{
-    AP_DEBUG_ASSERT(stream);
-    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->m->c,
-                  "h2_stream(%ld-%d): destroy", stream->m->id, stream->id);
-    h2_stream_cleanup(stream);
-    
-    if (stream->task) {
-        h2_task_destroy(stream->task);
-        stream->task = NULL;
-    }
+    
     if (stream->pool) {
         apr_pool_destroy(stream->pool);
     }
     return APR_SUCCESS;
 }
 
+void h2_stream_cleanup(h2_stream *stream)
+{
+    h2_session_stream_destroy(stream->session, stream);
+    /* stream is gone */
+}
+
 apr_pool_t *h2_stream_detach_pool(h2_stream *stream)
 {
     apr_pool_t *pool = stream->pool;
@@ -96,9 +96,9 @@ apr_pool_t *h2_stream_detach_pool(h2_str
 void h2_stream_rst(h2_stream *stream, int error_code)
 {
     stream->rst_error = error_code;
-    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->m->c,
+    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->session->c,
                   "h2_stream(%ld-%d): reset, error=%d", 
-                  stream->m->id, stream->id, error_code);
+                  stream->session->id, stream->id, error_code);
 }
 
 apr_status_t h2_stream_set_response(h2_stream *stream, h2_response *response,
@@ -108,24 +108,21 @@ apr_status_t h2_stream_set_response(h2_s
     
     stream->response = response;
     if (bb && !APR_BRIGADE_EMPTY(bb)) {
-        if (!stream->bbout) {
-            stream->bbout = apr_brigade_create(stream->pool, 
-                                               stream->m->c->bucket_alloc);
-        }
-        status = h2_util_move(stream->bbout, bb, 16 * 1024, NULL,  
+        int move_all = INT_MAX;
+        /* we can move as many file handles as we want from h2_mplx into
+         * this h2_stream, since the lifetimes are the same and we are not
+         * freeing the ones in h2_mplx->io before this stream is done. */
+        status = h2_util_move(stream->bbout, bb, 16 * 1024, &move_all,  
                               "h2_stream_set_response");
     }
-    if (APLOGctrace1(stream->m->c)) {
+    if (APLOGctrace1(stream->session->c)) {
         apr_size_t len = 0;
         int eos = 0;
-        if (stream->bbout) {
-            h2_util_bb_avail(stream->bbout, &len, &eos);
-        }
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, stream->m->c,
-                      "h2_stream(%ld-%d): set_response(%s), brigade=%s, "
-                      "len=%ld, eos=%d", 
-                      stream->m->id, stream->id, response->status,
-                      (stream->bbout? "yes" : "no"), (long)len, (int)eos);
+        h2_util_bb_avail(stream->bbout, &len, &eos);
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, stream->session->c,
+                      "h2_stream(%ld-%d): set_response(%s), len=%ld, eos=%d", 
+                      stream->session->id, stream->id, response->status,
+                      (long)len, (int)eos);
     }
     return status;
 }
@@ -156,11 +153,12 @@ apr_status_t h2_stream_rwrite(h2_stream
         return APR_ECONNRESET;
     }
     set_state(stream, H2_STREAM_ST_OPEN);
-    status = h2_request_rwrite(stream->request, r, stream->m);
+    status = h2_request_rwrite(stream->request, r, stream->session->mplx);
     return status;
 }
 
-apr_status_t h2_stream_write_eoh(h2_stream *stream, int eos)
+apr_status_t h2_stream_schedule(h2_stream *stream, int eos,
+                                h2_stream_pri_cmp *cmp, void *ctx)
 {
     apr_status_t status;
     AP_DEBUG_ASSERT(stream);
@@ -171,32 +169,26 @@ apr_status_t h2_stream_write_eoh(h2_stre
     /* Seeing the end-of-headers, we have everything we need to 
      * start processing it.
      */
-    status = h2_mplx_create_task(stream->m, stream);
-    if (status == APR_SUCCESS) {
-        status = h2_request_end_headers(stream->request, 
-                                        stream->m, stream->task, eos);
-        if (status == APR_SUCCESS) {
-            status = h2_mplx_do_task(stream->m, stream->task);
-        }
-        if (eos) {
-            status = h2_stream_write_eos(stream);
-        }
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, stream->m->c,
-                      "h2_mplx(%ld-%d): start stream, task %s %s (%s)",
-                      stream->m->id, stream->id,
-                      stream->request->method, stream->request->path,
-                      stream->request->authority);
-        
-    }
+    status = h2_mplx_process(stream->session->mplx, stream->id, 
+                             stream->request, eos, cmp, ctx);
+    if (eos) {
+        set_closed(stream);
+    }
+    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, stream->session->c,
+                  "h2_mplx(%ld-%d): start stream, task %s %s (%s)",
+                  stream->session->id, stream->id,
+                  stream->request->method, stream->request->path,
+                  stream->request->authority);
+    
     return status;
 }
 
 apr_status_t h2_stream_write_eos(h2_stream *stream)
 {
     AP_DEBUG_ASSERT(stream);
-    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->m->c,
+    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->session->c,
                   "h2_stream(%ld-%d): closing input",
-                  stream->m->id, stream->id);
+                  stream->session->id, stream->id);
     if (stream->rst_error) {
         return APR_ECONNRESET;
     }
@@ -224,7 +216,7 @@ apr_status_t h2_stream_write_header(h2_s
             return APR_EINVAL;
     }
     return h2_request_write_header(stream->request, name, nlen,
-                                   value, vlen, stream->m);
+                                   value, vlen, stream->session->mplx);
 }
 
 apr_status_t h2_stream_write_data(h2_stream *stream,
@@ -252,7 +244,8 @@ apr_status_t h2_stream_prep_read(h2_stre
     if (stream->rst_error) {
         return APR_ECONNRESET;
     }
-    if (stream->bbout && !APR_BRIGADE_EMPTY(stream->bbout)) {
+
+    if (!APR_BRIGADE_EMPTY(stream->bbout)) {
         src = "stream";
         status = h2_util_bb_avail(stream->bbout, plen, peos);
         if (status == APR_SUCCESS && !*peos && !*plen) {
@@ -262,16 +255,15 @@ apr_status_t h2_stream_prep_read(h2_stre
     }
     else {
         src = "mplx";
-        status = h2_mplx_out_readx(stream->m, stream->id, 
+        status = h2_mplx_out_readx(stream->session->mplx, stream->id, 
                                    NULL, NULL, plen, peos);
     }
     if (status == APR_SUCCESS && !*peos && !*plen) {
         status = APR_EAGAIN;
     }
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->m->c,
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->session->c,
                   "h2_stream(%ld-%d): prep_read %s, len=%ld eos=%d",
-                  stream->m->id, stream->id, 
-                  src, (long)*plen, *peos);
+                  stream->session->id, stream->id, src, (long)*plen, *peos);
     return status;
 }
 
@@ -279,16 +271,73 @@ apr_status_t h2_stream_readx(h2_stream *
                              h2_io_data_cb *cb, void *ctx,
                              apr_size_t *plen, int *peos)
 {
+    apr_status_t status = APR_SUCCESS;
+    const char *src;
+    
     if (stream->rst_error) {
         return APR_ECONNRESET;
     }
-    if (stream->bbout && !APR_BRIGADE_EMPTY(stream->bbout)) {
-        return h2_util_bb_readx(stream->bbout, cb, ctx, plen, peos);
+    *peos = 0;
+    if (!APR_BRIGADE_EMPTY(stream->bbout)) {
+        apr_size_t origlen = *plen;
+        
+        src = "stream";
+        status = h2_util_bb_readx(stream->bbout, cb, ctx, plen, peos);
+        if (status == APR_SUCCESS && !*peos && !*plen) {
+            apr_brigade_cleanup(stream->bbout);
+            *plen = origlen;
+            return h2_stream_readx(stream, cb, ctx, plen, peos);
+        }
+    }
+    else {
+        src = "mplx";
+        status = h2_mplx_out_readx(stream->session->mplx, stream->id, 
+                                   cb, ctx, plen, peos);
     }
-    return h2_mplx_out_readx(stream->m, stream->id, 
-                             cb, ctx, plen, peos);
+    
+    if (status == APR_SUCCESS && !*peos && !*plen) {
+        status = APR_EAGAIN;
+    }
+    
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->session->c,
+                  "h2_stream(%ld-%d): readx %s, len=%ld eos=%d",
+                  stream->session->id, stream->id, src, (long)*plen, *peos);
+    return status;
 }
 
+apr_status_t h2_stream_read_to(h2_stream *stream, apr_bucket_brigade *bb, 
+                               apr_size_t *plen, int *peos)
+{
+    apr_status_t status = APR_SUCCESS;
+
+    if (stream->rst_error) {
+        return APR_ECONNRESET;
+    }
+    
+    if (APR_BRIGADE_EMPTY(stream->bbout)) {
+        apr_size_t tlen = *plen;
+        int eos;
+        status = h2_mplx_out_read_to(stream->session->mplx, stream->id, 
+                                     stream->bbout, &tlen, &eos);
+    }
+    
+    if (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(stream->bbout)) {
+        status = h2_transfer_brigade(bb, stream->bbout, stream->pool, 
+                                     plen, peos);
+    }
+    else {
+        *plen = 0;
+        *peos = 0;
+    }
+
+    if (status == APR_SUCCESS && !*peos && !*plen) {
+        status = APR_EAGAIN;
+    }
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->session->c,
+                  "h2_stream(%ld-%d): read_to, len=%ld eos=%d",
+                  stream->session->id, stream->id, (long)*plen, *peos);
+    return status;
+}
 
 void h2_stream_set_suspended(h2_stream *stream, int suspended)
 {

Modified: httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_stream.h
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_stream.h?rev=1712570&r1=1712569&r2=1712570&view=diff
==============================================================================
--- httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_stream.h (original)
+++ httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_stream.h Wed Nov  4 15:44:24 2015
@@ -48,6 +48,7 @@ typedef enum {
 struct h2_mplx;
 struct h2_request;
 struct h2_response;
+struct h2_session;
 struct h2_task;
 
 typedef struct h2_stream h2_stream;
@@ -55,27 +56,30 @@ typedef struct h2_stream h2_stream;
 struct h2_stream {
     int id;                     /* http2 stream id */
     h2_stream_state_t state;    /* http/2 state of this stream */
-    struct h2_mplx *m;          /* the multiplexer to work with */
+    struct h2_session *session; /* the session this stream belongs to */
     
     int aborted;                /* was aborted */
     int suspended;              /* DATA sending has been suspended */
+    int rst_error;              /* stream error for RST_STREAM */
     
     apr_pool_t *pool;           /* the memory pool for this stream */
     struct h2_request *request; /* the request made in this stream */
     
-    struct h2_task *task;       /* task created for this stream */
     struct h2_response *response; /* the response, once ready */
+    
     apr_bucket_brigade *bbout;  /* output DATA */
-    int rst_error;              /* stream error for RST_STREAM */
+    apr_size_t data_frames_sent;/* # of DATA frames sent out for this stream */
 };
 
 
 #define H2_STREAM_RST(s, def)    (s->rst_error? s->rst_error : (def))
 
-h2_stream *h2_stream_create(int id, apr_pool_t *pool, struct h2_mplx *m);
+h2_stream *h2_stream_create(int id, apr_pool_t *pool, struct h2_session *session);
 
 apr_status_t h2_stream_destroy(h2_stream *stream);
 
+void h2_stream_cleanup(h2_stream *stream);
+
 void h2_stream_rst(h2_stream *streamm, int error_code);
 
 apr_pool_t *h2_stream_detach_pool(h2_stream *stream);
@@ -88,7 +92,8 @@ apr_status_t h2_stream_write_header(h2_s
                                     const char *name, size_t nlen,
                                     const char *value, size_t vlen);
 
-apr_status_t h2_stream_write_eoh(h2_stream *stream, int eos);
+apr_status_t h2_stream_schedule(h2_stream *stream, int eos,
+                                h2_stream_pri_cmp *cmp, void *ctx);
 
 apr_status_t h2_stream_write_data(h2_stream *stream,
                                   const char *data, size_t len);
@@ -103,6 +108,10 @@ apr_status_t h2_stream_prep_read(h2_stre
 apr_status_t h2_stream_readx(h2_stream *stream, h2_io_data_cb *cb, 
                              void *ctx, apr_size_t *plen, int *peos);
 
+apr_status_t h2_stream_read_to(h2_stream *stream, apr_bucket_brigade *bb, 
+                               apr_size_t *plen, int *peos);
+
+
 void h2_stream_set_suspended(h2_stream *stream, int suspended);
 int h2_stream_is_suspended(h2_stream *stream);
 

Modified: httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_stream_set.c
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_stream_set.c?rev=1712570&r1=1712569&r2=1712570&view=diff
==============================================================================
--- httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_stream_set.c (original)
+++ httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_stream_set.c Wed Nov  4 15:44:24 2015
@@ -54,9 +54,7 @@ void h2_stream_set_destroy(h2_stream_set
 
 static int h2_stream_id_cmp(const void *s1, const void *s2)
 {
-    h2_stream **pstream1 = (h2_stream **)s1;
-    h2_stream **pstream2 = (h2_stream **)s2;
-    return (*pstream1)->id - (*pstream2)->id;
+    return (*((h2_stream **)s1))->id - (*((h2_stream **)s2))->id;
 }
 
 h2_stream *h2_stream_set_get(h2_stream_set *sp, int stream_id)
@@ -101,12 +99,12 @@ apr_status_t h2_stream_set_add(h2_stream
     return APR_SUCCESS;
 }
 
-h2_stream *h2_stream_set_remove(h2_stream_set *sp, h2_stream *stream)
+h2_stream *h2_stream_set_remove(h2_stream_set *sp, int stream_id)
 {
     int i;
     for (i = 0; i < sp->list->nelts; ++i) {
         h2_stream *s = H2_STREAM_IDX(sp->list, i);
-        if (s == stream) {
+        if (s->id == stream_id) {
             int n;
             --sp->list->nelts;
             n = sp->list->nelts - i;

Modified: httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_stream_set.h
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_stream_set.h?rev=1712570&r1=1712569&r2=1712570&view=diff
==============================================================================
--- httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_stream_set.h (original)
+++ httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_stream_set.h Wed Nov  4 15:44:24 2015
@@ -35,7 +35,7 @@ apr_status_t h2_stream_set_add(h2_stream
 
 h2_stream *h2_stream_set_get(h2_stream_set *sp, int stream_id);
 
-h2_stream *h2_stream_set_remove(h2_stream_set *sp,h2_stream *stream);
+h2_stream *h2_stream_set_remove(h2_stream_set *sp, int stream_id);
 
 void h2_stream_set_remove_all(h2_stream_set *sp);
 

Modified: httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_switch.c
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_switch.c?rev=1712570&r1=1712569&r2=1712570&view=diff
==============================================================================
--- httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_switch.c (original)
+++ httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_switch.c Wed Nov  4 15:44:24 2015
@@ -35,24 +35,12 @@
 #include "h2_switch.h"
 
 /*******************************************************************************
- * SSL var lookup
- */
-APR_DECLARE_OPTIONAL_FN(char *, ssl_var_lookup,
-                        (apr_pool_t *, server_rec *,
-                         conn_rec *, request_rec *,
-                         char *));
-static char *(*opt_ssl_var_lookup)(apr_pool_t *, server_rec *,
-                                   conn_rec *, request_rec *,
-                                   char *);
-
-/*******************************************************************************
  * Once per lifetime init, retrieve optional functions
  */
 apr_status_t h2_switch_init(apr_pool_t *pool, server_rec *s)
 {
     (void)pool;
     ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, s, "h2_switch init");
-    opt_ssl_var_lookup = APR_RETRIEVE_OPTIONAL_FN(ssl_var_lookup);
 
     return APR_SUCCESS;
 }
@@ -63,7 +51,8 @@ static int h2_protocol_propose(conn_rec
                                apr_array_header_t *proposals)
 {
     int proposed = 0;
-    const char **protos = h2_h2_is_tls(c)? h2_tls_protos : h2_clear_protos;
+    int is_tls = h2_h2_is_tls(c);
+    const char **protos = is_tls? h2_tls_protos : h2_clear_protos;
     
     (void)s;
     if (strcmp(AP_PROTOCOL_HTTP1, ap_get_protocol(c))) {
@@ -74,12 +63,23 @@ static int h2_protocol_propose(conn_rec
         return DECLINED;
     }
     
+    if (!h2_is_acceptable_connection(c, 0)) {
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c,
+                      "protocol propose: connection requirements not met");
+        return DECLINED;
+    }
+    
     if (r) {
-        const char *p;
         /* So far, this indicates an HTTP/1 Upgrade header initiated
          * protocol switch. For that, the HTTP2-Settings header needs
          * to be present and valid for the connection.
          */
+        const char *p;
+        
+        if (!h2_allows_h2_upgrade(c)) {
+            return DECLINED;
+        }
+         
         p = apr_table_get(r->headers_in, "HTTP2-Settings");
         if (!p) {
             ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,

Modified: httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_task.c
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_task.c?rev=1712570&r1=1712569&r2=1712570&view=diff
==============================================================================
--- httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_task.c (original)
+++ httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_task.c Wed Nov  4 15:44:24 2015
@@ -52,35 +52,37 @@ static apr_status_t h2_filter_stream_inp
                                            ap_input_mode_t mode,
                                            apr_read_type_e block,
                                            apr_off_t readbytes) {
-    h2_task_env *env = filter->ctx;
-    AP_DEBUG_ASSERT(env);
-    if (!env->input) {
+    h2_task *task = filter->ctx;
+    AP_DEBUG_ASSERT(task);
+    if (!task->input) {
         return APR_ECONNABORTED;
     }
-    return h2_task_input_read(env->input, filter, brigade,
+    return h2_task_input_read(task->input, filter, brigade,
                               mode, block, readbytes);
 }
 
 static apr_status_t h2_filter_stream_output(ap_filter_t* filter,
                                             apr_bucket_brigade* brigade) {
-    h2_task_env *env = filter->ctx;
-    AP_DEBUG_ASSERT(env);
-    if (!env->output) {
+    h2_task *task = filter->ctx;
+    AP_DEBUG_ASSERT(task);
+    if (!task->output) {
         return APR_ECONNABORTED;
     }
-    return h2_task_output_write(env->output, filter, brigade);
+    return h2_task_output_write(task->output, filter, brigade);
 }
 
 static apr_status_t h2_filter_read_response(ap_filter_t* f,
                                             apr_bucket_brigade* bb) {
-    h2_task_env *env = f->ctx;
-    AP_DEBUG_ASSERT(env);
-    if (!env->output || !env->output->from_h1) {
+    h2_task *task = f->ctx;
+    AP_DEBUG_ASSERT(task);
+    if (!task->output || !task->output->from_h1) {
         return APR_ECONNABORTED;
     }
-    return h2_from_h1_read_response(env->output->from_h1, f, bb);
+    return h2_from_h1_read_response(task->output->from_h1, f, bb);
 }
 
+static apr_status_t h2_task_process_request(h2_task *task);
+
 /*******************************************************************************
  * Register various hooks
  */
@@ -119,21 +121,15 @@ static int h2_task_pre_conn(conn_rec* c,
     
     (void)arg;
     if (h2_ctx_is_task(ctx)) {
-        h2_task_env *env = h2_ctx_get_task(ctx);
-        
-        /* This connection is a pseudo-connection used for a h2_task.
-         * Since we read/write directly from it ourselves, we need
-         * to disable a possible ssl connection filter.
-         */
-        h2_tls_disable(c);
+        h2_task *task = h2_ctx_get_task(ctx);
         
         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c,
                       "h2_h2, pre_connection, found stream task");
         
         /* Add our own, network level in- and output filters.
          */
-        ap_add_input_filter("H2_TO_H1", env, NULL, c);
-        ap_add_output_filter("H1_TO_H2", env, NULL, c);
+        ap_add_input_filter("H2_TO_H1", task, NULL, c);
+        ap_add_output_filter("H1_TO_H2", task, NULL, c);
     }
     return OK;
 }
@@ -143,14 +139,14 @@ static int h2_task_process_conn(conn_rec
     h2_ctx *ctx = h2_ctx_get(c);
     
     if (h2_ctx_is_task(ctx)) {
-        if (!ctx->task_env->serialize_headers) {
+        if (!ctx->task->serialize_headers) {
             ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c, 
                           "h2_h2, processing request directly");
-            h2_task_process_request(ctx->task_env);
+            h2_task_process_request(ctx->task);
             return DONE;
         }
         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c, 
-                      "h2_task(%s), serialized handling", ctx->task_env->id);
+                      "h2_task(%s), serialized handling", ctx->task->id);
     }
     return DECLINED;
 }
@@ -170,8 +166,6 @@ h2_task *h2_task_create(long session_id,
         return NULL;
     }
     
-    APR_RING_ELEM_INIT(task, link);
-
     task->id = apr_psprintf(stream_pool, "%ld-%d", session_id, stream_id);
     task->stream_id = stream_id;
     task->mplx = mplx;
@@ -208,121 +202,73 @@ apr_status_t h2_task_do(h2_task *task, h
 {
     apr_status_t status = APR_SUCCESS;
     h2_config *cfg = h2_config_get(task->mplx->c);
-    h2_task_env env; 
     
     AP_DEBUG_ASSERT(task);
     
-    memset(&env, 0, sizeof(env));
-    
-    env.id = task->id;
-    env.stream_id = task->stream_id;
-    env.mplx = task->mplx;
-    task->mplx = NULL;
-    
-    env.input_eos = task->input_eos;
-    env.serialize_headers = h2_config_geti(cfg, H2_CONF_SER_HEADERS);
+    task->serialize_headers = h2_config_geti(cfg, H2_CONF_SER_HEADERS);
     
     /* Create a subpool from the worker one to be used for all things
-     * with life-time of this task_env execution.
+     * with life-time of this task execution.
      */
-    apr_pool_create(&env.pool, h2_worker_get_pool(worker));
+    apr_pool_create(&task->pool, h2_worker_get_pool(worker));
 
-    /* Link the env to the worker which provides useful things such
+    /* Link the task to the worker which provides useful things such
      * as mutex, a socket etc. */
-    env.io = h2_worker_get_cond(worker);
+    task->io = h2_worker_get_cond(worker);
     
-    /* Clone fields, so that lifetimes become (more) independent. */
-    env.method    = apr_pstrdup(env.pool, task->method);
-    env.scheme    = apr_pstrdup(env.pool, task->scheme);
-    env.authority = apr_pstrdup(env.pool, task->authority);
-    env.path      = apr_pstrdup(env.pool, task->path);
-    env.headers   = apr_table_clone(env.pool, task->headers);
-    
-    /* Setup the pseudo connection to use our own pool and bucket_alloc */
-    env.c = *task->c;
-    task->c = NULL;
-    status = h2_conn_setup(&env, worker);
+    status = h2_conn_setup(task, worker);
     
     /* save in connection that this one is a pseudo connection, prevents
      * other hooks from messing with it. */
-    h2_ctx_create_for(&env.c, &env);
+    h2_ctx_create_for(task->c, task);
 
     if (status == APR_SUCCESS) {
-        env.input = h2_task_input_create(&env, env.pool, 
-                                         env.c.bucket_alloc);
-        env.output = h2_task_output_create(&env, env.pool,
-                                           env.c.bucket_alloc);
-        status = h2_conn_process(&env.c, h2_worker_get_socket(worker));
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, &env.c,
-                      "h2_task(%s): processing done", env.id);
+        task->input = h2_task_input_create(task, task->pool, 
+                                           task->c->bucket_alloc);
+        task->output = h2_task_output_create(task, task->pool,
+                                             task->c->bucket_alloc);
+        status = h2_conn_process(task->c, h2_worker_get_socket(worker));
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, task->c,
+                      "h2_task(%s): processing done", task->id);
     }
     else {
-        ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, &env.c,
-                      APLOGNO(02957) "h2_task(%s): error setting up h2_task_env", 
-                      env.id);
+        ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, task->c,
+                      APLOGNO(02957) "h2_task(%s): error setting up h2_task", 
+                      task->id);
     }
     
-    if (env.input) {
-        h2_task_input_destroy(env.input);
-        env.input = NULL;
+    if (task->input) {
+        h2_task_input_destroy(task->input);
+        task->input = NULL;
     }
     
-    if (env.output) {
-        h2_task_output_close(env.output);
-        h2_task_output_destroy(env.output);
-        env.output = NULL;
+    if (task->output) {
+        h2_task_output_close(task->output);
+        h2_task_output_destroy(task->output);
+        task->output = NULL;
     }
 
-    h2_task_set_finished(task);
-    if (env.io) {
-        apr_thread_cond_signal(env.io);
+    if (task->io) {
+        apr_thread_cond_signal(task->io);
     }
     
-    if (env.pool) {
-        apr_pool_destroy(env.pool);
-        env.pool = NULL;
+    if (task->pool) {
+        apr_pool_destroy(task->pool);
+        task->pool = NULL;
     }
     
-    if (env.c.id) {
-        h2_conn_post(&env.c, worker);
+    if (task->c->id) {
+        h2_conn_post(task->c, worker);
     }
     
-    h2_mplx_task_done(env.mplx, env.stream_id);
+    h2_mplx_task_done(task->mplx, task->stream_id);
     
     return status;
 }
 
-int h2_task_has_started(h2_task *task)
-{
-    AP_DEBUG_ASSERT(task);
-    return apr_atomic_read32(&task->has_started);
-}
-
-void h2_task_set_started(h2_task *task)
-{
-    AP_DEBUG_ASSERT(task);
-    apr_atomic_set32(&task->has_started, 1);
-}
-
-int h2_task_has_finished(h2_task *task)
-{
-    return apr_atomic_read32(&task->has_finished);
-}
-
-void h2_task_set_finished(h2_task *task)
-{
-    apr_atomic_set32(&task->has_finished, 1);
-}
-
-void h2_task_die(h2_task_env *env, int status, request_rec *r)
-{
-    (void)env;
-    ap_die(status, r);
-}
-
-static request_rec *h2_task_create_request(h2_task_env *env)
+static request_rec *h2_task_create_request(h2_task *task)
 {
-    conn_rec *conn = &env->c;
+    conn_rec *conn = task->c;
     request_rec *r;
     apr_pool_t *p;
     int access_status = HTTP_OK;    
@@ -340,7 +286,7 @@ static request_rec *h2_task_create_reque
     
     r->allowed_methods = ap_make_method_list(p, 2);
     
-    r->headers_in = apr_table_copy(r->pool, env->headers);
+    r->headers_in = apr_table_copy(r->pool, task->headers);
     r->trailers_in     = apr_table_make(r->pool, 5);
     r->subprocess_env  = apr_table_make(r->pool, 25);
     r->headers_out     = apr_table_make(r->pool, 12);
@@ -379,19 +325,19 @@ static request_rec *h2_task_create_reque
     
     /* Time to populate r with the data we have. */
     r->request_time = apr_time_now();
-    r->method = env->method;
+    r->method = task->method;
     /* Provide quick information about the request method as soon as known */
     r->method_number = ap_method_number_of(r->method);
     if (r->method_number == M_GET && r->method[0] == 'H') {
         r->header_only = 1;
     }
 
-    ap_parse_uri(r, env->path);
+    ap_parse_uri(r, task->path);
     r->protocol = (char*)"HTTP/2";
     r->proto_num = HTTP_VERSION(2, 0);
 
     r->the_request = apr_psprintf(r->pool, "%s %s %s", 
-                                  r->method, env->path, r->protocol);
+                                  r->method, task->path, r->protocol);
     
     /* update what we think the virtual host is based on the headers we've
      * now read. may update status.
@@ -418,7 +364,7 @@ static request_rec *h2_task_create_reque
         /* Request check post hooks failed. An example of this would be a
          * request for a vhost where h2 is disabled --> 421.
          */
-        h2_task_die(env, access_status, r);
+        ap_die(access_status, r);
         ap_update_child_status(conn->sbh, SERVER_BUSY_LOG, r);
         ap_run_log_transaction(r);
         r = NULL;
@@ -435,13 +381,13 @@ traceout:
 }
 
 
-apr_status_t h2_task_process_request(h2_task_env *env)
+static apr_status_t h2_task_process_request(h2_task *task)
 {
-    conn_rec *c = &env->c;
+    conn_rec *c = task->c;
     request_rec *r;
     conn_state_t *cs = c->cs;
 
-    r = h2_task_create_request(env);
+    r = h2_task_create_request(task);
     if (r && (r->status == HTTP_OK)) {
         ap_update_child_status(c->sbh, SERVER_BUSY_READ, r);
         

Modified: httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_task.h
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_task.h?rev=1712570&r1=1712569&r2=1712570&view=diff
==============================================================================
--- httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_task.h (original)
+++ httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_task.h Wed Nov  4 15:44:24 2015
@@ -45,16 +45,10 @@ struct h2_worker;
 typedef struct h2_task h2_task;
 
 struct h2_task {
-    /** Links to the rest of the tasks */
-    APR_RING_ENTRY(h2_task) link;
-
     const char *id;
     int stream_id;
     struct h2_mplx *mplx;
     
-    volatile apr_uint32_t has_started;
-    volatile apr_uint32_t has_finished;
-    
     const char *method;
     const char *scheme;
     const char *authority;
@@ -62,103 +56,18 @@ struct h2_task {
     apr_table_t *headers;
     int input_eos;
 
-    struct conn_rec *c;
-};
+    int serialize_headers;
 
-typedef struct h2_task_env h2_task_env;
+    struct conn_rec *c;
 
-struct h2_task_env {
-    const char *id;
-    int stream_id;
-    struct h2_mplx *mplx;
-    
     apr_pool_t *pool;              /* pool for task lifetime things */
     apr_bucket_alloc_t *bucket_alloc;
-    
-    const char *method;
-    const char *scheme;
-    const char *authority;
-    const char *path;
-    apr_table_t *headers;
-    int input_eos;
-    
-    int serialize_headers;
-
-    struct conn_rec c;
     struct h2_task_input *input;
     struct h2_task_output *output;
     
     struct apr_thread_cond_t *io;   /* used to wait for events on */
 };
 
-/**
- * The magic pointer value that indicates the head of a h2_task list
- * @param  b The task list
- * @return The magic pointer value
- */
-#define H2_TASK_LIST_SENTINEL(b)	APR_RING_SENTINEL((b), h2_task, link)
-
-/**
- * Determine if the task list is empty
- * @param b The list to check
- * @return true or false
- */
-#define H2_TASK_LIST_EMPTY(b)	APR_RING_EMPTY((b), h2_task, link)
-
-/**
- * Return the first task in a list
- * @param b The list to query
- * @return The first task in the list
- */
-#define H2_TASK_LIST_FIRST(b)	APR_RING_FIRST(b)
-
-/**
- * Return the last task in a list
- * @param b The list to query
- * @return The last task int he list
- */
-#define H2_TASK_LIST_LAST(b)	APR_RING_LAST(b)
-
-/**
- * Insert a single task at the front of a list
- * @param b The list to add to
- * @param e The task to insert
- */
-#define H2_TASK_LIST_INSERT_HEAD(b, e) do {				\
-    h2_task *ap__b = (e);                                        \
-    APR_RING_INSERT_HEAD((b), ap__b, h2_task, link);	\
-} while (0)
-
-/**
- * Insert a single task at the end of a list
- * @param b The list to add to
- * @param e The task to insert
- */
-#define H2_TASK_LIST_INSERT_TAIL(b, e) do {				\
-    h2_task *ap__b = (e);					\
-    APR_RING_INSERT_TAIL((b), ap__b, h2_task, link);	\
-} while (0)
-
-/**
- * Get the next task in the list
- * @param e The current task
- * @return The next task
- */
-#define H2_TASK_NEXT(e)	APR_RING_NEXT((e), link)
-/**
- * Get the previous task in the list
- * @param e The current task
- * @return The previous task
- */
-#define H2_TASK_PREV(e)	APR_RING_PREV((e), link)
-
-/**
- * Remove a task from its list
- * @param e The task to remove
- */
-#define H2_TASK_REMOVE(e)	APR_RING_REMOVE((e), link)
-
-
 h2_task *h2_task_create(long session_id, int stream_id, 
                         apr_pool_t *pool, struct h2_mplx *mplx,
                         conn_rec *c);
@@ -174,14 +83,7 @@ void h2_task_set_request(h2_task *task,
 
 
 apr_status_t h2_task_do(h2_task *task, struct h2_worker *worker);
-apr_status_t h2_task_process_request(h2_task_env *env);
-
-int h2_task_has_started(h2_task *task);
-void h2_task_set_started(h2_task *task);
-int h2_task_has_finished(h2_task *task);
-void h2_task_set_finished(h2_task *task);
 
 void h2_task_register_hooks(void);
-void h2_task_die(h2_task_env *env, int status, request_rec *r);
 
 #endif /* defined(__mod_h2__h2_task__) */



Mime
View raw message