httpd-cvs mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ic...@apache.org
Subject svn commit: r1782875 [2/3] - in /httpd/httpd/trunk: ./ modules/http2/
Date Mon, 13 Feb 2017 21:00:31 GMT
Modified: httpd/httpd/trunk/modules/http2/h2_session.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_session.c?rev=1782875&r1=1782874&r2=1782875&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_session.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_session.c Mon Feb 13 21:00:30 2017
@@ -48,6 +48,11 @@
 
 static apr_status_t dispatch_master(h2_session *session);
 static apr_status_t h2_session_read(h2_session *session, int block);
+static void transit(h2_session *session, const char *action, 
+                    h2_session_state nstate);
+
+static void on_stream_state_enter(void *ctx, h2_stream *stream);
+static void on_stream_state_event(void *ctx, h2_stream *stream, h2_stream_event_t ev);
 
 static int h2_session_status_from_apr_status(apr_status_t rv)
 {
@@ -79,58 +84,30 @@ static apr_status_t h2_session_receive(v
 static void dispatch_event(h2_session *session, h2_session_event_t ev, 
                              int err, const char *msg);
 
-apr_status_t h2_session_stream_done(h2_session *session, h2_stream *stream)
+static int rst_unprocessed_stream(h2_stream *stream, void *ctx)
 {
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
-                  "h2_stream(%ld-%d): EOS bucket cleanup -> done", 
-                  session->id, stream->id);
-    h2_mplx_stream_done(session->mplx, stream);
-    
-    dispatch_event(session, H2_SESSION_EV_STREAM_DONE, 0, NULL);
-    return APR_SUCCESS;
-}
-
-typedef struct stream_sel_ctx {
-    h2_session *session;
-    h2_stream *candidate;
-} stream_sel_ctx;
-
-static int find_unprocessed_stream(h2_stream *stream, void *ictx)
-{
-    stream_sel_ctx *ctx = ictx;
-    if (H2_STREAM_CLIENT_INITIATED(stream->id)) {
-        if (!ctx->session->local.accepting
-            && stream->id > ctx->session->local.accepted_max) {
-            ctx->candidate = stream;
-            return 0;
-        }
-    }
-    else {
-        if (!ctx->session->remote.accepting
-            && stream->id > ctx->session->remote.accepted_max) {
-            ctx->candidate = stream;
-            return 0;
-        }
+    int unprocessed = (!h2_stream_was_closed(stream)
+                       && (H2_STREAM_CLIENT_INITIATED(stream->id)? 
+                           (!stream->session->local.accepting
+                            && stream->id > stream->session->local.accepted_max)
+                            : 
+                           (!stream->session->remote.accepting
+                            && stream->id > stream->session->remote.accepted_max))
+                       ); 
+    if (unprocessed) {
+        h2_stream_rst(stream, H2_ERR_NO_ERROR);
+        return 0;
     }
     return 1;
 }
 
 static void cleanup_unprocessed_streams(h2_session *session)
 {
-    stream_sel_ctx ctx;
-    ctx.session = session;
-    while (1) {
-        ctx.candidate = NULL;
-        h2_mplx_stream_do(session->mplx, find_unprocessed_stream, &ctx);
-        if (!ctx.candidate) {
-            break;
-        }
-        h2_session_stream_done(session, ctx.candidate);
-    }
+    h2_mplx_stream_do(session->mplx, rst_unprocessed_stream, session);
 }
 
-h2_stream *h2_session_open_stream(h2_session *session, int stream_id,
-                                  int initiated_on, const h2_request *req)
+static h2_stream *h2_session_open_stream(h2_session *session, int stream_id,
+                                         int initiated_on)
 {
     h2_stream * stream;
     apr_pool_t *stream_pool;
@@ -138,29 +115,11 @@ h2_stream *h2_session_open_stream(h2_ses
     apr_pool_create(&stream_pool, session->pool);
     apr_pool_tag(stream_pool, "h2_stream");
     
-    stream = h2_stream_open(stream_id, stream_pool, session, 
-                            initiated_on);
-    nghttp2_session_set_stream_user_data(session->ngh2, stream_id, stream);
-    
-    if (req) {
-        h2_stream_set_request(stream, req);
-    }
-    
-    if (H2_STREAM_CLIENT_INITIATED(stream_id)) {
-        if (stream_id > session->remote.emitted_max) {
-            ++session->remote.emitted_count;
-            session->remote.emitted_max = stream->id;
-            session->local.accepted_max = stream->id;
-        }
-    }
-    else {
-        if (stream_id > session->local.emitted_max) {
-            ++session->local.emitted_count;
-            session->remote.emitted_max = stream->id;
-        }
+    stream = h2_stream_create(stream_id, stream_pool, session, 
+                              session->monitor, initiated_on);
+    if (stream) {
+        nghttp2_session_set_stream_user_data(session->ngh2, stream_id, stream);
     }
-    dispatch_event(session, H2_SESSION_EV_STREAM_OPEN, 0, NULL);
-    
     return stream;
 }
 
@@ -217,14 +176,6 @@ static int stream_pri_cmp(int sid1, int
     return spri_cmp(sid1, s1, sid2, s2, session);
 }
 
-static apr_status_t stream_schedule(h2_session *session,
-                                    h2_stream *stream, int eos)
-{
-    (void)session;
-    return h2_stream_schedule(stream, eos, h2_session_push_enabled(session), 
-                              stream_pri_cmp, session);
-}
-
 /*
  * Callback when nghttp2 wants to send bytes back to the client.
  */
@@ -234,9 +185,9 @@ static ssize_t send_cb(nghttp2_session *
 {
     h2_session *session = (h2_session *)userp;
     apr_status_t status;
-    
     (void)ngh2;
     (void)flags;
+    
     status = h2_conn_io_write(&session->io, (const char *)data, length);
     if (status == APR_SUCCESS) {
         return length;
@@ -278,76 +229,26 @@ static int on_data_chunk_recv_cb(nghttp2
                                  const uint8_t *data, size_t len, void *userp)
 {
     h2_session *session = (h2_session *)userp;
-    apr_status_t status = APR_SUCCESS;
+    apr_status_t status = APR_EINVAL;
     h2_stream * stream;
-    int rv;
+    int rv = 0;
     
-    (void)flags;
     stream = get_stream(session, stream_id);
-    if (!stream) {
+    if (stream) {
+        status = h2_stream_recv_DATA(stream, flags, data, len);
+    }
+    else {
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03064)
                       "h2_stream(%ld-%d): on_data_chunk for unknown stream",
                       session->id, (int)stream_id);
-        rv = nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE, stream_id,
-                                       NGHTTP2_INTERNAL_ERROR);
-        if (nghttp2_is_fatal(rv)) {
-            return NGHTTP2_ERR_CALLBACK_FAILURE;
-        }
-        return 0;
-    }
-
-    /* FIXME: enabling setting EOS this way seems to break input handling
-     * in mod_proxy_http2. why? */
-    status = h2_stream_write_data(stream, (const char *)data, len,
-                                  0 /*flags & NGHTTP2_FLAG_END_STREAM*/);
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
-                  "h2_stream(%ld-%d): data_chunk_recv, written %ld bytes",
-                  session->id, stream_id, (long)len);
-    if (status != APR_SUCCESS) {
-        update_window(session, stream_id, len);
-        rv = nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE, stream_id,
-                                       H2_STREAM_RST(stream, H2_ERR_INTERNAL_ERROR));
-        if (nghttp2_is_fatal(rv)) {
-            return NGHTTP2_ERR_CALLBACK_FAILURE;
-        }
+        rv = NGHTTP2_ERR_CALLBACK_FAILURE;
     }
-    return 0;
-}
-
-static apr_status_t stream_closed(h2_session *session, 
-                                  h2_stream *stream,
-                                  uint32_t error_code) 
-{
-    conn_rec *c = session->c;
-    apr_bucket *b;
-    apr_status_t status;
     
-    if (!error_code) {
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
-                      "h2_stream(%ld-%d): handled, closing", 
-                      session->id, (int)stream->id);
-        if (H2_STREAM_CLIENT_INITIATED(stream->id)
-            && stream->id > session->local.completed_max) {
-            session->local.completed_max = stream->id;
-        }
-    }
-    else {
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, APLOGNO(03065)
-                      "h2_stream(%ld-%d): closing with err=%d %s", 
-                      session->id, (int)stream->id, (int)error_code,
-                      h2_h2_err_description(error_code));
-        h2_stream_rst(stream, error_code);
+    if (status != APR_SUCCESS) {
+        /* count this as consumed explicitly as no one will read it */
+        nghttp2_session_consume(session->ngh2, stream_id, len);
     }
-    /* The stream might have data in the buffers of the main connection.
-     * We can only free the allocated resources once all had been written.
-     * Send a special buckets on the connection that gets destroyed when
-     * all preceding data has been handled. On its destruction, it is safe
-     * to purge all resources of the stream. */
-    b = h2_bucket_eos_create(c->bucket_alloc, stream);
-    APR_BRIGADE_INSERT_TAIL(session->bbtmp, b);
-    status = h2_conn_io_pass(&session->io, session->bbtmp);
-    apr_brigade_cleanup(session->bbtmp);
-    return status;
+    return rv;
 }
 
 static int on_stream_close_cb(nghttp2_session *ngh2, int32_t stream_id,
@@ -359,7 +260,12 @@ static int on_stream_close_cb(nghttp2_se
     (void)ngh2;
     stream = get_stream(session, stream_id);
     if (stream) {
-        stream_closed(session, stream, error_code);
+        if (error_code) {
+            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03065)
+                          H2_STREAM_MSG(stream, "closing with err=%d %s"), 
+                          (int)error_code, h2_h2_err_description(error_code));
+            h2_stream_rst(stream, error_code);
+        }
     }
     return 0;
 }
@@ -378,7 +284,7 @@ static int on_begin_headers_cb(nghttp2_s
         /* nop */
     }
     else {
-        s = h2_session_open_stream(userp, frame->hd.stream_id, 0, NULL);
+        s = h2_session_open_stream(userp, frame->hd.stream_id, 0);
     }
     return s? 0 : NGHTTP2_ERR_START_STREAM_NOT_ALLOWED;
 }
@@ -405,14 +311,7 @@ static int on_header_cb(nghttp2_session
     
     status = h2_stream_add_header(stream, (const char *)name, namelen,
                                   (const char *)value, valuelen);
-    if (status == APR_ECONNRESET) {
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
-                      "h2-stream(%ld-%d): on_header, reset stream",
-                      session->id, stream->id);
-        nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE, stream->id,
-                                  NGHTTP2_INTERNAL_ERROR);
-    }
-    else if (status != APR_SUCCESS && !h2_stream_is_ready(stream)) {
+    if (status != APR_SUCCESS && !h2_stream_is_ready(stream)) {
         return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
     }
     return 0;
@@ -428,7 +327,6 @@ static int on_frame_recv_cb(nghttp2_sess
                             void *userp)
 {
     h2_session *session = (h2_session *)userp;
-    apr_status_t status = APR_SUCCESS;
     h2_stream *stream;
     
     if (APLOGcdebug(session->c)) {
@@ -449,41 +347,16 @@ static int on_frame_recv_cb(nghttp2_sess
              * trailers */
             stream = get_stream(session, frame->hd.stream_id);
             if (stream) {
-                int eos = (frame->hd.flags & NGHTTP2_FLAG_END_STREAM);
-                
-                if (h2_stream_is_scheduled(stream)) {
-                    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
-                                  "h2_stream(%ld-%d): TRAILER, eos=%d", 
-                                  session->id, frame->hd.stream_id, eos);
-                    if (eos) {
-                        status = h2_stream_close_input(stream);
-                    }
-                }
-                else {
-                    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
-                                  "h2_stream(%ld-%d): HEADER, eos=%d", 
-                                  session->id, frame->hd.stream_id, eos);
-                    status = stream_schedule(session, stream, eos);
-                }
-            }
-            else {
-                status = APR_EINVAL;
+                h2_stream_recv_frame(stream, NGHTTP2_HEADERS, frame->hd.flags);
             }
             break;
         case NGHTTP2_DATA:
             stream = get_stream(session, frame->hd.stream_id);
             if (stream) {
-                int eos = (frame->hd.flags & NGHTTP2_FLAG_END_STREAM);
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
-                              "h2_stream(%ld-%d): DATA, len=%ld, eos=%d", 
-                              session->id, frame->hd.stream_id, 
-                              (long)frame->hd.length, eos);
-                if (eos) {
-                    status = h2_stream_close_input(stream);
-                }
-            }
-            else {
-                status = APR_EINVAL;
+                ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(02923) 
+                              H2_STREAM_MSG(stream, "DATA, len=%ld, flags=%d"), 
+                              (long)frame->hd.length, frame->hd.flags);
+                h2_stream_recv_frame(stream, NGHTTP2_DATA, frame->hd.flags);
             }
             break;
         case NGHTTP2_PRIORITY:
@@ -539,22 +412,6 @@ static int on_frame_recv_cb(nghttp2_sess
             }
             break;
     }
-
-    if (status != APR_SUCCESS) {
-        int rv;
-        
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c,
-                      APLOGNO(02923) 
-                      "h2_session: stream(%ld-%d): error handling frame",
-                      session->id, (int)frame->hd.stream_id);
-        rv = nghttp2_submit_rst_stream(ng2s, NGHTTP2_FLAG_NONE,
-                                       frame->hd.stream_id,
-                                       NGHTTP2_INTERNAL_ERROR);
-        if (nghttp2_is_fatal(rv)) {
-            return NGHTTP2_ERR_CALLBACK_FAILURE;
-        }
-    }
-    
     return 0;
 }
 
@@ -617,24 +474,21 @@ static int on_send_data_cb(nghttp2_sessi
     
     if (status != APR_SUCCESS) {
         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
-                      "h2_stream(%ld-%d): writing frame header",
-                      session->id, (int)stream_id);
+                      H2_STREAM_MSG(stream, "writing frame header"));
         return NGHTTP2_ERR_CALLBACK_FAILURE;
     }
     
     status = h2_stream_read_to(stream, session->bbtmp, &len, &eos);
     if (status != APR_SUCCESS) {
         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
-                      "h2_stream(%ld-%d): send_data_cb, reading stream",
-                      session->id, (int)stream_id);
+                      H2_STREAM_MSG(stream, "send_data_cb, reading stream"));
         apr_brigade_cleanup(session->bbtmp);
         return NGHTTP2_ERR_CALLBACK_FAILURE;
     }
     else if (len != length) {
         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
-                      "h2_stream(%ld-%d): send_data_cb, wanted %ld bytes, "
-                      "got %ld from stream",
-                      session->id, (int)stream_id, (long)length, (long)len);
+                      H2_STREAM_MSG(stream, "send_data_cb, wanted %ld bytes, "
+                      "got %ld from stream"), (long)length, (long)len);
         apr_brigade_cleanup(session->bbtmp);
         return NGHTTP2_ERR_CALLBACK_FAILURE;
     }
@@ -654,10 +508,8 @@ static int on_send_data_cb(nghttp2_sessi
         return 0;
     }
     else {
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c,
-                      APLOGNO(02925) 
-                      "h2_stream(%ld-%d): failed send_data_cb",
-                      session->id, (int)stream_id);
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, APLOGNO(02925) 
+                      H2_STREAM_MSG(stream, "failed send_data_cb"));
         return NGHTTP2_ERR_CALLBACK_FAILURE;
     }
 }
@@ -667,6 +519,19 @@ static int on_frame_send_cb(nghttp2_sess
                             void *user_data)
 {
     h2_session *session = user_data;
+    h2_stream *stream;
+    int stream_id = frame->hd.stream_id;
+    
+    ++session->frames_sent;
+    switch (frame->hd.type) {
+        case NGHTTP2_PUSH_PROMISE:
+            /* PUSH_PROMISE we report on the promised stream */
+            stream_id = frame->push_promise.promised_stream_id;
+            break;
+        default:    
+            break;
+    }
+    
     if (APLOGcdebug(session->c)) {
         char buffer[256];
         
@@ -676,7 +541,11 @@ static int on_frame_send_cb(nghttp2_sess
                       session->id, buffer, (long)session->frames_received,
                      (long)session->frames_sent);
     }
-    ++session->frames_sent;
+    
+    stream = get_stream(session, stream_id);
+    if (stream) {
+        h2_stream_send_frame(stream, frame->hd.type, frame->hd.flags);
+    }
     return 0;
 }
 
@@ -688,6 +557,8 @@ static int on_invalid_header_cb(nghttp2_
                                 uint8_t flags, void *user_data)
 {
     h2_session *session = user_data;
+    h2_stream *stream;
+    
     if (APLOGcdebug(session->c)) {
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03456)
                       "h2_session(%ld-%d): denying stream with invalid header "
@@ -695,9 +566,11 @@ static int on_invalid_header_cb(nghttp2_
                       apr_pstrndup(session->pool, (const char *)name, namelen),
                       apr_pstrndup(session->pool, (const char *)value, valuelen));
     }
-    return nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE,
-                                     frame->hd.stream_id, 
-                                     NGHTTP2_PROTOCOL_ERROR);
+    stream = get_stream(session, frame->hd.stream_id);
+    if (stream) {
+        h2_stream_rst(stream, NGHTTP2_PROTOCOL_ERROR);
+    }
+    return 0;
 }
 #endif
 
@@ -791,12 +664,6 @@ static apr_status_t h2_session_shutdown(
                   "session(%ld): sent GOAWAY, err=%d, msg=%s", 
                   session->id, error, msg? msg : "");
     dispatch_event(session, H2_SESSION_EV_LOCAL_GOAWAY, error, msg);
-    
-    if (force_close) {
-        apr_brigade_cleanup(session->bbtmp);
-        h2_mplx_abort(session->mplx);
-    }
-    
     return status;
 }
 
@@ -822,15 +689,15 @@ static apr_status_t session_pool_cleanup
                       session->id);
     }
 
-    if (session->mplx) {
-        h2_mplx_set_consumed_cb(session->mplx, NULL, NULL);
-        h2_mplx_release_and_join(session->mplx, session->iowait);
-        session->mplx = NULL;
-    }
-    if (session->ngh2) {
-        nghttp2_session_del(session->ngh2);
-        session->ngh2 = NULL;
-    }
+    transit(session, "pool cleanup", H2_SESSION_ST_CLEANUP);
+    h2_mplx_set_consumed_cb(session->mplx, NULL, NULL);
+    h2_mplx_release_and_join(session->mplx, session->iowait);
+    session->mplx = NULL;
+
+    ap_assert(session->ngh2);
+    nghttp2_session_del(session->ngh2);
+    session->ngh2 = NULL;
+
     return APR_SUCCESS;
 }
 
@@ -891,7 +758,7 @@ static h2_session *h2_session_create_int
 
     /* get h2_session a lifetime beyond its pool and everything
      * connected to it. */
-    session = apr_pcalloc(c->pool, sizeof(h2_session));
+    session = apr_pcalloc(pool, sizeof(h2_session));
     if (session) {
         int rv;
         nghttp2_mem *mem;
@@ -920,6 +787,14 @@ static h2_session *h2_session_create_int
             return NULL;
         }
         
+        session->monitor = apr_pcalloc(pool, sizeof(h2_stream_monitor));
+        if (session->monitor == NULL) {
+            return NULL;
+        }
+        session->monitor->ctx = session;
+        session->monitor->on_state_enter = on_stream_state_enter;
+        session->monitor->on_state_event = on_stream_state_event;
+         
         session->mplx = h2_mplx_create(c, session->pool, session->config, 
                                        session->s->timeout, workers);
         
@@ -1052,7 +927,7 @@ static apr_status_t h2_session_start(h2_
         }
         
         /* Now we need to auto-open stream 1 for the request we got. */
-        stream = h2_session_open_stream(session, 1, 0, NULL);
+        stream = h2_session_open_stream(session, 1, 0);
         if (!stream) {
             status = APR_EGENERAL;
             ap_log_rerror(APLOG_MARK, APLOG_ERR, status, session->r,
@@ -1061,11 +936,7 @@ static apr_status_t h2_session_start(h2_
             return status;
         }
         
-        status = h2_stream_set_request_rec(stream, session->r);
-        if (status != APR_SUCCESS) {
-            return status;
-        }
-        status = stream_schedule(session, stream, 1);
+        status = h2_stream_set_request_rec(stream, session->r, 1);
         if (status != APR_SUCCESS) {
             return status;
         }
@@ -1157,6 +1028,9 @@ static ssize_t stream_data_cb(nghttp2_se
 
     status = h2_stream_out_prepare(stream, &nread, &eos, NULL);
     if (nread) {
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, 
+                      H2_STREAM_MSG(stream, "prepared no_copy, len=%ld, eos=%d"),
+                      (long)nread, eos);
         *data_flags |=  NGHTTP2_DATA_FLAG_NO_COPY;
     }
     
@@ -1165,8 +1039,7 @@ static ssize_t stream_data_cb(nghttp2_se
             break;
             
         case APR_ECONNRESET:
-            return nghttp2_submit_rst_stream(ng2s, NGHTTP2_FLAG_NONE,
-                stream->id, stream->rst_error);
+            return 0;
             
         case APR_EAGAIN:
             /* If there is no data available, our session will automatically
@@ -1175,15 +1048,13 @@ static ssize_t stream_data_cb(nghttp2_se
              */
             nread = 0;
             ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03071)
-                          "h2_stream(%ld-%d): suspending",
-                          session->id, (int)stream_id);
+                          H2_STREAM_MSG(stream, "suspending"));
             return NGHTTP2_ERR_DEFERRED;
             
         default:
             nread = 0;
-            ap_log_cerror(APLOG_MARK, APLOG_ERR, status, session->c,
-                          APLOGNO(02938) "h2_stream(%ld-%d): reading data",
-                          session->id, (int)stream_id);
+            ap_log_cerror(APLOG_MARK, APLOG_ERR, status, session->c, APLOGNO(02938)
+                          H2_STREAM_MSG(stream, "reading data"));
             return NGHTTP2_ERR_CALLBACK_FAILURE;
     }
     
@@ -1196,7 +1067,6 @@ static ssize_t stream_data_cb(nghttp2_se
 struct h2_stream *h2_session_push(h2_session *session, h2_stream *is,
                                   h2_push *push)
 {
-    apr_status_t status;
     h2_stream *stream;
     h2_ngheader *ngh;
     int nid;
@@ -1206,41 +1076,30 @@ struct h2_stream *h2_session_push(h2_ses
                                       ngh->nv, ngh->nvlen, NULL);
     if (nid <= 0) {
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03075)
-                      "h2_stream(%ld-%d): submitting push promise fail: %s",
-                      session->id, is->id, nghttp2_strerror(nid));
+                      H2_STREAM_MSG(is, "submitting push promise fail: %s"),
+                      nghttp2_strerror(nid));
         return NULL;
     }
     ++session->pushes_promised;
     
     ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03076)
-                  "h2_stream(%ld-%d): SERVER_PUSH %d for %s %s on %d",
-                  session->id, is->id, nid,
-                  push->req->method, push->req->path, is->id);
+                  H2_STREAM_MSG(is, "SERVER_PUSH %d for %s %s on %d"),
+                  nid, push->req->method, push->req->path, is->id);
                   
-    stream = h2_session_open_stream(session, nid, is->id, push->req);
-    if (stream) {
-        h2_session_set_prio(session, stream, push->priority);
-        status = stream_schedule(session, stream, 1);
-        if (status != APR_SUCCESS) {
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
-                          "h2_stream(%ld-%d): scheduling push stream",
-                          session->id, stream->id);
-            stream = NULL;
-        }
-        ++session->unsent_promises;
-    }
-    else {
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03077)
-                      "h2_stream(%ld-%d): failed to create stream obj %d",
-                      session->id, is->id, nid);
-    }
-
+    stream = h2_session_open_stream(session, nid, is->id);
     if (!stream) {
-        /* try to tell the client that it should not wait. */
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03077)
+                      H2_STREAM_MSG(stream, "failed to create stream obj %d"),
+                      nid);
+        /* kill the push_promise */
         nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE, nid,
                                   NGHTTP2_INTERNAL_ERROR);
+        return NULL;
     }
     
+    h2_session_set_prio(session, stream, push->priority);
+    h2_stream_set_request(stream, push->req);
+    ++session->unsent_promises;
     return stream;
 }
 
@@ -1265,8 +1124,7 @@ apr_status_t h2_session_set_prio(h2_sess
     s = nghttp2_session_find_stream(session->ngh2, stream->id);
     if (!s) {
         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
-                      "h2_stream(%ld-%d): lookup of nghttp2_stream failed",
-                      session->id, stream->id);
+                      H2_STREAM_MSG(stream, "lookup of nghttp2_stream failed"));
         return APR_EINVAL;
     }
     
@@ -1337,10 +1195,8 @@ apr_status_t h2_session_set_prio(h2_sess
 
         rv = nghttp2_session_change_stream_priority(session->ngh2, stream->id, &ps);
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03203)
-                      "h2_stream(%ld-%d): PUSH %s, weight=%d, "
-                      "depends=%d, returned=%d",
-                      session->id, stream->id, ptype, 
-                      ps.weight, ps.stream_id, rv);
+                      H2_STREAM_MSG(stream, "PUSH %s, weight=%d, depends=%d, returned=%d"),
+                      ptype, ps.weight, ps.stream_id, rv);
         status = (rv < 0)? APR_EGENERAL : APR_SUCCESS;
     }
 #else
@@ -1404,14 +1260,9 @@ static apr_status_t on_stream_headers(h2
 
     ap_assert(session);
     ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, 
-                  "h2_stream(%ld-%d): on_headers", session->id, stream->id);
+                  H2_STREAM_MSG(stream, "on_headers"));
     if (headers->status < 100) {
-        int err = H2_STREAM_RST(stream, headers->status);
-        rv = nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE,
-                                       stream->id, err);
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
-                  "h2_stream(%ld-%d): unpexected header status %d, stream rst", 
-                  session->id, stream->id, headers->status);
+        h2_stream_rst(stream, headers->status);
         goto leave;
     }
     else if (stream->has_response) {
@@ -1419,8 +1270,7 @@ static apr_status_t on_stream_headers(h2
         
         nh = h2_util_ngheader_make(stream->pool, headers->headers);
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03072)
-                      "h2_stream(%ld-%d): submit %d trailers",
-                      session->id, (int)stream->id,(int) nh->nvlen);
+                      H2_STREAM_MSG(stream, "submit %d trailers"), (int)nh->nvlen);
         rv = nghttp2_submit_trailer(session->ngh2, stream->id, nh->nv, nh->nvlen);
         goto leave;
     }
@@ -1431,8 +1281,8 @@ static apr_status_t on_stream_headers(h2
         const char *note;
         
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03073)
-                      "h2_stream(%ld-%d): submit response %d, REMOTE_WINDOW_SIZE=%u",
-                      session->id, stream->id, headers->status,
+                      H2_STREAM_MSG(stream, "submit response %d, REMOTE_WINDOW_SIZE=%u"),
+                      headers->status,
                       (unsigned int)nghttp2_session_get_stream_remote_window_size(session->ngh2, stream->id));
         
         if (!eos || len > 0) {
@@ -1548,14 +1398,14 @@ static apr_status_t on_stream_resume(voi
     
     ap_assert(stream);
     ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, 
-                  "h2_stream(%ld-%d): on_resume", session->id, stream->id);
+                  H2_STREAM_MSG(stream, "on_resume"));
         
 send_headers:
     headers = NULL;
     status = h2_stream_out_prepare(stream, &len, &eos, &headers);
     ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c, 
-                  "h2_stream(%ld-%d): prepared len=%ld, eos=%d", 
-                  session->id, stream->id, (long)len, eos);
+                  H2_STREAM_MSG(stream, "prepared len=%ld, eos=%d"), 
+                  (long)len, eos);
     if (headers) {
         status = on_stream_headers(session, stream, headers, len, eos);
         if (status != APR_SUCCESS || stream->rst_error) {
@@ -1564,22 +1414,19 @@ send_headers:
         goto send_headers;
     }
     else if (status != APR_EAGAIN) {
+        /* we have DATA to send */
         if (!stream->has_response) {
-            int err = H2_STREAM_RST(stream, H2_ERR_PROTOCOL_ERROR);
+            /* but no response */
             ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03466)
-                          "h2_stream(%ld-%d): no response, RST_STREAM, err=%d",
-                          session->id, stream->id, err);
-            nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE,
-                                      stream->id, err);
+                          H2_STREAM_MSG(stream, "no response, RST_STREAM"));
+            h2_stream_rst(stream, H2_ERR_PROTOCOL_ERROR);
             return APR_SUCCESS;
         } 
         rv = nghttp2_session_resume_data(session->ngh2, stream->id);
         session->have_written = 1;
         ap_log_cerror(APLOG_MARK, nghttp2_is_fatal(rv)?
-                      APLOG_ERR : APLOG_DEBUG, 0, session->c,
-                      APLOGNO(02936) 
-                      "h2_stream(%ld-%d): resuming %s",
-                      session->id, stream->id, rv? nghttp2_strerror(rv) : "");
+                      APLOG_ERR : APLOG_DEBUG, 0, session->c, APLOGNO(02936) 
+                      H2_STREAM_MSG(stream, "resumed"));
     }
     return status;
 }
@@ -1680,6 +1527,7 @@ static const char *StateNames[] = {
     "IDLE",      /* H2_SESSION_ST_IDLE */
     "BUSY",      /* H2_SESSION_ST_BUSY */
     "WAIT",      /* H2_SESSION_ST_WAIT */
+    "CLEANUP",   /* H2_SESSION_ST_CLEANUP */
 };
 
 static const char *state_name(h2_session_state state)
@@ -1861,18 +1709,6 @@ static void h2_session_ev_no_io(h2_sessi
     }
 }
 
-static void h2_session_ev_stream_ready(h2_session *session, int arg, const char *msg)
-{
-    switch (session->state) {
-        case H2_SESSION_ST_WAIT:
-            transit(session, "stream ready", H2_SESSION_ST_BUSY);
-            break;
-        default:
-            /* nop */
-            break;
-    }
-}
-
 static void h2_session_ev_data_read(h2_session *session, int arg, const char *msg)
 {
     switch (session->state) {
@@ -1915,9 +1751,8 @@ static void h2_session_ev_pre_close(h2_s
     h2_session_shutdown(session, arg, msg, 1);
 }
 
-static void h2_session_ev_stream_open(h2_session *session, int arg, const char *msg)
+static void ev_stream_open(h2_session *session, h2_stream *stream)
 {
-    ++session->open_streams;
     switch (session->state) {
         case H2_SESSION_ST_IDLE:
             if (session->open_streams == 1) {
@@ -1928,11 +1763,29 @@ static void h2_session_ev_stream_open(h2
         default:
             break;
     }
+    
+    ap_assert(!stream->scheduled);
+    if (stream->request) {
+        const h2_request *r = stream->request;
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+                      H2_STREAM_MSG(stream, "schedule %s %s://%s%s chunked=%d"),
+                      r->method, r->scheme, r->authority, r->path, r->chunked);
+        stream->scheduled = 1;
+        h2_mplx_process(session->mplx, stream, stream_pri_cmp, session);
+    }
+    else {
+        h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR);
+    }
 }
 
-static void h2_session_ev_stream_done(h2_session *session, int arg, const char *msg)
+static void ev_stream_closed(h2_session *session, h2_stream *stream)
 {
-    --session->open_streams;
+    apr_bucket *b;
+    
+    if (H2_STREAM_CLIENT_INITIATED(stream->id)
+        && (stream->id > session->local.completed_max)) {
+        session->local.completed_max = stream->id;
+    }
     switch (session->state) {
         case H2_SESSION_ST_IDLE:
             if (session->open_streams == 0) {
@@ -1944,6 +1797,76 @@ static void h2_session_ev_stream_done(h2
         default:
             break;
     }
+    
+    /* The stream might have data in the buffers of the main connection.
+     * We can only free the allocated resources once all had been written.
+     * Send a special buckets on the connection that gets destroyed when
+     * all preceding data has been handled. On its destruction, it is safe
+     * to purge all resources of the stream. */
+    b = h2_bucket_eos_create(session->c->bucket_alloc, stream);
+    APR_BRIGADE_INSERT_TAIL(session->bbtmp, b);
+    h2_conn_io_pass(&session->io, session->bbtmp);
+    apr_brigade_cleanup(session->bbtmp);
+}
+
+static void on_stream_state_enter(void *ctx, h2_stream *stream)
+{
+    h2_session *session = ctx;
+    /* stream entered a new state */
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+                  H2_STREAM_MSG(stream, "entered state"));
+    switch (stream->state) {
+        case H2_SS_IDLE: /* stream was created */
+            ++session->open_streams;
+            if (H2_STREAM_CLIENT_INITIATED(stream->id)) {
+                ++session->remote.emitted_count;
+                if (stream->id > session->remote.emitted_max) {
+                    session->remote.emitted_max = stream->id;
+                    session->local.accepted_max = stream->id;
+                }
+            }
+            else {
+                if (stream->id > session->local.emitted_max) {
+                    ++session->local.emitted_count;
+                    session->remote.emitted_max = stream->id;
+                }
+            }
+            break;
+        case H2_SS_OPEN: /* stream has request headers */
+        case H2_SS_RSVD_L: /* stream has request headers */
+            ev_stream_open(session, stream);
+            break;
+        case H2_SS_CLOSED_L: /* stream output was closed */
+            break;
+        case H2_SS_CLOSED_R: /* stream input was closed */
+            break;
+        case H2_SS_CLOSED: /* stream in+out were closed */
+            --session->open_streams;
+            ev_stream_closed(session, stream);
+            break;
+        case H2_SS_CLEANUP:
+            h2_mplx_stream_cleanup(session->mplx, stream);
+            break;
+        default:
+            break;
+    }
+}
+
+static void on_stream_state_event(void *ctx, h2_stream *stream, 
+                                  h2_stream_event_t ev)
+{
+    h2_session *session = ctx;
+    switch (ev) {
+        case H2_SEV_CANCELLED:
+            if (session->state != H2_SESSION_ST_DONE) {
+                nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE, 
+                                          stream->id, stream->rst_error);
+            }
+            break;
+        default:
+            /* NOP */
+            break;
+    }
 }
 
 static void dispatch_event(h2_session *session, h2_session_event_t ev, 
@@ -1971,9 +1894,6 @@ static void dispatch_event(h2_session *s
         case H2_SESSION_EV_NO_IO:
             h2_session_ev_no_io(session, arg, msg);
             break;
-        case H2_SESSION_EV_STREAM_READY:
-            h2_session_ev_stream_ready(session, arg, msg);
-            break;
         case H2_SESSION_EV_DATA_READ:
             h2_session_ev_data_read(session, arg, msg);
             break;
@@ -1986,23 +1906,12 @@ static void dispatch_event(h2_session *s
         case H2_SESSION_EV_PRE_CLOSE:
             h2_session_ev_pre_close(session, arg, msg);
             break;
-        case H2_SESSION_EV_STREAM_OPEN:
-            h2_session_ev_stream_open(session, arg, msg);
-            break;
-        case H2_SESSION_EV_STREAM_DONE:
-            h2_session_ev_stream_done(session, arg, msg);
-            break;
         default:
             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
                           "h2_session(%ld): unknown event %d", 
                           session->id, ev);
             break;
     }
-    
-    if (session->state == H2_SESSION_ST_DONE) {
-        apr_brigade_cleanup(session->bbtmp);
-        h2_mplx_abort(session->mplx);
-    }
 }
 
 /* trigger window updates, stream resumes and submits */

Modified: httpd/httpd/trunk/modules/http2/h2_session.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_session.h?rev=1782875&r1=1782874&r2=1782875&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_session.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_session.h Mon Feb 13 21:00:30 2017
@@ -51,6 +51,7 @@ struct h2_push;
 struct h2_push_diary;
 struct h2_session;
 struct h2_stream;
+struct h2_stream_monitor;
 struct h2_task;
 struct h2_workers;
 
@@ -64,13 +65,10 @@ typedef enum {
     H2_SESSION_EV_PROTO_ERROR,      /* protocol error */
     H2_SESSION_EV_CONN_TIMEOUT,     /* connection timeout */
     H2_SESSION_EV_NO_IO,            /* nothing has been read or written */
-    H2_SESSION_EV_STREAM_READY,     /* stream signalled availability of headers/data */
     H2_SESSION_EV_DATA_READ,        /* connection data has been read */
     H2_SESSION_EV_NGH2_DONE,        /* nghttp2 wants neither read nor write anything */
     H2_SESSION_EV_MPM_STOPPING,     /* the process is stopping */
     H2_SESSION_EV_PRE_CLOSE,        /* connection will close after this */
-    H2_SESSION_EV_STREAM_OPEN,      /* stream has been opened */
-    H2_SESSION_EV_STREAM_DONE,      /* stream has been handled completely */
 } h2_session_event_t;
 
 typedef struct h2_session {
@@ -101,6 +99,7 @@ typedef struct h2_session {
     
     struct h2_push_diary *push_diary; /* remember pushes, avoid duplicates */
     
+    struct h2_stream_monitor *monitor;/* monitor callbacks for streams */
     int open_streams;               /* number of client streams open */
     int unsent_submits;             /* number of submitted, but not yet written responses. */
     int unsent_promises;            /* number of submitted, but not yet written push promises */
@@ -178,34 +177,12 @@ void h2_session_abort(h2_session *sessio
 void h2_session_close(h2_session *session);
 
 /**
- * Create and register a new stream under the given id.
- * 
- * @param session the session to register in
- * @param stream_id the new stream identifier
- * @param initiated_on the stream id this one is initiated on or 0
- * @param req the request for this stream or NULL if not known yet
- * @return the new stream
- */
-struct h2_stream *h2_session_open_stream(h2_session *session, int stream_id,
-                                         int initiated_on, 
-                                         const h2_request *req);
-
-
-/**
  * Returns if client settings have push enabled.
  * @param != 0 iff push is enabled in client settings
  */
 int h2_session_push_enabled(h2_session *session);
 
 /**
- * Destroy the stream and release it everywhere. Reclaim all resources.
- * @param session the session to which the stream belongs
- * @param stream the stream to destroy
- */
-apr_status_t h2_session_stream_done(h2_session *session, 
-                                    struct h2_stream *stream);
-
-/**
  * Submit a push promise on the stream and schedule the new steam for
  * processing..
  * 
@@ -221,5 +198,4 @@ apr_status_t h2_session_set_prio(h2_sess
                                  struct h2_stream *stream, 
                                  const struct h2_priority *prio);
 
-
 #endif /* defined(__mod_h2__h2_session__) */

Modified: httpd/httpd/trunk/modules/http2/h2_stream.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_stream.c?rev=1782875&r1=1782874&r2=1782875&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_stream.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_stream.c Mon Feb 13 21:00:30 2017
@@ -43,16 +43,119 @@
 #include "h2_util.h"
 
 
-static int state_transition[][7] = {
-    /*  ID OP RL RR CI CO CL */
-/*ID*/{  1, 0, 0, 0, 0, 0, 0 },
-/*OP*/{  1, 1, 0, 0, 0, 0, 0 },
-/*RL*/{  0, 0, 1, 0, 0, 0, 0 },
-/*RR*/{  0, 0, 0, 1, 0, 0, 0 },
-/*CI*/{  1, 1, 0, 0, 1, 0, 0 },
-/*CO*/{  1, 1, 0, 0, 0, 1, 0 },
-/*CL*/{  1, 1, 0, 0, 1, 1, 1 },
+#define S_XXX (-2)
+#define S_ERR (-1)
+#define S_NOP (0)
+#define S_IDL     (H2_SS_IDL + 1)
+#define S_RS_L    (H2_SS_RSVD_L + 1)
+#define S_RS_R    (H2_SS_RSVD_R + 1)
+#define S_OPEN    (H2_SS_OPEN + 1)
+#define S_CL_L    (H2_SS_CLOSED_L + 1)
+#define S_CL_R    (H2_SS_CLOSED_R + 1)
+#define S_CLS     (H2_SS_CLOSED + 1)
+#define S_CLN     (H2_SS_CLEANUP + 1)
+
+static const char *h2_ss_str(h2_stream_state_t state)
+{
+    switch (state) {
+        case H2_SS_IDLE:
+            return "IDLE";
+        case H2_SS_RSVD_L:
+            return "RESERVED_LOCAL";
+        case H2_SS_RSVD_R:
+            return "RESERVED_REMOTE";
+        case H2_SS_OPEN:
+            return "OPEN";
+        case H2_SS_CLOSED_L:
+            return "HALF_CLOSED_LOCAL";
+        case H2_SS_CLOSED_R:
+            return "HALF_CLOSED_REMOTE";
+        case H2_SS_CLOSED:
+            return "CLOSED";
+        case H2_SS_CLEANUP:
+            return "CLEANUP";
+        default:
+            return "UNKNOWN";
+    }
+}
+
+const char *h2_stream_state_str(h2_stream *stream) 
+{
+    return h2_ss_str(stream->state);
+}
+
+static int trans_on_send[][H2_SS_MAX] = {
+/*                    S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS,  S_CLN, */        
+/* DATA,         */ { S_ERR, S_ERR,  S_ERR,  S_NOP,  S_NOP,  S_ERR,  S_NOP,  S_NOP, },
+/* HEADERS,      */ { S_ERR, S_ERR,  S_CL_R, S_NOP,  S_NOP,  S_ERR,  S_NOP,  S_NOP, },
+/* PRIORITY,     */ { S_NOP, S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP, },   
+/* RST_STREAM,   */ { S_CLS, S_CLS,  S_CLS,  S_CLS,  S_CLS,  S_CLS,  S_NOP,  S_NOP, },
+/* SETTINGS,     */ { S_ERR, S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR, },
+/* PUSH_PROMISE, */ { S_RS_L,S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR, }, 
+/* PING,         */ { S_ERR, S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR, },
+/* GOAWAY,       */ { S_ERR, S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR, },
+/* WINDOW_UPDATE,*/ { S_NOP, S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP, },
+/* CONT          */ { S_NOP, S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP, },
+};
+static int trans_on_recv[][H2_SS_MAX] = {
+/*                    S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS,  S_CLN, */        
+/* DATA,         */ { S_ERR, S_ERR,  S_ERR,  S_NOP,  S_ERR,  S_NOP,  S_NOP,  S_NOP, },
+/* HEADERS,      */ { S_OPEN,S_CL_L, S_ERR,  S_NOP,  S_ERR,  S_NOP,  S_NOP,  S_NOP, },
+/* PRIORITY,     */ { S_NOP, S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP, },   
+/* RST_STREAM,   */ { S_ERR, S_CLS,  S_CLS,  S_CLS,  S_CLS,  S_CLS,  S_NOP,  S_NOP, },
+/* SETTINGS,     */ { S_ERR, S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR, },
+/* PUSH_PROMISE, */ { S_RS_R,S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR, }, 
+/* PING,         */ { S_ERR, S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR, },
+/* GOAWAY,       */ { S_ERR, S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR,  S_ERR, },
+/* WINDOW_UPDATE,*/ { S_NOP, S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP, },
+/* CONT          */ { S_NOP, S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP,  S_NOP, },
 };
+static int trans_on_event[][H2_SS_MAX] = {
+/* H2_SEV_CLOSED_L*/{ S_XXX, S_ERR,  S_ERR,  S_CL_L, S_CLS,  S_XXX,  S_XXX,  S_XXX, },
+/* H2_SEV_CLOSED_R*/{ S_ERR, S_ERR,  S_ERR,  S_CL_R, S_ERR,  S_CLS,  S_NOP,  S_NOP, },
+/* H2_SEV_CANCELLED*/{S_CLS, S_CLS,  S_CLS,  S_CLS,  S_CLS,  S_CLS,  S_NOP,  S_NOP, },
+/* H2_SEV_EOS_SENT*/{ S_NOP, S_XXX,  S_XXX,  S_XXX,  S_XXX,  S_CLS,  S_CLN,  S_XXX, },
+};
+
+static int on_map(h2_stream_state_t state, int map[H2_SS_MAX])
+{
+    int op = map[state];
+    switch (op) {
+        case S_XXX:
+        case S_ERR:
+            return op;
+        case S_NOP:
+            return state;
+        default:
+            return op-1;
+    }
+}
+
+static int on_frame(h2_stream_state_t state, int frame_type, 
+                    int frame_map[][H2_SS_MAX], apr_size_t maxlen)
+{
+    ap_assert(frame_type >= 0);
+    ap_assert(state >= 0);
+    if (frame_type >= maxlen) {
+        return state; /* NOP */
+    }
+    return on_map(state, frame_map[frame_type]);
+}
+
+static int on_frame_send(h2_stream_state_t state, int frame_type)
+{
+    return on_frame(state, frame_type, trans_on_send, H2_ALEN(trans_on_send));
+}
+
+static int on_frame_recv(h2_stream_state_t state, int frame_type)
+{
+    return on_frame(state, frame_type, trans_on_recv, H2_ALEN(trans_on_recv));
+}
+
+static int on_event(h2_stream_state_t state, h2_stream_event_t ev)
+{
+    return on_map(state, trans_on_event[ev]);
+}
 
 static void H2_STREAM_OUT_LOG(int lvl, h2_stream *s, const char *tag)
 {
@@ -68,87 +171,291 @@ static void H2_STREAM_OUT_LOG(int lvl, h
     }
 }
 
-static int set_state(h2_stream *stream, h2_stream_state_t state)
+static apr_status_t close_input(h2_stream *stream)
 {
-    int allowed = state_transition[state][stream->state];
-    if (allowed) {
-        stream->state = state;
-        return 1;
+    conn_rec *c = stream->session->c;
+    apr_status_t status;
+    apr_bucket_brigade *tmp;
+    apr_bucket *b;
+
+    if (h2_beam_is_closed(stream->input)) {
+        return APR_SUCCESS;
     }
     
-    ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, stream->session->c, APLOGNO(03081)
-                  "h2_stream(%ld-%d): invalid state transition from %d to %d", 
-                  stream->session->id, stream->id, stream->state, state);
-    return 0;
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
+                  H2_STREAM_MSG(stream, "closing input"));
+    if (stream->rst_error) {
+        return APR_ECONNRESET;
+    }
+    
+    tmp = apr_brigade_create(stream->pool, c->bucket_alloc);
+    if (stream->trailers && !apr_is_empty_table(stream->trailers)) {
+        h2_headers *r = h2_headers_create(HTTP_OK, stream->trailers, 
+                                          NULL, stream->pool);
+        b = h2_bucket_headers_create(c->bucket_alloc, r);
+        APR_BRIGADE_INSERT_TAIL(tmp, b);
+        stream->trailers = NULL;
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
+                      H2_STREAM_MSG(stream, "added trailers"));
+    }
+    
+    b = apr_bucket_eos_create(c->bucket_alloc);
+    APR_BRIGADE_INSERT_TAIL(tmp, b);
+    status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ);
+    apr_brigade_destroy(tmp);
+    h2_beam_close(stream->input);
+    return status;
+}
+
+static apr_status_t close_output(h2_stream *stream)
+{
+    if (h2_beam_is_closed(stream->output)) {
+        return APR_SUCCESS;
+    }
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
+                  H2_STREAM_MSG(stream, "closing output"));
+    return h2_beam_leave(stream->output);
+}
+
+static void on_state_enter(h2_stream *stream) 
+{
+    if (stream->monitor && stream->monitor->on_state_enter) {
+        stream->monitor->on_state_enter(stream->monitor->ctx, stream);
+    }
+}
+
+static void on_state_event(h2_stream *stream, h2_stream_event_t ev) 
+{
+    if (stream->monitor && stream->monitor->on_state_event) {
+        stream->monitor->on_state_event(stream->monitor->ctx, stream, ev);
+    }
 }
 
-static int close_input(h2_stream *stream) 
+static void on_state_invalid(h2_stream *stream) 
 {
+    if (stream->monitor && stream->monitor->on_state_invalid) {
+        stream->monitor->on_state_invalid(stream->monitor->ctx, stream);
+    }
+    /* stream got an event/frame invalid in its state */
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
+                  H2_STREAM_MSG(stream, "invalid state event")); 
     switch (stream->state) {
-        case H2_STREAM_ST_CLOSED_INPUT:
-        case H2_STREAM_ST_CLOSED:
-            return 0; /* ignore, idempotent */
-        case H2_STREAM_ST_CLOSED_OUTPUT:
-            /* both closed now */
-            set_state(stream, H2_STREAM_ST_CLOSED);
+        case H2_SS_OPEN:
+        case H2_SS_RSVD_L:
+        case H2_SS_RSVD_R:
+        case H2_SS_CLOSED_L:
+        case H2_SS_CLOSED_R:
+            h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR);
             break;
         default:
-            /* everything else we jump to here */
-            set_state(stream, H2_STREAM_ST_CLOSED_INPUT);
             break;
     }
-    return 1;
 }
 
-static int input_closed(h2_stream *stream) 
+static apr_status_t transit(h2_stream *stream, int new_state)
 {
-    switch (stream->state) {
-        case H2_STREAM_ST_OPEN:
-        case H2_STREAM_ST_CLOSED_OUTPUT:
-            return 0;
-        default:
-            return 1;
+    if (new_state == stream->state) {
+        return APR_SUCCESS;
     }
+    else if (new_state < 0) {
+        ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, stream->session->c, APLOGNO(03081)
+                      H2_STREAM_MSG(stream, "invalid transition"));
+        on_state_invalid(stream);
+        return APR_EINVAL;
+    }
+    
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c, 
+                  H2_STREAM_MSG(stream, "transit to [%s]"), h2_ss_str(new_state));
+    stream->state = new_state;
+    switch (new_state) {
+        case H2_SS_IDLE:
+            break;
+        case H2_SS_RSVD_L:
+            close_input(stream);
+            break;
+        case H2_SS_RSVD_R:
+            break;
+        case H2_SS_OPEN:
+            break;
+        case H2_SS_CLOSED_L:
+            close_output(stream);
+            break;
+        case H2_SS_CLOSED_R:
+            close_input(stream);
+            break;
+        case H2_SS_CLOSED:
+            close_input(stream);
+            close_output(stream);
+            if (stream->out_buffer) {
+                apr_brigade_cleanup(stream->out_buffer);
+            }
+            break;
+        case H2_SS_CLEANUP:
+            break;
+    }
+    on_state_enter(stream);
+    return APR_SUCCESS;
 }
 
-static int close_output(h2_stream *stream) 
+void h2_stream_set_monitor(h2_stream *stream, h2_stream_monitor *monitor)
 {
-    switch (stream->state) {
-        case H2_STREAM_ST_CLOSED_OUTPUT:
-        case H2_STREAM_ST_CLOSED:
-            return 0; /* ignore, idempotent */
-        case H2_STREAM_ST_CLOSED_INPUT:
-            /* both closed now */
-            set_state(stream, H2_STREAM_ST_CLOSED);
+    stream->monitor = monitor;
+}
+
+void h2_stream_dispatch(h2_stream *stream, h2_stream_event_t ev)
+{
+    int new_state;
+    
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
+                  H2_STREAM_MSG(stream, "dispatch event %d"), ev);
+    new_state = on_event(stream->state, ev);
+    if (new_state < 0) {
+        ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, stream->session->c, APLOGNO(03081)
+                      H2_STREAM_MSG(stream, "invalid event %d"), ev);
+        on_state_invalid(stream);
+        AP_DEBUG_ASSERT(new_state > S_XXX);
+        return;
+    }
+    else if (new_state == stream->state) {
+        /* nop */
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
+                      H2_STREAM_MSG(stream, "ignored event %d"), ev);
+        return;
+    }
+    else {
+        on_state_event(stream, ev);
+        transit(stream, new_state);
+    }
+}
+
+static void set_policy_for(h2_stream *stream, h2_request *r) 
+{
+    int enabled = h2_session_push_enabled(stream->session);
+    stream->push_policy = h2_push_policy_determine(r->headers, stream->pool, 
+                                                   enabled);
+    r->serialize = h2_config_geti(stream->session->config, H2_CONF_SER_HEADERS);
+}
+
+apr_status_t h2_stream_send_frame(h2_stream *stream, int ftype, int flags)
+{
+    apr_status_t status = APR_SUCCESS;
+    int new_state, eos = 0;
+
+    new_state = on_frame_send(stream->state, ftype);
+    if (new_state < 0) {
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c, 
+                      H2_STREAM_MSG(stream, "invalid frame %d send"), ftype);
+        AP_DEBUG_ASSERT(new_state > S_XXX);
+        return transit(stream, new_state);
+    }
+    
+    switch (ftype) {
+        case NGHTTP2_DATA:
+            eos = (flags & NGHTTP2_FLAG_END_STREAM);
+            break;
+            
+        case NGHTTP2_HEADERS:
+            eos = (flags & NGHTTP2_FLAG_END_STREAM);
+            break;
+            
+        case NGHTTP2_PUSH_PROMISE:
+                /* start pushed stream */
+                ap_assert(stream->request == NULL);
+                ap_assert(stream->rtmp != NULL);
+                status = h2_request_end_headers(stream->rtmp, stream->pool, 0);
+                if (status != APR_SUCCESS) {
+                    return status;
+                }
+                set_policy_for(stream, stream->rtmp);
+                stream->request = stream->rtmp;
+                stream->rtmp = NULL;
             break;
+            
         default:
-            /* everything else we jump to here */
-            set_state(stream, H2_STREAM_ST_CLOSED_OUTPUT);
             break;
     }
-    return 1;
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c, 
+                  H2_STREAM_MSG(stream, "send frame %d, eos=%d"), ftype, eos);
+    status = transit(stream, new_state);
+    if (status == APR_SUCCESS && eos) {
+        status = transit(stream, on_event(stream->state, H2_SEV_CLOSED_L));
+    }
+    return status;
 }
 
-static int input_open(const h2_stream *stream) 
+apr_status_t h2_stream_recv_frame(h2_stream *stream, int ftype, int flags)
 {
-    switch (stream->state) {
-        case H2_STREAM_ST_OPEN:
-        case H2_STREAM_ST_CLOSED_OUTPUT:
-            return 1;
+    apr_status_t status = APR_SUCCESS;
+    int new_state, eos = 0;
+
+    new_state = on_frame_recv(stream->state, ftype);
+    if (new_state < 0) {
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c, 
+                      H2_STREAM_MSG(stream, "invalid frame %d recv"), ftype);
+        AP_DEBUG_ASSERT(new_state > S_XXX);
+        return transit(stream, new_state);
+    }
+    
+    switch (ftype) {
+        case NGHTTP2_DATA:
+            eos = (flags & NGHTTP2_FLAG_END_STREAM);
+            break;
+            
+        case NGHTTP2_HEADERS:
+            eos = (flags & NGHTTP2_FLAG_END_STREAM);
+            if (stream->state == H2_SS_OPEN) {
+                /* trailer HEADER */
+                if (!eos) {
+                    h2_stream_rst(stream, H2_ERR_PROTOCOL_ERROR);
+                }
+            }
+            else {
+                /* request HEADER */
+                ap_assert(stream->request == NULL);
+                ap_assert(stream->rtmp != NULL);
+                status = h2_request_end_headers(stream->rtmp, stream->pool, 0);
+                if (status != APR_SUCCESS) {
+                    return status;
+                }
+                set_policy_for(stream, stream->rtmp);
+                stream->request = stream->rtmp;
+                stream->rtmp = NULL;
+            }
+            break;
+            
         default:
-            return 0;
+            break;
+    }
+    status = transit(stream, new_state);
+    if (status == APR_SUCCESS && eos) {
+        status = transit(stream, on_event(stream->state, H2_SEV_CLOSED_R));
     }
+    return status;
 }
 
-static int output_open(h2_stream *stream) 
+apr_status_t h2_stream_recv_DATA(h2_stream *stream, uint8_t flags,
+                                    const uint8_t *data, size_t len)
 {
-    switch (stream->state) {
-        case H2_STREAM_ST_OPEN:
-        case H2_STREAM_ST_CLOSED_INPUT:
-            return 1;
-        default:
-            return 0;
+    h2_session *session = stream->session;
+    apr_status_t status = APR_SUCCESS;
+    apr_bucket_brigade *tmp;
+    
+    ap_assert(stream);
+    if (!stream->input) {
+        return APR_EOF;
     }
+
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c,
+                  H2_STREAM_MSG(stream, "recv DATA, len=%d"), (int)len);
+    
+    tmp = apr_brigade_create(stream->pool, session->c->bucket_alloc);
+    apr_brigade_write(tmp, NULL, NULL, (const char *)data, len);
+    status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ);
+    apr_brigade_destroy(tmp);
+    
+    stream->in_data_frames++;
+    stream->in_data_octets += len;
+    return status;
 }
 
 static void prep_output(h2_stream *stream) {
@@ -158,35 +465,26 @@ static void prep_output(h2_stream *strea
     }
 }
 
-static void prepend_response(h2_stream *stream, h2_headers *response)
-{
-    conn_rec *c = stream->session->c;
-    apr_bucket *b;
-    
-    prep_output(stream);
-    b = h2_bucket_headers_create(c->bucket_alloc, response);
-    APR_BRIGADE_INSERT_HEAD(stream->out_buffer, b);
-}
-
-h2_stream *h2_stream_open(int id, apr_pool_t *pool, h2_session *session,
-                          int initiated_on)
+h2_stream *h2_stream_create(int id, apr_pool_t *pool, h2_session *session,
+                            h2_stream_monitor *monitor, int initiated_on)
 {
     h2_stream *stream = apr_pcalloc(pool, sizeof(h2_stream));
     
     stream->id           = id;
     stream->initiated_on = initiated_on;
     stream->created      = apr_time_now();
-    stream->state        = H2_STREAM_ST_IDLE;
+    stream->state        = H2_SS_IDLE;
     stream->pool         = pool;
     stream->session      = session;
+    stream->monitor      = monitor;
     
     h2_beam_create(&stream->input, pool, id, "input", H2_BEAM_OWNER_SEND, 0);
     h2_beam_send_from(stream->input, stream->pool);
     h2_beam_create(&stream->output, pool, id, "output", H2_BEAM_OWNER_RECV, 0);
     
-    set_state(stream, H2_STREAM_ST_OPEN);
     ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03082)
-                  "h2_stream(%ld-%d): opened", session->id, stream->id);
+                  H2_STREAM_MSG(stream, "created"));
+    on_state_enter(stream);
     return stream;
 }
 
@@ -204,12 +502,10 @@ void h2_stream_cleanup(h2_stream *stream
     status = h2_beam_wait_empty(stream->input, APR_NONBLOCK_READ);
     if (status == APR_EAGAIN) {
         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c, 
-                      "h2_stream(%ld-%d): wait on input drain", 
-                      stream->session->id, stream->id);
+                      H2_STREAM_MSG(stream, "wait on input drain"));
         status = h2_beam_wait_empty(stream->input, APR_BLOCK_READ);
         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, stream->session->c, 
-                      "h2_stream(%ld-%d): input drain returned", 
-                      stream->session->id, stream->id);
+                      H2_STREAM_MSG(stream, "input drain returned"));
     }
 }
 
@@ -217,20 +513,13 @@ void h2_stream_destroy(h2_stream *stream
 {
     ap_assert(stream);
     ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, stream->session->c, 
-                  "h2_stream(%ld-%d): destroy", 
-                  stream->session->id, stream->id);
+                  H2_STREAM_MSG(stream, "destroy"));
     if (stream->pool) {
         apr_pool_destroy(stream->pool);
         stream->pool = NULL;
     }
 }
 
-void h2_stream_eos_destroy(h2_stream *stream)
-{
-    h2_session_stream_done(stream->session, stream);
-    /* stream possibly destroyed */
-}
-
 apr_pool_t *h2_stream_detach_pool(h2_stream *stream)
 {
     apr_pool_t *pool = stream->pool;
@@ -241,17 +530,15 @@ apr_pool_t *h2_stream_detach_pool(h2_str
 void h2_stream_rst(h2_stream *stream, int error_code)
 {
     stream->rst_error = error_code;
-    close_input(stream);
-    close_output(stream);
-    if (stream->out_buffer) {
-        apr_brigade_cleanup(stream->out_buffer);
-    }
+    h2_beam_abort(stream->input);
+    h2_beam_leave(stream->output);
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
-                  "h2_stream(%ld-%d): reset, error=%d", 
-                  stream->session->id, stream->id, error_code);
+                  H2_STREAM_MSG(stream, "reset, error=%d"), error_code);
+    h2_stream_dispatch(stream, H2_SEV_CANCELLED);
 }
 
-apr_status_t h2_stream_set_request_rec(h2_stream *stream, request_rec *r)
+apr_status_t h2_stream_set_request_rec(h2_stream *stream, 
+                                       request_rec *r, int eos)
 {
     h2_request *req;
     apr_status_t status;
@@ -264,20 +551,37 @@ apr_status_t h2_stream_set_request_rec(h
     status = h2_request_rcreate(&req, stream->pool, r);
     if (status == APR_SUCCESS) {
         ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, r, APLOGNO(03058)
-                      "h2_request(%d): set_request_rec %s host=%s://%s%s",
-                      stream->id, req->method, req->scheme, req->authority, 
-                      req->path);
+                      H2_STREAM_MSG(stream, "set_request_rec %s host=%s://%s%s"),
+                      req->method, req->scheme, req->authority, req->path);
         stream->rtmp = req;
+        /* simulate the frames that led to this */
+        return h2_stream_recv_frame(stream, NGHTTP2_HEADERS, 
+                                    NGHTTP2_FLAG_END_STREAM);
     }
     return status;
 }
 
-apr_status_t h2_stream_set_request(h2_stream *stream, const h2_request *r)
+void h2_stream_set_request(h2_stream *stream, const h2_request *r)
 {
     ap_assert(stream->request == NULL);
     ap_assert(stream->rtmp == NULL);
     stream->rtmp = h2_request_clone(stream->pool, r);
-    return APR_SUCCESS;
+}
+
+static void set_error_response(h2_stream *stream, int http_status)
+{
+    if (!h2_stream_is_ready(stream)) {
+        conn_rec *c = stream->session->c;
+        apr_bucket *b;
+        h2_headers *response;
+        
+        response = h2_headers_die(http_status, stream->request, stream->pool);
+        prep_output(stream);
+        b = apr_bucket_eos_create(c->bucket_alloc);
+        APR_BRIGADE_INSERT_HEAD(stream->out_buffer, b);
+        b = h2_bucket_headers_create(c->bucket_alloc, response);
+        APR_BRIGADE_INSERT_HEAD(stream->out_buffer, b);
+    }
 }
 
 static apr_status_t add_trailer(h2_stream *stream,
@@ -289,8 +593,7 @@ static apr_status_t add_trailer(h2_strea
 
     if (nlen == 0 || name[0] == ':') {
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, APR_EINVAL, c, APLOGNO(03060)
-                      "h2_request(%ld-%d): pseudo header in trailer",
-                      c->id, stream->id);
+                      H2_STREAM_MSG(stream, "pseudo header in trailer"));
         return APR_EINVAL;
     }
     if (h2_req_ignore_trailer(name, nlen)) {
@@ -311,192 +614,62 @@ apr_status_t h2_stream_add_header(h2_str
                                   const char *name, size_t nlen,
                                   const char *value, size_t vlen)
 {
+    h2_session *session = stream->session;
     int error = 0;
-    ap_assert(stream);
+    apr_status_t status;
     
     if (stream->has_response) {
         return APR_EINVAL;    
     }
     ++stream->request_headers_added;
     if (name[0] == ':') {
-        if ((vlen) > stream->session->s->limit_req_line) {
+        if ((vlen) > session->s->limit_req_line) {
             /* pseudo header: approximation of request line size check */
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
-                          "h2_stream(%ld-%d): pseudo header %s too long", 
-                          stream->session->id, stream->id, name);
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+                          H2_STREAM_MSG(stream, "pseudo %s too long"), name);
             error = HTTP_REQUEST_URI_TOO_LARGE;
         }
     }
-    else if ((nlen + 2 + vlen) > stream->session->s->limit_req_fieldsize) {
+    else if ((nlen + 2 + vlen) > session->s->limit_req_fieldsize) {
         /* header too long */
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
-                      "h2_stream(%ld-%d): header %s too long", 
-                      stream->session->id, stream->id, name);
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+                      H2_STREAM_MSG(stream, "header %s too long"), name);
         error = HTTP_REQUEST_HEADER_FIELDS_TOO_LARGE;
     }
     
-    if (stream->request_headers_added 
-        > stream->session->s->limit_req_fields + 4) {
+    if (stream->request_headers_added > session->s->limit_req_fields + 4) {
         /* too many header lines, include 4 pseudo headers */
         if (stream->request_headers_added 
-            > stream->session->s->limit_req_fields + 4 + 100) {
+            > session->s->limit_req_fields + 4 + 100) {
             /* yeah, right */
+            h2_stream_rst(stream, H2_ERR_ENHANCE_YOUR_CALM);
             return APR_ECONNRESET;
         }
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
-                      "h2_stream(%ld-%d): too many header lines", 
-                      stream->session->id, stream->id);
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+                      H2_STREAM_MSG(stream, "too many header lines")); 
         error = HTTP_REQUEST_HEADER_FIELDS_TOO_LARGE;
     }
     
-    if (h2_stream_is_scheduled(stream)) {
-        return add_trailer(stream, name, nlen, value, vlen);
+    if (error) {
+        set_error_response(stream, error);
+        return APR_EINVAL; 
     }
-    else if (error) {
-        return h2_stream_set_error(stream, error); 
-    }
-    else {
+    else if (H2_SS_IDLE == stream->state) {
         if (!stream->rtmp) {
             stream->rtmp = h2_req_create(stream->id, stream->pool, 
                                          NULL, NULL, NULL, NULL, NULL, 0);
         }
-        if (stream->state != H2_STREAM_ST_OPEN) {
-            return APR_ECONNRESET;
-        }
-        return h2_request_add_header(stream->rtmp, stream->pool,
-                                     name, nlen, value, vlen);
-    }
-}
-
-apr_status_t h2_stream_schedule(h2_stream *stream, int eos, int push_enabled, 
-                                h2_stream_pri_cmp *cmp, void *ctx)
-{
-    apr_status_t status = APR_EINVAL;
-    ap_assert(stream);
-    ap_assert(stream->session);
-    ap_assert(stream->session->mplx);
-    
-    if (!stream->scheduled) {
-        if (eos) {
-            close_input(stream);
-        }
-
-        if (h2_stream_is_ready(stream)) {
-            /* already have a resonse, probably a HTTP error code */
-            return h2_mplx_process(stream->session->mplx, stream, cmp, ctx);
-        }
-        else if (!stream->request && stream->rtmp) {
-            /* This is the common case: a h2_request was being assembled, now
-             * it gets finalized and checked for completness */
-            status = h2_request_end_headers(stream->rtmp, stream->pool, eos);
-            if (status == APR_SUCCESS) {
-                stream->rtmp->serialize = h2_config_geti(stream->session->config,
-                                                         H2_CONF_SER_HEADERS); 
-
-                stream->request = stream->rtmp;
-                stream->rtmp = NULL;
-                stream->scheduled = 1;
-
-                stream->push_policy = h2_push_policy_determine(stream->request->headers, 
-                                                               stream->pool, push_enabled);
-            
-                
-                status = h2_mplx_process(stream->session->mplx, stream, cmp, ctx);
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
-                              "h2_stream(%ld-%d): scheduled %s %s://%s%s "
-                              "chunked=%d",
-                              stream->session->id, stream->id,
-                              stream->request->method, stream->request->scheme,
-                              stream->request->authority, stream->request->path,
-                              stream->request->chunked);
-                return status;
-            }
-        }
-        else {
-            status = APR_ECONNRESET;
-        }
-    }
-    
-    h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR);
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->session->c,
-                  "h2_stream(%ld-%d): RST=2 (internal err) %s %s://%s%s",
-                  stream->session->id, stream->id,
-                  stream->request->method, stream->request->scheme,
-                  stream->request->authority, stream->request->path);
-    return status;
-}
-
-int h2_stream_is_scheduled(const h2_stream *stream)
-{
-    return stream->scheduled;
-}
-
-apr_status_t h2_stream_close_input(h2_stream *stream)
-{
-    conn_rec *c = stream->session->c;
-    apr_status_t status;
-    apr_bucket_brigade *tmp;
-    apr_bucket *b;
-
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
-                  "h2_stream(%ld-%d): closing input",
-                  stream->session->id, stream->id);
-    if (stream->rst_error) {
-        return APR_ECONNRESET;
-    }
-    
-    tmp = apr_brigade_create(stream->pool, c->bucket_alloc);
-    if (stream->trailers && !apr_is_empty_table(stream->trailers)) {
-        h2_headers *r = h2_headers_create(HTTP_OK, stream->trailers, 
-                                          NULL, stream->pool);
-        b = h2_bucket_headers_create(c->bucket_alloc, r);
-        APR_BRIGADE_INSERT_TAIL(tmp, b);
-        stream->trailers = NULL;
+        status = h2_request_add_header(stream->rtmp, stream->pool,
+                                       name, nlen, value, vlen);
     }
-    
-    b = apr_bucket_eos_create(c->bucket_alloc);
-    APR_BRIGADE_INSERT_TAIL(tmp, b);
-    status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ);
-    apr_brigade_destroy(tmp);
-    h2_beam_close(stream->input);
-    return status;
-}
-
-apr_status_t h2_stream_write_data(h2_stream *stream,
-                                  const char *data, size_t len, int eos)
-{
-    conn_rec *c = stream->session->c;
-    apr_status_t status = APR_SUCCESS;
-    apr_bucket_brigade *tmp;
-    
-    ap_assert(stream);
-    if (!stream->input) {
-        return APR_EOF;
+    else  {
+        status = add_trailer(stream, name, nlen, value, vlen);
     }
-    if (input_closed(stream) || !stream->request) {
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
-                      "h2_stream(%ld-%d): writing denied, closed=%d, eoh=%d", 
-                      stream->session->id, stream->id, input_closed(stream),
-                      stream->request != NULL);
-        return APR_EINVAL;
+    if (status != APR_SUCCESS) {
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+                      H2_STREAM_MSG(stream, "header %s not accepted"), name);
+        h2_stream_dispatch(stream, H2_SEV_CANCELLED);
     }
-
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
-                  "h2_stream(%ld-%d): add %ld input bytes", 
-                  stream->session->id, stream->id, (long)len);
-    
-    tmp = apr_brigade_create(stream->pool, c->bucket_alloc);
-    apr_brigade_write(tmp, NULL, NULL, data, len);
-    status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ);
-    apr_brigade_destroy(tmp);
-    
-    stream->in_data_frames++;
-    stream->in_data_octets += len;
-    
-    if (eos) {
-        return h2_stream_close_input(stream);
-    }
-    
     return status;
 }
 
@@ -510,28 +683,10 @@ static apr_status_t fill_buffer(h2_strea
     status = h2_beam_receive(stream->output, stream->out_buffer, 
                              APR_NONBLOCK_READ, amount);
     ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, stream->session->c,
-                  "h2_stream(%ld-%d): beam_received",
-                  stream->session->id, stream->id);
+                  H2_STREAM_MSG(stream, "beam_received"));
     return status;
 }
 
-apr_status_t h2_stream_set_error(h2_stream *stream, int http_status)
-{
-    h2_headers *response;
-    
-    if (h2_stream_is_ready(stream)) {
-        return APR_EINVAL;
-    }
-    if (stream->rtmp) {
-        stream->request = stream->rtmp;
-        stream->rtmp = NULL;
-    }
-    response = h2_headers_die(http_status, stream->request, stream->pool);
-    prepend_response(stream, response);
-    h2_beam_close(stream->output);
-    return APR_SUCCESS;
-}
-
 static apr_bucket *get_first_headers_bucket(apr_bucket_brigade *bb)
 {
     if (bb) {
@@ -563,9 +718,6 @@ apr_status_t h2_stream_out_prepare(h2_st
         *peos = 1;
         return APR_ECONNRESET;
     }
-    else if (!output_open(stream)) {
-        return APR_ECONNRESET;
-    }
     
     c = stream->session->c;
     prep_output(stream);
@@ -639,14 +791,24 @@ apr_status_t h2_stream_out_prepare(h2_st
             }
         }
     }
-    
-    if (!*peos && !*plen && status == APR_SUCCESS 
-        && (!presponse || !*presponse)) {
-        status = APR_EAGAIN;
+
+    if (status == APR_SUCCESS) {
+        if (presponse && *presponse) {
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
+                          H2_STREAM_MSG(stream, "prepare, response %d"), 
+                          (*presponse)->status);
+        }
+        else if (*peos || *plen) {
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
+                          H2_STREAM_MSG(stream, "prepare, len=%ld eos=%d"),
+                          (long)*plen, *peos);
+        }
+        else {
+            status = APR_EAGAIN;
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
+                          H2_STREAM_MSG(stream, "prepare, no data"));
+        }
     }
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
-                  "h2_stream(%ld-%d): prepare, len=%ld eos=%d",
-                  c->id, stream->id, (long)*plen, *peos);
     return status;
 }
 
@@ -669,17 +831,12 @@ apr_status_t h2_stream_read_to(h2_stream
         status = APR_EAGAIN;
     }
     ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, c,
-                  "h2_stream(%ld-%d): read_to, len=%ld eos=%d",
-                  c->id, stream->id, (long)*plen, *peos);
+                  H2_STREAM_MSG(stream, "read_to, len=%ld eos=%d"),
+                  (long)*plen, *peos);
     return status;
 }
 
 
-int h2_stream_input_is_open(const h2_stream *stream) 
-{
-    return input_open(stream);
-}
-
 apr_status_t h2_stream_submit_pushes(h2_stream *stream, h2_headers *response)
 {
     apr_status_t status = APR_SUCCESS;
@@ -689,8 +846,8 @@ apr_status_t h2_stream_submit_pushes(h2_
     pushes = h2_push_collect_update(stream, stream->request, response);
     if (pushes && !apr_is_empty_array(pushes)) {
         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
-                      "h2_stream(%ld-%d): found %d push candidates",
-                      stream->session->id, stream->id, pushes->nelts);
+                      H2_STREAM_MSG(stream, "found %d push candidates"),
+                      pushes->nelts);
         for (i = 0; i < pushes->nelts; ++i) {
             h2_push *push = APR_ARRAY_IDX(pushes, i, h2_push*);
             h2_stream *s = h2_session_push(stream->session, stream, push);
@@ -721,29 +878,6 @@ const h2_priority *h2_stream_get_priorit
     return NULL;
 }
 
-const char *h2_stream_state_str(h2_stream *stream)
-{
-    switch (stream->state) {
-        case H2_STREAM_ST_IDLE:
-            return "IDLE";
-        case H2_STREAM_ST_OPEN:
-            return "OPEN";
-        case H2_STREAM_ST_RESV_LOCAL:
-            return "RESERVED_LOCAL";
-        case H2_STREAM_ST_RESV_REMOTE:
-            return "RESERVED_REMOTE";
-        case H2_STREAM_ST_CLOSED_INPUT:
-            return "HALF_CLOSED_REMOTE";
-        case H2_STREAM_ST_CLOSED_OUTPUT:
-            return "HALF_CLOSED_LOCAL";
-        case H2_STREAM_ST_CLOSED:
-            return "CLOSED";
-        default:
-            return "UNKNOWN";
-            
-    }
-}
-
 int h2_stream_is_ready(h2_stream *stream)
 {
     if (stream->has_response) {
@@ -755,4 +889,15 @@ int h2_stream_is_ready(h2_stream *stream
     return 0;
 }
 
+int h2_stream_was_closed(const h2_stream *stream)
+{
+    switch (stream->state) {
+        case H2_SS_CLOSED:
+        case H2_SS_CLEANUP:
+            return 1;
+        default:
+            return 0;
+    }
+}
+
 

Modified: httpd/httpd/trunk/modules/http2/h2_stream.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_stream.h?rev=1782875&r1=1782874&r2=1782875&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_stream.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_stream.h Mon Feb 13 21:00:30 2017
@@ -36,19 +36,33 @@ struct h2_priority;
 struct h2_request;
 struct h2_headers;
 struct h2_session;
-struct h2_sos;
+struct h2_task;
 struct h2_bucket_beam;
 
 typedef struct h2_stream h2_stream;
 
+typedef void h2_stream_state_cb(void *ctx, h2_stream *stream);
+typedef void h2_stream_event_cb(void *ctx, h2_stream *stream, 
+                                h2_stream_event_t ev);
+
+typedef struct h2_stream_monitor {
+    void *ctx;
+    h2_stream_state_cb *on_state_enter;   /* called when a state is entered */
+    h2_stream_state_cb *on_state_invalid; /* called when an invalid state change
+                                             was detected */
+    h2_stream_event_cb *on_state_event;   /* called right before the given event
+                                             result in a new stream state */
+} h2_stream_monitor;
+
 struct h2_stream {
-    int id;            /* http2 stream id */
-    int initiated_on;  /* initiating stream id (PUSH) or 0 */
-    apr_time_t created;         /* when stream was created */
-    h2_stream_state_t state;    /* http/2 state of this stream */
+    int id;                     /* http2 stream identifier */
+    int initiated_on;           /* initiating stream id (PUSH) or 0 */
+    apr_pool_t *pool;           /* the memory pool for this stream */
     struct h2_session *session; /* the session this stream belongs to */
+    h2_stream_state_t state;    /* state of this stream */
+    
+    apr_time_t created;         /* when stream was created */
     
-    apr_pool_t *pool;           /* the memory pool for this stream */
     const struct h2_request *request; /* the request made in this stream */
     struct h2_request *rtmp;    /* request being assembled */
     apr_table_t *trailers;      /* optional incoming trailers */
@@ -61,41 +75,52 @@ struct h2_stream {
     int rst_error;              /* stream error for RST_STREAM */
     unsigned int aborted   : 1; /* was aborted */
     unsigned int scheduled : 1; /* stream has been scheduled */
-    unsigned int started   : 1; /* stream has started processing */
     unsigned int has_response : 1; /* response headers are known */
     unsigned int push_policy;   /* which push policy to use for this request */
     
+    struct h2_task *task;       /* assigned task to fullfill request */
+    
     const h2_priority *pref_priority; /* preferred priority for this stream */
     apr_off_t out_data_frames;  /* # of DATA frames sent */
     apr_off_t out_data_octets;  /* # of DATA octets (payload) sent */
     apr_off_t in_data_frames;   /* # of DATA frames received */
     apr_off_t in_data_octets;   /* # of DATA octets (payload) received */
     
-    const char  *sos_filter;
+    h2_stream_monitor *monitor; /* optional monitor for stream states */
 };
 
 
 #define H2_STREAM_RST(s, def)    (s->rst_error? s->rst_error : (def))
 
 /**
- * Create a stream in OPEN state.
+ * Create a stream in H2_SS_IDLE state.
  * @param id      the stream identifier
  * @param pool    the memory pool to use for this stream
  * @param session the session this stream belongs to
  * @return the newly opened stream
  */
-h2_stream *h2_stream_open(int id, apr_pool_t *pool, struct h2_session *session,
-                          int initiated_on);
+h2_stream *h2_stream_create(int id, apr_pool_t *pool, 
+                            struct h2_session *session,
+                            h2_stream_monitor *monitor,
+                            int initiated_on);
 
 /**
- * Cleanup any resources still held by the stream, called by last bucket.
+ * Destroy memory pool if still owned by the stream.
  */
-void h2_stream_eos_destroy(h2_stream *stream);
+void h2_stream_destroy(h2_stream *stream);
+
+/*
+ * Set a new monitor for this stream, replacing any existing one. Can
+ * be called with NULL to have no monitor installed.
+ */
+void h2_stream_set_monitor(h2_stream *stream, h2_stream_monitor *monitor);
 
 /**
- * Destroy memory pool if still owned by the stream.
+ * Dispatch (handle) an event on the given stream.
+ * @param stream  the streama the event happened on
+ * @param ev      the type of event
  */
-void h2_stream_destroy(h2_stream *stream);
+void h2_stream_dispatch(h2_stream *stream, h2_stream_event_t ev);
 
 /**
  * Cleanup references into requst processing.
@@ -114,20 +139,23 @@ void h2_stream_cleanup(h2_stream *stream
 apr_pool_t *h2_stream_detach_pool(h2_stream *stream);
 
 /**
- * Initialize stream->request with the given h2_request.
+ * Set complete stream headers from given h2_request.
  * 
  * @param stream stream to write request to
  * @param r the request with all the meta data
+ * @param eos != 0 iff stream input is closed
  */
-apr_status_t h2_stream_set_request(h2_stream *stream, const h2_request *r);
+void h2_stream_set_request(h2_stream *stream, const h2_request *r);
 
 /**
- * Initialize stream->request with the given request_rec.
+ * Set complete stream header from given request_rec.
  * 
  * @param stream stream to write request to
  * @param r the request with all the meta data
+ * @param eos != 0 iff stream input is closed
  */
-apr_status_t h2_stream_set_request_rec(h2_stream *stream, request_rec *r);
+apr_status_t h2_stream_set_request_rec(h2_stream *stream, 
+                                       request_rec *r, int eos);
 
 /*
  * Add a HTTP/2 header (including pseudo headers) or trailer 
@@ -143,22 +171,19 @@ apr_status_t h2_stream_add_header(h2_str
                                   const char *name, size_t nlen,
                                   const char *value, size_t vlen);
 
-/**
- * Closes the stream's input.
- *
- * @param stream stream to close intput of
- */
-apr_status_t h2_stream_close_input(h2_stream *stream);
+apr_status_t h2_stream_send_frame(h2_stream *stream, int frame_type, int flags);
+apr_status_t h2_stream_recv_frame(h2_stream *stream, int frame_type, int flags);
 
 /*
- * Write a chunk of DATA to the stream.
+ * Process a frame of received DATA.
  *
  * @param stream stream to write the data to
+ * @param flags the frame flags
  * @param data the beginning of the bytes to write
  * @param len the number of bytes to write
  */
-apr_status_t h2_stream_write_data(h2_stream *stream,
-                                  const char *data, size_t len, int eos);
+apr_status_t h2_stream_recv_DATA(h2_stream *stream, uint8_t flags,
+                                 const uint8_t *data, size_t len);
 
 /**
  * Reset the stream. Stream write/reads will return errors afterwards.
@@ -169,29 +194,14 @@ apr_status_t h2_stream_write_data(h2_str
 void h2_stream_rst(h2_stream *stream, int error_code);
 
 /**
- * Schedule the stream for execution. All header information must be
- * present. Use the given priority comparison callback to determine 
- * order in queued streams.
- * 
- * @param stream the stream to schedule
- * @param eos    != 0 iff no more input will arrive
- * @param cmp    priority comparison
- * @param ctx    context for comparison
- */
-apr_status_t h2_stream_schedule(h2_stream *stream, int eos, int push_enabled,
-                                h2_stream_pri_cmp *cmp, void *ctx);
-
-/**
- * Determine if stream has been scheduled already.
+ * Determine if stream was closed already. This is true for
+ * states H2_SS_CLOSED, H2_SS_CLEANUP. But not true
+ * for H2_SS_CLOSED_L and H2_SS_CLOSED_R.
+ *
  * @param stream the stream to check on
- * @return != 0 iff stream has been scheduled
- */
-int h2_stream_is_scheduled(const h2_stream *stream);
-
-/**
- * Set the HTTP error status as response.
+ * @return != 0 iff stream has been closed
  */
-apr_status_t h2_stream_set_error(h2_stream *stream, int http_status);
+int h2_stream_was_closed(const h2_stream *stream);
 
 /**
  * Do a speculative read on the stream output to determine the 
@@ -236,13 +246,6 @@ apr_status_t h2_stream_read_to(h2_stream
 apr_table_t *h2_stream_get_trailers(h2_stream *stream);
 
 /**
- * Check if the stream has open input.
- * @param stream the stream to check
- * @return != 0 iff stream has open input.
- */
-int h2_stream_input_is_open(const h2_stream *stream);
-
-/**
  * Submit any server push promises on this stream and schedule
  * the tasks connection with these.
  *
@@ -268,4 +271,9 @@ const char *h2_stream_state_str(h2_strea
  */
 int h2_stream_is_ready(h2_stream *stream);
 
+
+#define H2_STREAM_MSG(s, msg)     \
+    "h2_stream(%ld-%d,%s): "msg, s->session->id, s->id, h2_stream_state_str(s)
+
+
 #endif /* defined(__mod_h2__h2_stream__) */

Modified: httpd/httpd/trunk/modules/http2/h2_task.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_task.c?rev=1782875&r1=1782874&r2=1782875&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_task.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_task.c Mon Feb 13 21:00:30 2017
@@ -64,24 +64,6 @@ static void H2_TASK_OUT_LOG(int lvl, h2_
     }
 }
 
-static void h2_beam_log(h2_bucket_beam *beam, int id, const char *msg, 
-                        conn_rec *c, int level)
-{
-    if (beam && APLOG_C_IS_LEVEL(c,level)) {
-        char buffer[2048];
-        apr_size_t off = 0;
-        
-        off += apr_snprintf(buffer+off, H2_ALEN(buffer)-off, "cl=%d, ", beam->closed);
-        off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "red", ", ", &beam->send_list);
-        off += h2_util_bb_print(buffer+off, H2_ALEN(buffer)-off, "green", ", ", beam->recv_buffer);
-        off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "hold", ", ", &beam->hold_list);
-        off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "purge", "", &beam->purge_list);
-
-        ap_log_cerror(APLOG_MARK, level, 0, c, "beam(%ld-%d): %s %s", 
-                      c->id, id, msg, buffer);
-    }
-}
-
 /*******************************************************************************
  * task input handling
  ******************************************************************************/
@@ -115,11 +97,11 @@ static apr_status_t send_out(h2_task *ta
 
     apr_brigade_length(bb, 0, &written);
     H2_TASK_OUT_LOG(APLOG_TRACE2, task, bb, "h2_task send_out");
-    h2_beam_log(task->output.beam, task->stream_id, "send_out(before)", task->c, APLOG_TRACE2);
+    h2_beam_log(task->output.beam, task->c, APLOG_TRACE2, "send_out(before)");
     /* engines send unblocking */
     status = h2_beam_send(task->output.beam, bb, 
                           block? APR_BLOCK_READ : APR_NONBLOCK_READ);
-    h2_beam_log(task->output.beam, task->stream_id, "send_out(after)", task->c, APLOG_TRACE2);
+    h2_beam_log(task->output.beam, task->c, APLOG_TRACE2, "send_out(after)");
     
     if (APR_STATUS_IS_EAGAIN(status)) {
         apr_brigade_length(bb, 0, &left);
@@ -423,7 +405,7 @@ void h2_task_redo(h2_task *task)
 void h2_task_rst(h2_task *task, int error)
 {
     task->rst_error = error;
-    h2_beam_abort(task->input.beam);
+    h2_beam_leave(task->input.beam);
     if (!task->worker_done) {
         h2_beam_abort(task->output.beam);
     }
@@ -642,7 +624,7 @@ static apr_status_t h2_task_process_requ
         }
         
         /* After the call to ap_process_request, the
-         * request pool will have been deleted.  We set
+         * request pool may have been deleted.  We set
          * r=NULL here to ensure that any dereference
          * of r that might be added later in this function
          * will result in a segfault immediately instead



Mime
View raw message