httpd-cvs mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ic...@apache.org
Subject svn commit: r1782980 [3/4] - in /httpd/httpd/branches/2.4.x: ./ modules/http2/
Date Tue, 14 Feb 2017 15:53:50 GMT
Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_session.c
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_session.c?rev=1782980&r1=1782979&r2=1782980&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_session.c (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_session.c Tue Feb 14 15:53:50 2017
@@ -48,6 +48,11 @@
 
 static apr_status_t dispatch_master(h2_session *session);
 static apr_status_t h2_session_read(h2_session *session, int block);
+static void transit(h2_session *session, const char *action, 
+                    h2_session_state nstate);
+
+static void on_stream_state_enter(void *ctx, h2_stream *stream);
+static void on_stream_state_event(void *ctx, h2_stream *stream, h2_stream_event_t ev);
 
 static int h2_session_status_from_apr_status(apr_status_t rv)
 {
@@ -68,7 +73,7 @@ static void update_window(void *ctx, int
     h2_session *session = (h2_session*)ctx;
     nghttp2_session_consume(session->ngh2, stream_id, bytes_read);
     ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
-                  "h2_session(%ld-%d): consumed %ld bytes",
+                  "h2_stream(%ld-%d): consumed %ld bytes",
                   session->id, stream_id, (long)bytes_read);
 }
 
@@ -79,58 +84,30 @@ static apr_status_t h2_session_receive(v
 static void dispatch_event(h2_session *session, h2_session_event_t ev, 
                              int err, const char *msg);
 
-apr_status_t h2_session_stream_done(h2_session *session, h2_stream *stream)
-{
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
-                  "h2_stream(%ld-%d): EOS bucket cleanup -> done", 
-                  session->id, stream->id);
-    h2_mplx_stream_done(session->mplx, stream);
-    
-    dispatch_event(session, H2_SESSION_EV_STREAM_DONE, 0, NULL);
-    return APR_SUCCESS;
-}
-
-typedef struct stream_sel_ctx {
-    h2_session *session;
-    h2_stream *candidate;
-} stream_sel_ctx;
-
-static int find_unprocessed_stream(h2_stream *stream, void *ictx)
+static int rst_unprocessed_stream(h2_stream *stream, void *ctx)
 {
-    stream_sel_ctx *ctx = ictx;
-    if (H2_STREAM_CLIENT_INITIATED(stream->id)) {
-        if (!ctx->session->local.accepting
-            && stream->id > ctx->session->local.accepted_max) {
-            ctx->candidate = stream;
-            return 0;
-        }
-    }
-    else {
-        if (!ctx->session->remote.accepting
-            && stream->id > ctx->session->remote.accepted_max) {
-            ctx->candidate = stream;
-            return 0;
-        }
+    int unprocessed = (!h2_stream_was_closed(stream)
+                       && (H2_STREAM_CLIENT_INITIATED(stream->id)? 
+                           (!stream->session->local.accepting
+                            && stream->id > stream->session->local.accepted_max)
+                            : 
+                           (!stream->session->remote.accepting
+                            && stream->id > stream->session->remote.accepted_max))
+                       ); 
+    if (unprocessed) {
+        h2_stream_rst(stream, H2_ERR_NO_ERROR);
+        return 0;
     }
     return 1;
 }
 
 static void cleanup_unprocessed_streams(h2_session *session)
 {
-    stream_sel_ctx ctx;
-    ctx.session = session;
-    while (1) {
-        ctx.candidate = NULL;
-        h2_mplx_stream_do(session->mplx, find_unprocessed_stream, &ctx);
-        if (!ctx.candidate) {
-            break;
-        }
-        h2_session_stream_done(session, ctx.candidate);
-    }
+    h2_mplx_stream_do(session->mplx, rst_unprocessed_stream, session);
 }
 
-h2_stream *h2_session_open_stream(h2_session *session, int stream_id,
-                                  int initiated_on, const h2_request *req)
+static h2_stream *h2_session_open_stream(h2_session *session, int stream_id,
+                                         int initiated_on)
 {
     h2_stream * stream;
     apr_pool_t *stream_pool;
@@ -138,29 +115,11 @@ h2_stream *h2_session_open_stream(h2_ses
     apr_pool_create(&stream_pool, session->pool);
     apr_pool_tag(stream_pool, "h2_stream");
     
-    stream = h2_stream_open(stream_id, stream_pool, session, 
-                            initiated_on);
-    nghttp2_session_set_stream_user_data(session->ngh2, stream_id, stream);
-    
-    if (req) {
-        h2_stream_set_request(stream, req);
-    }
-    
-    if (H2_STREAM_CLIENT_INITIATED(stream_id)) {
-        if (stream_id > session->remote.emitted_max) {
-            ++session->remote.emitted_count;
-            session->remote.emitted_max = stream->id;
-            session->local.accepted_max = stream->id;
-        }
-    }
-    else {
-        if (stream_id > session->local.emitted_max) {
-            ++session->local.emitted_count;
-            session->remote.emitted_max = stream->id;
-        }
+    stream = h2_stream_create(stream_id, stream_pool, session, 
+                              session->monitor, initiated_on);
+    if (stream) {
+        nghttp2_session_set_stream_user_data(session->ngh2, stream_id, stream);
     }
-    dispatch_event(session, H2_SESSION_EV_STREAM_OPEN, 0, NULL);
-    
     return stream;
 }
 
@@ -217,14 +176,6 @@ static int stream_pri_cmp(int sid1, int
     return spri_cmp(sid1, s1, sid2, s2, session);
 }
 
-static apr_status_t stream_schedule(h2_session *session,
-                                    h2_stream *stream, int eos)
-{
-    (void)session;
-    return h2_stream_schedule(stream, eos, h2_session_push_enabled(session), 
-                              stream_pri_cmp, session);
-}
-
 /*
  * Callback when nghttp2 wants to send bytes back to the client.
  */
@@ -234,9 +185,9 @@ static ssize_t send_cb(nghttp2_session *
 {
     h2_session *session = (h2_session *)userp;
     apr_status_t status;
-    
     (void)ngh2;
     (void)flags;
+    
     status = h2_conn_io_write(&session->io, (const char *)data, length);
     if (status == APR_SUCCESS) {
         return length;
@@ -260,9 +211,10 @@ static int on_invalid_frame_recv_cb(nght
         char buffer[256];
         
         h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03063)
-                      "h2_session(%ld): recv invalid FRAME[%s], frames=%ld/%ld (r/s)",
-                      session->id, buffer, (long)session->frames_received,
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
+                      H2_SSSN_LOG(APLOGNO(03063), session, 
+                      "recv invalid FRAME[%s], frames=%ld/%ld (r/s)"),
+                      buffer, (long)session->frames_received,
                      (long)session->frames_sent);
     }
     return 0;
@@ -278,76 +230,26 @@ static int on_data_chunk_recv_cb(nghttp2
                                  const uint8_t *data, size_t len, void *userp)
 {
     h2_session *session = (h2_session *)userp;
-    apr_status_t status = APR_SUCCESS;
+    apr_status_t status = APR_EINVAL;
     h2_stream * stream;
-    int rv;
+    int rv = 0;
     
-    (void)flags;
     stream = get_stream(session, stream_id);
-    if (!stream) {
+    if (stream) {
+        status = h2_stream_recv_DATA(stream, flags, data, len);
+    }
+    else {
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03064)
                       "h2_stream(%ld-%d): on_data_chunk for unknown stream",
                       session->id, (int)stream_id);
-        rv = nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE, stream_id,
-                                       NGHTTP2_INTERNAL_ERROR);
-        if (nghttp2_is_fatal(rv)) {
-            return NGHTTP2_ERR_CALLBACK_FAILURE;
-        }
-        return 0;
-    }
-
-    /* FIXME: enabling setting EOS this way seems to break input handling
-     * in mod_proxy_http2. why? */
-    status = h2_stream_write_data(stream, (const char *)data, len,
-                                  0 /*flags & NGHTTP2_FLAG_END_STREAM*/);
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
-                  "h2_stream(%ld-%d): data_chunk_recv, written %ld bytes",
-                  session->id, stream_id, (long)len);
-    if (status != APR_SUCCESS) {
-        update_window(session, stream_id, len);
-        rv = nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE, stream_id,
-                                       H2_STREAM_RST(stream, H2_ERR_INTERNAL_ERROR));
-        if (nghttp2_is_fatal(rv)) {
-            return NGHTTP2_ERR_CALLBACK_FAILURE;
-        }
+        rv = NGHTTP2_ERR_CALLBACK_FAILURE;
     }
-    return 0;
-}
-
-static apr_status_t stream_closed(h2_session *session, 
-                                  h2_stream *stream,
-                                  uint32_t error_code) 
-{
-    conn_rec *c = session->c;
-    apr_bucket *b;
-    apr_status_t status;
     
-    if (!error_code) {
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
-                      "h2_stream(%ld-%d): handled, closing", 
-                      session->id, (int)stream->id);
-        if (H2_STREAM_CLIENT_INITIATED(stream->id)
-            && stream->id > session->local.completed_max) {
-            session->local.completed_max = stream->id;
-        }
-    }
-    else {
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, APLOGNO(03065)
-                      "h2_stream(%ld-%d): closing with err=%d %s", 
-                      session->id, (int)stream->id, (int)error_code,
-                      h2_h2_err_description(error_code));
-        h2_stream_rst(stream, error_code);
+    if (status != APR_SUCCESS) {
+        /* count this as consumed explicitly as no one will read it */
+        nghttp2_session_consume(session->ngh2, stream_id, len);
     }
-    /* The stream might have data in the buffers of the main connection.
-     * We can only free the allocated resources once all had been written.
-     * Send a special buckets on the connection that gets destroyed when
-     * all preceding data has been handled. On its destruction, it is safe
-     * to purge all resources of the stream. */
-    b = h2_bucket_eos_create(c->bucket_alloc, stream);
-    APR_BRIGADE_INSERT_TAIL(session->bbtmp, b);
-    status = h2_conn_io_pass(&session->io, session->bbtmp);
-    apr_brigade_cleanup(session->bbtmp);
-    return status;
+    return rv;
 }
 
 static int on_stream_close_cb(nghttp2_session *ngh2, int32_t stream_id,
@@ -359,7 +261,13 @@ static int on_stream_close_cb(nghttp2_se
     (void)ngh2;
     stream = get_stream(session, stream_id);
     if (stream) {
-        stream_closed(session, stream, error_code);
+        if (error_code) {
+            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+                          H2_STRM_LOG(APLOGNO(03065), stream, 
+                          "closing with err=%d %s"), 
+                          (int)error_code, h2_h2_err_description(error_code));
+            h2_stream_rst(stream, error_code);
+        }
     }
     return 0;
 }
@@ -378,7 +286,7 @@ static int on_begin_headers_cb(nghttp2_s
         /* nop */
     }
     else {
-        s = h2_session_open_stream(userp, frame->hd.stream_id, 0, NULL);
+        s = h2_session_open_stream(userp, frame->hd.stream_id, 0);
     }
     return s? 0 : NGHTTP2_ERR_START_STREAM_NOT_ALLOWED;
 }
@@ -396,23 +304,15 @@ static int on_header_cb(nghttp2_session
     (void)flags;
     stream = get_stream(session, frame->hd.stream_id);
     if (!stream) {
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
-                      APLOGNO(02920) 
-                      "h2_session:  stream(%ld-%d): on_header unknown stream",
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(02920) 
+                      "h2_stream(%ld-%d): on_header unknown stream",
                       session->id, (int)frame->hd.stream_id);
         return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
     }
     
     status = h2_stream_add_header(stream, (const char *)name, namelen,
                                   (const char *)value, valuelen);
-    if (status == APR_ECONNRESET) {
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
-                      "h2-stream(%ld-%d): on_header, reset stream",
-                      session->id, stream->id);
-        nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE, stream->id,
-                                  NGHTTP2_INTERNAL_ERROR);
-    }
-    else if (status != APR_SUCCESS && !h2_stream_is_ready(stream)) {
+    if (status != APR_SUCCESS && !h2_stream_is_ready(stream)) {
         return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
     }
     return 0;
@@ -428,16 +328,16 @@ static int on_frame_recv_cb(nghttp2_sess
                             void *userp)
 {
     h2_session *session = (h2_session *)userp;
-    apr_status_t status = APR_SUCCESS;
     h2_stream *stream;
     
     if (APLOGcdebug(session->c)) {
         char buffer[256];
         
         h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03066)
-                      "h2_session(%ld): recv FRAME[%s], frames=%ld/%ld (r/s)",
-                      session->id, buffer, (long)session->frames_received,
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
+                      H2_SSSN_LOG(APLOGNO(03066), session, 
+                      "recv FRAME[%s], frames=%ld/%ld (r/s)"),
+                      buffer, (long)session->frames_received,
                      (long)session->frames_sent);
     }
 
@@ -449,47 +349,23 @@ static int on_frame_recv_cb(nghttp2_sess
              * trailers */
             stream = get_stream(session, frame->hd.stream_id);
             if (stream) {
-                int eos = (frame->hd.flags & NGHTTP2_FLAG_END_STREAM);
-                
-                if (h2_stream_is_scheduled(stream)) {
-                    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
-                                  "h2_stream(%ld-%d): TRAILER, eos=%d", 
-                                  session->id, frame->hd.stream_id, eos);
-                    if (eos) {
-                        status = h2_stream_close_input(stream);
-                    }
-                }
-                else {
-                    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
-                                  "h2_stream(%ld-%d): HEADER, eos=%d", 
-                                  session->id, frame->hd.stream_id, eos);
-                    status = stream_schedule(session, stream, eos);
-                }
-            }
-            else {
-                status = APR_EINVAL;
+                h2_stream_recv_frame(stream, NGHTTP2_HEADERS, frame->hd.flags);
             }
             break;
         case NGHTTP2_DATA:
             stream = get_stream(session, frame->hd.stream_id);
             if (stream) {
-                int eos = (frame->hd.flags & NGHTTP2_FLAG_END_STREAM);
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
-                              "h2_stream(%ld-%d): DATA, len=%ld, eos=%d", 
-                              session->id, frame->hd.stream_id, 
-                              (long)frame->hd.length, eos);
-                if (eos) {
-                    status = h2_stream_close_input(stream);
-                }
-            }
-            else {
-                status = APR_EINVAL;
+                ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,  
+                              H2_STRM_LOG(APLOGNO(02923), stream, 
+                              "DATA, len=%ld, flags=%d"), 
+                              (long)frame->hd.length, frame->hd.flags);
+                h2_stream_recv_frame(stream, NGHTTP2_DATA, frame->hd.flags);
             }
             break;
         case NGHTTP2_PRIORITY:
             session->reprioritize = 1;
             ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
-                          "h2_session:  stream(%ld-%d): PRIORITY frame "
+                          "h2_stream(%ld-%d): PRIORITY frame "
                           " weight=%d, dependsOn=%d, exclusive=%d", 
                           session->id, (int)frame->hd.stream_id,
                           frame->priority.pri_spec.weight,
@@ -498,14 +374,13 @@ static int on_frame_recv_cb(nghttp2_sess
             break;
         case NGHTTP2_WINDOW_UPDATE:
             ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
-                          "h2_session:  stream(%ld-%d): WINDOW_UPDATE "
-                          "incr=%d", 
+                          "h2_stream(%ld-%d): WINDOW_UPDATE incr=%d", 
                           session->id, (int)frame->hd.stream_id,
                           frame->window_update.window_size_increment);
             break;
         case NGHTTP2_RST_STREAM:
             ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03067)
-                          "h2_session(%ld-%d): RST_STREAM by client, errror=%d",
+                          "h2_stream(%ld-%d): RST_STREAM by client, errror=%d",
                           session->id, (int)frame->hd.stream_id,
                           (int)frame->rst_stream.error_code);
             stream = get_stream(session, frame->hd.stream_id);
@@ -535,26 +410,10 @@ static int on_frame_recv_cb(nghttp2_sess
                 h2_util_frame_print(frame, buffer,
                                     sizeof(buffer)/sizeof(buffer[0]));
                 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
-                              "h2_session: on_frame_rcv %s", buffer);
+                              H2_SSSN_MSG(session, "on_frame_rcv %s"), buffer);
             }
             break;
     }
-
-    if (status != APR_SUCCESS) {
-        int rv;
-        
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c,
-                      APLOGNO(02923) 
-                      "h2_session: stream(%ld-%d): error handling frame",
-                      session->id, (int)frame->hd.stream_id);
-        rv = nghttp2_submit_rst_stream(ng2s, NGHTTP2_FLAG_NONE,
-                                       frame->hd.stream_id,
-                                       NGHTTP2_INTERNAL_ERROR);
-        if (nghttp2_is_fatal(rv)) {
-            return NGHTTP2_ERR_CALLBACK_FAILURE;
-        }
-    }
-    
     return 0;
 }
 
@@ -597,19 +456,19 @@ static int on_send_data_cb(nghttp2_sessi
     }
     padlen = (unsigned char)frame->data.padlen;
     
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
-                  "h2_stream(%ld-%d): send_data_cb for %ld bytes",
-                  session->id, (int)stream_id, (long)length);
-                  
     stream = get_stream(session, stream_id);
     if (!stream) {
         ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_NOTFOUND, session->c,
                       APLOGNO(02924) 
-                      "h2_stream(%ld-%d): send_data, lookup stream",
+                      "h2_stream(%ld-%d): send_data, stream not found",
                       session->id, (int)stream_id);
         return NGHTTP2_ERR_CALLBACK_FAILURE;
     }
     
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+                  H2_STRM_MSG(stream, "send_data_cb for %ld bytes"),
+                  (long)length);
+                  
     status = h2_conn_io_write(&session->io, (const char *)framehd, 9);
     if (padlen && status == APR_SUCCESS) {
         status = h2_conn_io_write(&session->io, (const char *)&padlen, 1);
@@ -617,24 +476,21 @@ static int on_send_data_cb(nghttp2_sessi
     
     if (status != APR_SUCCESS) {
         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
-                      "h2_stream(%ld-%d): writing frame header",
-                      session->id, (int)stream_id);
+                      H2_STRM_MSG(stream, "writing frame header"));
         return NGHTTP2_ERR_CALLBACK_FAILURE;
     }
     
     status = h2_stream_read_to(stream, session->bbtmp, &len, &eos);
     if (status != APR_SUCCESS) {
         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
-                      "h2_stream(%ld-%d): send_data_cb, reading stream",
-                      session->id, (int)stream_id);
+                      H2_STRM_MSG(stream, "send_data_cb, reading stream"));
         apr_brigade_cleanup(session->bbtmp);
         return NGHTTP2_ERR_CALLBACK_FAILURE;
     }
     else if (len != length) {
         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
-                      "h2_stream(%ld-%d): send_data_cb, wanted %ld bytes, "
-                      "got %ld from stream",
-                      session->id, (int)stream_id, (long)length, (long)len);
+                      H2_STRM_MSG(stream, "send_data_cb, wanted %ld bytes, "
+                      "got %ld from stream"), (long)length, (long)len);
         apr_brigade_cleanup(session->bbtmp);
         return NGHTTP2_ERR_CALLBACK_FAILURE;
     }
@@ -654,10 +510,8 @@ static int on_send_data_cb(nghttp2_sessi
         return 0;
     }
     else {
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c,
-                      APLOGNO(02925) 
-                      "h2_stream(%ld-%d): failed send_data_cb",
-                      session->id, (int)stream_id);
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c,  
+                      H2_STRM_LOG(APLOGNO(02925), stream, "failed send_data_cb"));
         return NGHTTP2_ERR_CALLBACK_FAILURE;
     }
 }
@@ -667,16 +521,34 @@ static int on_frame_send_cb(nghttp2_sess
                             void *user_data)
 {
     h2_session *session = user_data;
+    h2_stream *stream;
+    int stream_id = frame->hd.stream_id;
+    
+    ++session->frames_sent;
+    switch (frame->hd.type) {
+        case NGHTTP2_PUSH_PROMISE:
+            /* PUSH_PROMISE we report on the promised stream */
+            stream_id = frame->push_promise.promised_stream_id;
+            break;
+        default:    
+            break;
+    }
+    
     if (APLOGcdebug(session->c)) {
         char buffer[256];
         
         h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03068)
-                      "h2_session(%ld): sent FRAME[%s], frames=%ld/%ld (r/s)",
-                      session->id, buffer, (long)session->frames_received,
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
+                      H2_SSSN_LOG(APLOGNO(03068), session, 
+                      "sent FRAME[%s], frames=%ld/%ld (r/s)"),
+                      buffer, (long)session->frames_received,
                      (long)session->frames_sent);
     }
-    ++session->frames_sent;
+    
+    stream = get_stream(session, stream_id);
+    if (stream) {
+        h2_stream_send_frame(stream, frame->hd.type, frame->hd.flags);
+    }
     return 0;
 }
 
@@ -688,16 +560,20 @@ static int on_invalid_header_cb(nghttp2_
                                 uint8_t flags, void *user_data)
 {
     h2_session *session = user_data;
+    h2_stream *stream;
+    
     if (APLOGcdebug(session->c)) {
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03456)
-                      "h2_session(%ld-%d): denying stream with invalid header "
-                      "'%s: %s'", session->id, (int)frame->hd.stream_id,
+                      "h2_stream(%ld-%d): invalid header '%s: %s'", 
+                      session->id, (int)frame->hd.stream_id,
                       apr_pstrndup(session->pool, (const char *)name, namelen),
                       apr_pstrndup(session->pool, (const char *)value, valuelen));
     }
-    return nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE,
-                                     frame->hd.stream_id, 
-                                     NGHTTP2_PROTOCOL_ERROR);
+    stream = get_stream(session, frame->hd.stream_id);
+    if (stream) {
+        h2_stream_rst(stream, NGHTTP2_PROTOCOL_ERROR);
+    }
+    return 0;
 }
 #endif
 
@@ -744,8 +620,8 @@ static apr_status_t h2_session_shutdown_
     if (status == APR_SUCCESS) {
         status = h2_conn_io_flush(&session->io);
     }
-    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03457)
-                  "session(%ld): sent shutdown notice", session->id);
+    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
+                  H2_SSSN_LOG(APLOGNO(03457), session, "sent shutdown notice"));
     return status;
 }
 
@@ -787,16 +663,10 @@ static apr_status_t h2_session_shutdown(
     if (status == APR_SUCCESS) {
         status = h2_conn_io_flush(&session->io);
     }
-    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03069)
-                  "session(%ld): sent GOAWAY, err=%d, msg=%s", 
-                  session->id, error, msg? msg : "");
+    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
+                  H2_SSSN_LOG(APLOGNO(03069), session, 
+                  "sent GOAWAY, err=%d, msg=%s"), error, msg? msg : "");
     dispatch_event(session, H2_SESSION_EV_LOCAL_GOAWAY, error, msg);
-    
-    if (force_close) {
-        apr_brigade_cleanup(session->bbtmp);
-        h2_mplx_abort(session->mplx);
-    }
-    
     return status;
 }
 
@@ -804,7 +674,7 @@ static apr_status_t session_pool_cleanup
 {
     h2_session *session = data;
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
-                  "session(%ld): pool_cleanup", session->id);
+                  H2_SSSN_MSG(session, "pool_cleanup"));
     
     if (session->state != H2_SESSION_ST_DONE
         && session->state != H2_SESSION_ST_INIT) {
@@ -816,60 +686,22 @@ static apr_status_t session_pool_cleanup
          * connection when sending the next request, this has the effect
          * that at least this one request will fail.
          */
-        ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, session->c, APLOGNO(03199)
-                      "session(%ld): connection disappeared without proper "
-                      "goodbye, clients will be confused, should not happen", 
-                      session->id);
+        ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, session->c,
+                      H2_SSSN_LOG(APLOGNO(03199), session, 
+                      "connection disappeared without proper "
+                      "goodbye, clients will be confused, should not happen"));
     }
 
-    if (session->mplx) {
-        h2_mplx_set_consumed_cb(session->mplx, NULL, NULL);
-        h2_mplx_release_and_join(session->mplx, session->iowait);
-        session->mplx = NULL;
-    }
-    if (session->ngh2) {
-        nghttp2_session_del(session->ngh2);
-        session->ngh2 = NULL;
-    }
-    return APR_SUCCESS;
-}
-
-static void *session_malloc(size_t size, void *ctx)
-{
-    h2_session *session = ctx;
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, session->c,
-                  "h2_session(%ld): malloc(%ld)",
-                  session->id, (long)size);
-    return malloc(size);
-}
+    transit(session, "pool cleanup", H2_SESSION_ST_CLEANUP);
+    h2_mplx_set_consumed_cb(session->mplx, NULL, NULL);
+    h2_mplx_release_and_join(session->mplx, session->iowait);
+    session->mplx = NULL;
+
+    ap_assert(session->ngh2);
+    nghttp2_session_del(session->ngh2);
+    session->ngh2 = NULL;
 
-static void session_free(void *p, void *ctx)
-{
-    h2_session *session = ctx;
-
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, session->c,
-                  "h2_session(%ld): free()",
-                  session->id);
-    free(p);
-}
-
-static void *session_calloc(size_t n, size_t size, void *ctx)
-{
-    h2_session *session = ctx;
-
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, session->c,
-                  "h2_session(%ld): calloc(%ld, %ld)",
-                  session->id, (long)n, (long)size);
-    return calloc(n, size);
-}
-
-static void *session_realloc(void *p, size_t size, void *ctx)
-{
-    h2_session *session = ctx;
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, session->c,
-                  "h2_session(%ld): realloc(%ld)",
-                  session->id, (long)size);
-    return realloc(p, size);
+    return APR_SUCCESS;
 }
 
 static h2_session *h2_session_create_int(conn_rec *c,
@@ -891,11 +723,9 @@ static h2_session *h2_session_create_int
 
     /* get h2_session a lifetime beyond its pool and everything
      * connected to it. */
-    session = apr_pcalloc(c->pool, sizeof(h2_session));
+    session = apr_pcalloc(pool, sizeof(h2_session));
     if (session) {
         int rv;
-        nghttp2_mem *mem;
-        
         session->id = c->id;
         session->c = c;
         session->r = r;
@@ -920,6 +750,14 @@ static h2_session *h2_session_create_int
             return NULL;
         }
         
+        session->monitor = apr_pcalloc(pool, sizeof(h2_stream_monitor));
+        if (session->monitor == NULL) {
+            return NULL;
+        }
+        session->monitor->ctx = session;
+        session->monitor->on_state_enter = on_stream_state_enter;
+        session->monitor->on_state_event = on_stream_state_event;
+         
         session->mplx = h2_mplx_create(c, session->pool, session->config, 
                                        session->s->timeout, workers);
         
@@ -953,21 +791,8 @@ static h2_session *h2_session_create_int
          * get flooded by nghttp2. */
         nghttp2_option_set_no_auto_window_update(options, 1);
         
-        if (APLOGctrace6(c)) {
-            mem = apr_pcalloc(session->pool, sizeof(nghttp2_mem));
-            mem->mem_user_data = session;
-            mem->malloc    = session_malloc;
-            mem->free      = session_free;
-            mem->calloc    = session_calloc;
-            mem->realloc   = session_realloc;
-            
-            rv = nghttp2_session_server_new3(&session->ngh2, callbacks,
-                                             session, options, mem);
-        }
-        else {
-            rv = nghttp2_session_server_new2(&session->ngh2, callbacks,
-                                             session, options);
-        }
+        rv = nghttp2_session_server_new2(&session->ngh2, callbacks,
+                                         session, options);
         nghttp2_session_callbacks_del(callbacks);
         nghttp2_option_del(options);
         
@@ -982,11 +807,12 @@ static h2_session *h2_session_create_int
         session->push_diary = h2_push_diary_create(session->pool, n);
         
         if (APLOGcdebug(c)) {
-            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, APLOGNO(03200)
-                          "h2_session(%ld) created, max_streams=%d, "
-                          "stream_mem=%d, workers_limit=%d, workers_max=%d, "
-                          "push_diary(type=%d,N=%d)",
-                          session->id, (int)session->max_stream_count, 
+            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, 
+                          H2_SSSN_LOG(APLOGNO(03200), session, 
+                          "created, max_streams=%d, stream_mem=%d, "
+                          "workers_limit=%d, workers_max=%d, "
+                          "push_diary(type=%d,N=%d)"),
+                          (int)session->max_stream_count, 
                           (int)session->max_stream_mem,
                           session->mplx->workers_limit, 
                           session->mplx->workers_max, 
@@ -1052,7 +878,7 @@ static apr_status_t h2_session_start(h2_
         }
         
         /* Now we need to auto-open stream 1 for the request we got. */
-        stream = h2_session_open_stream(session, 1, 0, NULL);
+        stream = h2_session_open_stream(session, 1, 0);
         if (!stream) {
             status = APR_EGENERAL;
             ap_log_rerror(APLOG_MARK, APLOG_ERR, status, session->r,
@@ -1061,11 +887,7 @@ static apr_status_t h2_session_start(h2_
             return status;
         }
         
-        status = h2_stream_set_request_rec(stream, session->r);
-        if (status != APR_SUCCESS) {
-            return status;
-        }
-        status = stream_schedule(session, stream, 1);
+        status = h2_stream_set_request_rec(stream, session->r, 1);
         if (status != APR_SUCCESS) {
             return status;
         }
@@ -1082,17 +904,17 @@ static apr_status_t h2_session_start(h2_
         ++slen;
     }
     
-    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, APLOGNO(03201)
-                  "h2_session(%ld): start, INITIAL_WINDOW_SIZE=%ld, "
-                  "MAX_CONCURRENT_STREAMS=%d", 
-                  session->id, (long)win_size, (int)session->max_stream_count);
+    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, 
+                  H2_SSSN_LOG(APLOGNO(03201), session, 
+                  "start, INITIAL_WINDOW_SIZE=%ld, MAX_CONCURRENT_STREAMS=%d"), 
+                  (long)win_size, (int)session->max_stream_count);
     *rv = nghttp2_submit_settings(session->ngh2, NGHTTP2_FLAG_NONE,
                                   settings, slen);
     if (*rv != 0) {
         status = APR_EGENERAL;
         ap_log_cerror(APLOG_MARK, APLOG_ERR, status, session->c,
-                      APLOGNO(02935) "nghttp2_submit_settings: %s", 
-                      nghttp2_strerror(*rv));
+                      H2_SSSN_LOG(APLOGNO(02935), session, 
+                      "nghttp2_submit_settings: %s"), nghttp2_strerror(*rv));
     }
     else {
         /* use maximum possible value for connection window size. We are only
@@ -1109,7 +931,8 @@ static apr_status_t h2_session_start(h2_
         if (*rv != 0) {
             status = APR_EGENERAL;
             ap_log_cerror(APLOG_MARK, APLOG_ERR, status, session->c,
-                          APLOGNO(02970) "nghttp2_submit_window_update: %s", 
+                          H2_SSSN_LOG(APLOGNO(02970), session,
+                          "nghttp2_submit_window_update: %s"), 
                           nghttp2_strerror(*rv));        
         }
     }
@@ -1150,13 +973,16 @@ static ssize_t stream_data_cb(nghttp2_se
     if (!stream) {
         ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, session->c,
                       APLOGNO(02937) 
-                      "h2_stream(%ld-%d): data requested but stream not found",
+                      "h2_stream(%ld-%d): data_cb, stream not found",
                       session->id, (int)stream_id);
         return NGHTTP2_ERR_CALLBACK_FAILURE;
     }
 
     status = h2_stream_out_prepare(stream, &nread, &eos, NULL);
     if (nread) {
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, 
+                      H2_STRM_MSG(stream, "prepared no_copy, len=%ld, eos=%d"),
+                      (long)nread, eos);
         *data_flags |=  NGHTTP2_DATA_FLAG_NO_COPY;
     }
     
@@ -1165,8 +991,7 @@ static ssize_t stream_data_cb(nghttp2_se
             break;
             
         case APR_ECONNRESET:
-            return nghttp2_submit_rst_stream(ng2s, NGHTTP2_FLAG_NONE,
-                stream->id, stream->rst_error);
+            return 0;
             
         case APR_EAGAIN:
             /* If there is no data available, our session will automatically
@@ -1174,16 +999,14 @@ static ssize_t stream_data_cb(nghttp2_se
              * it. Remember at our h2_stream that we need to do this.
              */
             nread = 0;
-            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03071)
-                          "h2_stream(%ld-%d): suspending",
-                          session->id, (int)stream_id);
+            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+                          H2_STRM_LOG(APLOGNO(03071), stream, "suspending"));
             return NGHTTP2_ERR_DEFERRED;
             
         default:
             nread = 0;
-            ap_log_cerror(APLOG_MARK, APLOG_ERR, status, session->c,
-                          APLOGNO(02938) "h2_stream(%ld-%d): reading data",
-                          session->id, (int)stream_id);
+            ap_log_cerror(APLOG_MARK, APLOG_ERR, status, session->c, 
+                          H2_STRM_LOG(APLOGNO(02938), stream, "reading data"));
             return NGHTTP2_ERR_CALLBACK_FAILURE;
     }
     
@@ -1196,7 +1019,6 @@ static ssize_t stream_data_cb(nghttp2_se
 struct h2_stream *h2_session_push(h2_session *session, h2_stream *is,
                                   h2_push *push)
 {
-    apr_status_t status;
     h2_stream *stream;
     h2_ngheader *ngh;
     int nid;
@@ -1205,42 +1027,31 @@ struct h2_stream *h2_session_push(h2_ses
     nid = nghttp2_submit_push_promise(session->ngh2, 0, is->id, 
                                       ngh->nv, ngh->nvlen, NULL);
     if (nid <= 0) {
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03075)
-                      "h2_stream(%ld-%d): submitting push promise fail: %s",
-                      session->id, is->id, nghttp2_strerror(nid));
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
+                      H2_STRM_LOG(APLOGNO(03075), is, 
+                      "submitting push promise fail: %s"), nghttp2_strerror(nid));
         return NULL;
     }
     ++session->pushes_promised;
     
-    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03076)
-                  "h2_stream(%ld-%d): SERVER_PUSH %d for %s %s on %d",
-                  session->id, is->id, nid,
-                  push->req->method, push->req->path, is->id);
+    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
+                  H2_STRM_LOG(APLOGNO(03076), is, "SERVER_PUSH %d for %s %s on %d"),
+                  nid, push->req->method, push->req->path, is->id);
                   
-    stream = h2_session_open_stream(session, nid, is->id, push->req);
-    if (stream) {
-        h2_session_set_prio(session, stream, push->priority);
-        status = stream_schedule(session, stream, 1);
-        if (status != APR_SUCCESS) {
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
-                          "h2_stream(%ld-%d): scheduling push stream",
-                          session->id, stream->id);
-            stream = NULL;
-        }
-        ++session->unsent_promises;
-    }
-    else {
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03077)
-                      "h2_stream(%ld-%d): failed to create stream obj %d",
-                      session->id, is->id, nid);
-    }
-
+    stream = h2_session_open_stream(session, nid, is->id);
     if (!stream) {
-        /* try to tell the client that it should not wait. */
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
+                      H2_STRM_LOG(APLOGNO(03077), is, 
+                      "failed to create stream obj %d"), nid);
+        /* logged against 'is' since stream is NULL; kill the push_promise */
         nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE, nid,
                                   NGHTTP2_INTERNAL_ERROR);
+        return NULL;
     }
     
+    h2_session_set_prio(session, stream, push->priority);
+    h2_stream_set_request(stream, push->req);
+    ++session->unsent_promises;
     return stream;
 }
 
@@ -1265,8 +1076,7 @@ apr_status_t h2_session_set_prio(h2_sess
     s = nghttp2_session_find_stream(session->ngh2, stream->id);
     if (!s) {
         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
-                      "h2_stream(%ld-%d): lookup of nghttp2_stream failed",
-                      session->id, stream->id);
+                      H2_STRM_MSG(stream, "lookup of nghttp2_stream failed"));
         return APR_EINVAL;
     }
     
@@ -1336,11 +1146,10 @@ apr_status_t h2_session_set_prio(h2_sess
 
 
         rv = nghttp2_session_change_stream_priority(session->ngh2, stream->id, &ps);
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03203)
-                      "h2_stream(%ld-%d): PUSH %s, weight=%d, "
-                      "depends=%d, returned=%d",
-                      session->id, stream->id, ptype, 
-                      ps.weight, ps.stream_id, rv);
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
+                      ""H2_STRM_LOG(APLOGNO(03203), stream, 
+                      "PUSH %s, weight=%d, depends=%d, returned=%d"),
+                      ptype, ps.weight, ps.stream_id, rv);
         status = (rv < 0)? APR_EGENERAL : APR_SUCCESS;
     }
 #else
@@ -1404,23 +1213,17 @@ static apr_status_t on_stream_headers(h2
 
     ap_assert(session);
     ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, 
-                  "h2_stream(%ld-%d): on_headers", session->id, stream->id);
+                  H2_STRM_MSG(stream, "on_headers"));
     if (headers->status < 100) {
-        int err = H2_STREAM_RST(stream, headers->status);
-        rv = nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE,
-                                       stream->id, err);
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
-                  "h2_stream(%ld-%d): unpexected header status %d, stream rst", 
-                  session->id, stream->id, headers->status);
+        h2_stream_rst(stream, headers->status);
         goto leave;
     }
     else if (stream->has_response) {
         h2_ngheader *nh;
         
         nh = h2_util_ngheader_make(stream->pool, headers->headers);
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03072)
-                      "h2_stream(%ld-%d): submit %d trailers",
-                      session->id, (int)stream->id,(int) nh->nvlen);
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
+                      H2_STRM_LOG(APLOGNO(03072), stream, "submit %d trailers"), (int)nh->nvlen);
         rv = nghttp2_submit_trailer(session->ngh2, stream->id, nh->nv, nh->nvlen);
         goto leave;
     }
@@ -1430,9 +1233,9 @@ static apr_status_t on_stream_headers(h2
         apr_table_t *hout;
         const char *note;
         
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03073)
-                      "h2_stream(%ld-%d): submit response %d, REMOTE_WINDOW_SIZE=%u",
-                      session->id, stream->id, headers->status,
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
+                      H2_STRM_LOG(APLOGNO(03073), stream, "submit response %d, REMOTE_WINDOW_SIZE=%u"),
+                      headers->status,
                       (unsigned int)nghttp2_session_get_stream_remote_window_size(session->ngh2, stream->id));
         
         if (!eos || len > 0) {
@@ -1548,14 +1351,14 @@ static apr_status_t on_stream_resume(voi
     
     ap_assert(stream);
     ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, 
-                  "h2_stream(%ld-%d): on_resume", session->id, stream->id);
+                  H2_STRM_MSG(stream, "on_resume"));
         
 send_headers:
     headers = NULL;
     status = h2_stream_out_prepare(stream, &len, &eos, &headers);
     ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c, 
-                  "h2_stream(%ld-%d): prepared len=%ld, eos=%d", 
-                  session->id, stream->id, (long)len, eos);
+                  H2_STRM_MSG(stream, "prepared len=%ld, eos=%d"), 
+                  (long)len, eos);
     if (headers) {
         status = on_stream_headers(session, stream, headers, len, eos);
         if (status != APR_SUCCESS || stream->rst_error) {
@@ -1564,22 +1367,19 @@ send_headers:
         goto send_headers;
     }
     else if (status != APR_EAGAIN) {
+        /* we have DATA to send */
         if (!stream->has_response) {
-            int err = H2_STREAM_RST(stream, H2_ERR_PROTOCOL_ERROR);
-            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03466)
-                          "h2_stream(%ld-%d): no response, RST_STREAM, err=%d",
-                          session->id, stream->id, err);
-            nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE,
-                                      stream->id, err);
+            /* but no response */
+            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
+                          H2_STRM_LOG(APLOGNO(03466), stream, "no response, RST_STREAM"));
+            h2_stream_rst(stream, H2_ERR_PROTOCOL_ERROR);
             return APR_SUCCESS;
         } 
         rv = nghttp2_session_resume_data(session->ngh2, stream->id);
         session->have_written = 1;
         ap_log_cerror(APLOG_MARK, nghttp2_is_fatal(rv)?
-                      APLOG_ERR : APLOG_DEBUG, 0, session->c,
-                      APLOGNO(02936) 
-                      "h2_stream(%ld-%d): resuming %s",
-                      session->id, stream->id, rv? nghttp2_strerror(rv) : "");
+                      APLOG_ERR : APLOG_DEBUG, 0, session->c,  
+                      H2_STRM_LOG(APLOGNO(02936), stream, "resumed"));
     }
     return status;
 }
@@ -1592,8 +1392,8 @@ static apr_status_t h2_session_receive(v
     
     if (len > 0) {
         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
-                      "h2_session(%ld): feeding %ld bytes to nghttp2",
-                      session->id, (long)len);
+                      H2_SSSN_MSG(session, "feeding %ld bytes to nghttp2"),
+                      (long)len);
         n = nghttp2_session_mem_recv(session->ngh2, (const uint8_t *)data, len);
         if (n < 0) {
             if (nghttp2_is_fatal((int)n)) {
@@ -1648,15 +1448,14 @@ static apr_status_t h2_session_read(h2_s
                         || APR_STATUS_IS_EOF(status)
                         || APR_STATUS_IS_EBADF(status)) {
                         /* common status for a client that has left */
-                        ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c,
-                                      "h2_session(%ld): input gone", session->id);
+                        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
+                                      H2_SSSN_MSG(session, "input gone"));
                     }
                     else {
                         /* uncommon status, log on INFO so that we see this */
                         ap_log_cerror( APLOG_MARK, APLOG_DEBUG, status, c,
-                                      APLOGNO(02950) 
-                                      "h2_session(%ld): error reading, terminating",
-                                      session->id);
+                                      H2_SSSN_LOG(APLOGNO(02950), session, 
+                                      "error reading, terminating"));
                     }
                     return status;
                 }
@@ -1667,7 +1466,7 @@ static apr_status_t h2_session_read(h2_s
         if ((session->io.bytes_read - read_start) > (64*1024)) {
             /* read enough in one go, give write a chance */
             ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, c,
-                          "h2_session(%ld): read 64k, returning", session->id);
+                          H2_SSSN_MSG(session, "read 64k, returning"));
             break;
         }
     }
@@ -1680,9 +1479,10 @@ static const char *StateNames[] = {
     "IDLE",      /* H2_SESSION_ST_IDLE */
     "BUSY",      /* H2_SESSION_ST_BUSY */
     "WAIT",      /* H2_SESSION_ST_WAIT */
+    "CLEANUP",   /* H2_SESSION_ST_CLEANUP */
 };
 
-static const char *state_name(h2_session_state state)
+const char *h2_session_state_str(h2_session_state state)
 {
     if (state >= (sizeof(StateNames)/sizeof(StateNames[0]))) {
         return "unknown";
@@ -1716,9 +1516,11 @@ static void transit(h2_session *session,
             || (session->state == H2_SESSION_ST_WAIT && nstate == H2_SESSION_ST_BUSY)){
             loglvl = APLOG_TRACE1;
         }
-        ap_log_cerror(APLOG_MARK, loglvl, 0, session->c, APLOGNO(03078)
-                      "h2_session(%ld): transit [%s] -- %s --> [%s]", session->id,
-                      state_name(session->state), action, state_name(nstate));
+        ap_log_cerror(APLOG_MARK, loglvl, 0, session->c, 
+                      H2_SSSN_LOG(APLOGNO(03078), session, 
+                      "transit [%s] -- %s --> [%s]"), 
+                      h2_session_state_str(session->state), action, 
+                      h2_session_state_str(nstate));
         session->state = nstate;
         switch (session->state) {
             case H2_SESSION_ST_IDLE:
@@ -1779,8 +1581,9 @@ static void h2_session_ev_conn_error(h2_
             break;
         
         default:
-            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03401)
-                          "h2_session(%ld): conn error -> shutdown", session->id);
+            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
+                          H2_SSSN_LOG(APLOGNO(03401), session, 
+                          "conn error -> shutdown"));
             h2_session_shutdown(session, arg, msg, 0);
             break;
     }
@@ -1789,8 +1592,9 @@ static void h2_session_ev_conn_error(h2_
 static void h2_session_ev_proto_error(h2_session *session, int arg, const char *msg)
 {
     if (!session->local.shutdown) {
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03402)
-                      "h2_session(%ld): proto error -> shutdown", session->id);
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
+                      H2_SSSN_LOG(APLOGNO(03402), session, 
+                      "proto error -> shutdown"));
         h2_session_shutdown(session, arg, msg, 0);
     }
 }
@@ -1814,8 +1618,8 @@ static void h2_session_ev_no_io(h2_sessi
              * - we have finished all streams and the client has sent GO_AWAY
              */
             ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
-                          "h2_session(%ld): NO_IO event, %d streams open", 
-                          session->id, session->open_streams);
+                          H2_SSSN_MSG(session, "NO_IO event, %d streams open"), 
+                          session->open_streams);
             h2_conn_io_flush(&session->io);
             if (session->open_streams > 0) {
                 if (h2_mplx_awaits_data(session->mplx)) {
@@ -1861,18 +1665,6 @@ static void h2_session_ev_no_io(h2_sessi
     }
 }
 
-static void h2_session_ev_stream_ready(h2_session *session, int arg, const char *msg)
-{
-    switch (session->state) {
-        case H2_SESSION_ST_WAIT:
-            transit(session, "stream ready", H2_SESSION_ST_BUSY);
-            break;
-        default:
-            /* nop */
-            break;
-    }
-}
-
 static void h2_session_ev_data_read(h2_session *session, int arg, const char *msg)
 {
     switch (session->state) {
@@ -1915,9 +1707,8 @@ static void h2_session_ev_pre_close(h2_s
     h2_session_shutdown(session, arg, msg, 1);
 }
 
-static void h2_session_ev_stream_open(h2_session *session, int arg, const char *msg)
+static void ev_stream_open(h2_session *session, h2_stream *stream)
 {
-    ++session->open_streams;
     switch (session->state) {
         case H2_SESSION_ST_IDLE:
             if (session->open_streams == 1) {
@@ -1928,11 +1719,29 @@ static void h2_session_ev_stream_open(h2
         default:
             break;
     }
+    
+    ap_assert(!stream->scheduled);
+    if (stream->request) {
+        const h2_request *r = stream->request;
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+                      H2_STRM_MSG(stream, "schedule %s %s://%s%s chunked=%d"),
+                      r->method, r->scheme, r->authority, r->path, r->chunked);
+        stream->scheduled = 1;
+        h2_mplx_process(session->mplx, stream, stream_pri_cmp, session);
+    }
+    else {
+        h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR);
+    }
 }
 
-static void h2_session_ev_stream_done(h2_session *session, int arg, const char *msg)
+static void ev_stream_closed(h2_session *session, h2_stream *stream)
 {
-    --session->open_streams;
+    apr_bucket *b;
+    
+    if (H2_STREAM_CLIENT_INITIATED(stream->id)
+        && (stream->id > session->local.completed_max)) {
+        session->local.completed_max = stream->id;
+    }
     switch (session->state) {
         case H2_SESSION_ST_IDLE:
             if (session->open_streams == 0) {
@@ -1944,6 +1753,76 @@ static void h2_session_ev_stream_done(h2
         default:
             break;
     }
+    
+    /* The stream might have data in the buffers of the main connection.
+     * We can only free the allocated resources once all of it was written.
+     * Send a special bucket on the connection that gets destroyed when
+     * all preceding data has been handled. On its destruction, it is safe
+     * to purge all resources of the stream. */
+    b = h2_bucket_eos_create(session->c->bucket_alloc, stream);
+    APR_BRIGADE_INSERT_TAIL(session->bbtmp, b);
+    h2_conn_io_pass(&session->io, session->bbtmp);
+    apr_brigade_cleanup(session->bbtmp);
+}
+
+/* Stream callback, invoked with the h2_session as ctx once the stream has
+ * entered stream->state. Updates the session's open stream count and its
+ * emitted/accepted stream bookkeeping and triggers open/close/cleanup. */
+static void on_stream_state_enter(void *ctx, h2_stream *stream)
+{
+    h2_session *session = ctx;
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+                  H2_STRM_MSG(stream, "entered state"));
+    switch (stream->state) {
+        case H2_SS_IDLE: /* stream was created */
+            ++session->open_streams;
+            if (H2_STREAM_CLIENT_INITIATED(stream->id)) {
+                ++session->remote.emitted_count;
+                if (stream->id > session->remote.emitted_max) {
+                    session->remote.emitted_max = stream->id;
+                    session->local.accepted_max = stream->id;
+                }
+            }
+            else if (stream->id > session->local.emitted_max) {
+                /* not client-initiated: track in local.*, not remote.* */
+                ++session->local.emitted_count;
+                session->local.emitted_max = stream->id;
+            }
+            break;
+        case H2_SS_OPEN: /* stream has request headers */
+        case H2_SS_RSVD_L: /* stream has request headers */
+            ev_stream_open(session, stream);
+            break;
+        case H2_SS_CLOSED_L: /* stream output was closed */
+        case H2_SS_CLOSED_R: /* stream input was closed */
+            break;
+        case H2_SS_CLOSED: /* stream in+out were closed */
+            --session->open_streams;
+            ev_stream_closed(session, stream);
+            break;
+        case H2_SS_CLEANUP:
+            h2_mplx_stream_cleanup(session->mplx, stream);
+            break;
+        default:
+            break;
+    }
+}
+
+/* Stream callback for events raised on a stream; ctx is the h2_session.
+ * Only H2_SEV_CANCELLED is acted upon, all other events are ignored. */
+static void on_stream_state_event(void *ctx, h2_stream *stream, 
+                                  h2_stream_event_t ev)
+{
+    h2_session *sess = ctx;
+    if (ev != H2_SEV_CANCELLED) {
+        return; /* NOP */
+    }
+    /* submit RST_STREAM with the stream's rst_error, unless session is done */
+    if (sess->state != H2_SESSION_ST_DONE) {
+        nghttp2_submit_rst_stream(sess->ngh2, NGHTTP2_FLAG_NONE, 
+                                  stream->id, stream->rst_error);
+    }
+    /* nothing else to do for this event */
 }
 
 static void dispatch_event(h2_session *session, h2_session_event_t ev, 
@@ -1971,9 +1850,6 @@ static void dispatch_event(h2_session *s
         case H2_SESSION_EV_NO_IO:
             h2_session_ev_no_io(session, arg, msg);
             break;
-        case H2_SESSION_EV_STREAM_READY:
-            h2_session_ev_stream_ready(session, arg, msg);
-            break;
         case H2_SESSION_EV_DATA_READ:
             h2_session_ev_data_read(session, arg, msg);
             break;
@@ -1986,23 +1862,11 @@ static void dispatch_event(h2_session *s
         case H2_SESSION_EV_PRE_CLOSE:
             h2_session_ev_pre_close(session, arg, msg);
             break;
-        case H2_SESSION_EV_STREAM_OPEN:
-            h2_session_ev_stream_open(session, arg, msg);
-            break;
-        case H2_SESSION_EV_STREAM_DONE:
-            h2_session_ev_stream_done(session, arg, msg);
-            break;
         default:
             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
-                          "h2_session(%ld): unknown event %d", 
-                          session->id, ev);
+                          H2_SSSN_MSG(session, "unknown event %d"), ev);
             break;
     }
-    
-    if (session->state == H2_SESSION_ST_DONE) {
-        apr_brigade_cleanup(session->bbtmp);
-        h2_mplx_abort(session->mplx);
-    }
 }
 
 /* trigger window updates, stream resumes and submits */
@@ -2013,11 +1877,11 @@ static apr_status_t dispatch_master(h2_s
                                             on_stream_resume, session);
     if (status == APR_EAGAIN) {
         ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, session->c,
-                      "h2_session(%ld): no master event available", session->id);
+                      H2_SSSN_MSG(session, "no master event available"));
     }
     else if (status != APR_SUCCESS) {
         ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, session->c,
-                      "h2_session(%ld): dispatch error", session->id);
+                      H2_SSSN_MSG(session, "dispatch error"));
         dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 
                        H2_ERR_INTERNAL_ERROR, "dispatch error");
     }
@@ -2034,8 +1898,7 @@ apr_status_t h2_session_process(h2_sessi
 
     if (trace) {
         ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
-                      "h2_session(%ld): process start, async=%d", 
-                      session->id, async);
+                      H2_SSSN_MSG(session, "process start, async=%d"), async);
     }
                   
     while (session->state != H2_SESSION_ST_DONE) {
@@ -2055,18 +1918,22 @@ apr_status_t h2_session_process(h2_sessi
             case H2_SESSION_ST_INIT:
                 ap_update_child_status_from_conn(c->sbh, SERVER_BUSY_READ, c);
                 if (!h2_is_acceptable_connection(c, 1)) {
-                    update_child_status(session, SERVER_BUSY_READ, "inadequate security");
-                    h2_session_shutdown(session, NGHTTP2_INADEQUATE_SECURITY, NULL, 1);
+                    update_child_status(session, SERVER_BUSY_READ, 
+                                        "inadequate security");
+                    h2_session_shutdown(session, 
+                                        NGHTTP2_INADEQUATE_SECURITY, NULL, 1);
                 } 
                 else {
                     update_child_status(session, SERVER_BUSY_READ, "init");
                     status = h2_session_start(session, &rv);
-                    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, c, APLOGNO(03079)
-                                  "h2_session(%ld): started on %s:%d", session->id,
+                    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, c, 
+                                  H2_SSSN_LOG(APLOGNO(03079), session, 
+                                  "started on %s:%d"), 
                                   session->s->server_hostname,
                                   c->local_addr->port);
                     if (status != APR_SUCCESS) {
-                        dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
+                        dispatch_event(session, 
+                                       H2_SESSION_EV_CONN_ERROR, 0, NULL);
                     }
                     dispatch_event(session, H2_SESSION_EV_INIT, 0, NULL);
                 }
@@ -2086,9 +1953,9 @@ apr_status_t h2_session_process(h2_sessi
                 if (!session->keep_sync_until && async && !session->open_streams
                     && !session->r && session->remote.emitted_count) {
                     if (trace) {
-                        ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
-                                      "h2_session(%ld): async idle, nonblock read, "
-                                      "%d streams open", session->id, 
+                        ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, c,
+                                      H2_SSSN_MSG(session, 
+                                      "nonblock read, %d streams open"), 
                                       session->open_streams);
                     }
                     status = h2_session_read(session, 0);
@@ -2097,9 +1964,11 @@ apr_status_t h2_session_process(h2_sessi
                         session->have_read = 1;
                         dispatch_event(session, H2_SESSION_EV_DATA_READ, 0, NULL);
                     }
-                    else if (APR_STATUS_IS_EAGAIN(status) || APR_STATUS_IS_TIMEUP(status)) {
+                    else if (APR_STATUS_IS_EAGAIN(status) 
+                        || APR_STATUS_IS_TIMEUP(status)) {
                         if (apr_time_now() > session->idle_until) {
-                            dispatch_event(session, H2_SESSION_EV_CONN_TIMEOUT, 0, NULL);
+                            dispatch_event(session, 
+                                           H2_SESSION_EV_CONN_TIMEOUT, 0, NULL);
                         }
                         else {
                             status = APR_EAGAIN;
@@ -2107,18 +1976,18 @@ apr_status_t h2_session_process(h2_sessi
                         }
                     }
                     else {
-                        ap_log_cerror( APLOG_MARK, APLOG_DEBUG, status, c,
-				      APLOGNO(03403)
-                                      "h2_session(%ld): idle, no data, error", 
-                                      session->id);
-                        dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, "timeout");
+                        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, c,
+                                      H2_SSSN_LOG(APLOGNO(03403), session, 
+                                      "no data, error"));
+                        dispatch_event(session, 
+                                       H2_SESSION_EV_CONN_ERROR, 0, "timeout");
                     }
                 }
                 else {
                     if (trace) {
-                        ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
-                                      "h2_session(%ld): sync idle, stutter 1-sec, "
-                                      "%d streams open", session->id,
+                        ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, c,
+                                      H2_SSSN_MSG(session, 
+                                      "sync, stutter 1-sec, %d streams open"), 
                                       session->open_streams);
                     }
                     /* We wait in smaller increments, using a 1 second timeout.
@@ -2147,16 +2016,18 @@ apr_status_t h2_session_process(h2_sessi
                         }
                         if (now > session->idle_until) {
                             if (trace) {
-                                ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
-                                              "h2_session(%ld): keepalive timeout",
-                                              session->id);
+                                ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, c,
+                                              H2_SSSN_MSG(session, 
+                                              "keepalive timeout"));
                             }
-                            dispatch_event(session, H2_SESSION_EV_CONN_TIMEOUT, 0, "timeout");
+                            dispatch_event(session, 
+                                           H2_SESSION_EV_CONN_TIMEOUT, 0, "timeout");
                         }
                         else if (trace) {                        
-                            ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
-                                          "h2_session(%ld): keepalive, %f sec left",
-                                          session->id, (session->idle_until - now) / 1000000.0f);
+                            ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, c,
+                                          H2_SSSN_MSG(session, 
+                                          "keepalive, %f sec left"),
+                                          (session->idle_until - now) / 1000000.0f);
                         }
                         /* continue reading handling */
                     }
@@ -2165,13 +2036,13 @@ apr_status_t h2_session_process(h2_sessi
                              || APR_STATUS_IS_EOF(status)
                              || APR_STATUS_IS_EBADF(status)) {
                         ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
-                                      "h2_session(%ld): input gone", session->id);
+                                      H2_SSSN_MSG(session, "input gone"));
                         dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
                     }
                     else {
                         ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
-                                      "h2_session(%ld): idle(1 sec timeout) "
-                                      "read failed", session->id);
+                                      H2_SSSN_MSG(session, 
+                                      "(1 sec timeout) read failed"));
                         dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, "error");
                     }
                 }
@@ -2261,9 +2132,8 @@ apr_status_t h2_session_process(h2_sessi
                 }
                 else {
                     ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, c,
-				  APLOGNO(03404)
-                                  "h2_session(%ld): waiting on conditional",
-                                  session->id);
+                                  H2_SSSN_LOG(APLOGNO(03404), session, 
+                                  "waiting on conditional"));
                     h2_session_shutdown(session, H2_ERR_INTERNAL_ERROR, 
                                         "cond wait error", 0);
                 }
@@ -2271,8 +2141,8 @@ apr_status_t h2_session_process(h2_sessi
                 
             default:
                 ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_EGENERAL, c,
-                              APLOGNO(03080)
-                              "h2_session(%ld): unknown state %d", session->id, session->state);
+                              H2_SSSN_LOG(APLOGNO(03080), session, 
+                              "unknown state"));
                 dispatch_event(session, H2_SESSION_EV_PROTO_ERROR, 0, NULL);
                 break;
         }
@@ -2290,8 +2160,7 @@ apr_status_t h2_session_process(h2_sessi
 out:
     if (trace) {
         ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
-                      "h2_session(%ld): [%s] process returns", 
-                      session->id, state_name(session->state));
+                      H2_SSSN_MSG(session, "process returns")); 
     }
     
     if ((session->state != H2_SESSION_ST_DONE)
@@ -2312,7 +2181,7 @@ out:
 apr_status_t h2_session_pre_close(h2_session *session, int async)
 {
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, 
-                  "h2_session(%ld): pre_close", session->id);
+                  H2_SSSN_MSG(session, "pre_close"));
     dispatch_event(session, H2_SESSION_EV_PRE_CLOSE, 0, 
         (session->state == H2_SESSION_ST_IDLE)? "timeout" : NULL);
     return APR_SUCCESS;

Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_session.h
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_session.h?rev=1782980&r1=1782979&r2=1782980&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_session.h (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_session.h Tue Feb 14 15:53:50 2017
@@ -51,6 +51,7 @@ struct h2_push;
 struct h2_push_diary;
 struct h2_session;
 struct h2_stream;
+struct h2_stream_monitor;
 struct h2_task;
 struct h2_workers;
 
@@ -64,13 +65,10 @@ typedef enum {
     H2_SESSION_EV_PROTO_ERROR,      /* protocol error */
     H2_SESSION_EV_CONN_TIMEOUT,     /* connection timeout */
     H2_SESSION_EV_NO_IO,            /* nothing has been read or written */
-    H2_SESSION_EV_STREAM_READY,     /* stream signalled availability of headers/data */
     H2_SESSION_EV_DATA_READ,        /* connection data has been read */
     H2_SESSION_EV_NGH2_DONE,        /* nghttp2 wants neither read nor write anything */
     H2_SESSION_EV_MPM_STOPPING,     /* the process is stopping */
     H2_SESSION_EV_PRE_CLOSE,        /* connection will close after this */
-    H2_SESSION_EV_STREAM_OPEN,      /* stream has been opened */
-    H2_SESSION_EV_STREAM_DONE,      /* stream has been handled completely */
 } h2_session_event_t;
 
 typedef struct h2_session {
@@ -101,6 +99,7 @@ typedef struct h2_session {
     
     struct h2_push_diary *push_diary; /* remember pushes, avoid duplicates */
     
+    struct h2_stream_monitor *monitor;/* monitor callbacks for streams */
     int open_streams;               /* number of client streams open */
     int unsent_submits;             /* number of submitted, but not yet written responses. */
     int unsent_promises;            /* number of submitted, but not yet written push promises */
@@ -128,6 +127,7 @@ typedef struct h2_session {
     const char *last_status_msg;    /* the one already reported */
 } h2_session;
 
+const char *h2_session_state_str(h2_session_state state);
 
 /**
  * Create a new h2_session for the given connection.
@@ -178,34 +178,12 @@ void h2_session_abort(h2_session *sessio
 void h2_session_close(h2_session *session);
 
 /**
- * Create and register a new stream under the given id.
- * 
- * @param session the session to register in
- * @param stream_id the new stream identifier
- * @param initiated_on the stream id this one is initiated on or 0
- * @param req the request for this stream or NULL if not known yet
- * @return the new stream
- */
-struct h2_stream *h2_session_open_stream(h2_session *session, int stream_id,
-                                         int initiated_on, 
-                                         const h2_request *req);
-
-
-/**
  * Returns if client settings have push enabled.
  * @param != 0 iff push is enabled in client settings
  */
 int h2_session_push_enabled(h2_session *session);
 
 /**
- * Destroy the stream and release it everywhere. Reclaim all resources.
- * @param session the session to which the stream belongs
- * @param stream the stream to destroy
- */
-apr_status_t h2_session_stream_done(h2_session *session, 
-                                    struct h2_stream *stream);
-
-/**
  * Submit a push promise on the stream and schedule the new steam for
  * processing..
  * 
@@ -221,5 +199,10 @@ apr_status_t h2_session_set_prio(h2_sess
                                  struct h2_stream *stream, 
                                  const struct h2_priority *prio);
 
+#define H2_SSSN_MSG(s, msg)     \
+    "h2_session(%ld,%s,%d): "msg, s->id, h2_session_state_str(s->state), \
+                            s->open_streams
+
+#define H2_SSSN_LOG(aplogno, s, msg)    aplogno H2_SSSN_MSG(s, msg)
 
 #endif /* defined(__mod_h2__h2_session__) */



Mime
View raw message