qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kgiu...@apache.org
Subject [qpid-dispatch] branch dev-protocol-adaptors-2 updated: DISPATCH-1869: discard invalid messages arriving from core DISPATCH-1859: re-work server connection cleanup
Date Wed, 09 Dec 2020 03:11:38 GMT
This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch dev-protocol-adaptors-2
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push:
     new a11651b  DISPATCH-1869: discard invalid messages arriving from core DISPATCH-1859: re-work server connection cleanup
a11651b is described below

commit a11651be8458d84cb119e4c526f12c563f3af158
Author: Kenneth Giusti <kgiusti@apache.org>
AuthorDate: Fri Dec 4 10:35:56 2020 -0500

    DISPATCH-1869: discard invalid messages arriving from core
    DISPATCH-1859: re-work server connection cleanup
---
 include/qpid/dispatch/http1_codec.h |  18 +-
 src/adaptors/http1/http1_adaptor.c  |   6 +-
 src/adaptors/http1/http1_client.c   | 114 ++++++-----
 src/adaptors/http1/http1_codec.c    | 106 +++++-----
 src/adaptors/http1/http1_private.h  |   1 -
 src/adaptors/http1/http1_server.c   | 392 ++++++++++++++++++++----------------
 tests/system_tests_http1_adaptor.py | 171 +++++++++++++++-
 7 files changed, 516 insertions(+), 292 deletions(-)

diff --git a/include/qpid/dispatch/http1_codec.h b/include/qpid/dispatch/http1_codec.h
index 132f06c..8aac8c4 100644
--- a/include/qpid/dispatch/http1_codec.h
+++ b/include/qpid/dispatch/http1_codec.h
@@ -147,15 +147,6 @@ typedef struct h1_codec_config_t {
 h1_codec_connection_t *h1_codec_connection(h1_codec_config_t *config, void *context);
 void *h1_codec_connection_get_context(h1_codec_connection_t *conn);
 
-// Notify the codec that the endpoint closed the connection.  This should be
-// called for server connections only. Once the server has reconnected it is
-// safe to resume calling h1_codec_connection_rx_data().  This method is a
-// no-op for client connections.  When a client connection closes the
-// application must cancel all outstanding requests and then call
-// h1_codec_connection_free() instead.
-//
-void h1_codec_connection_closed(h1_codec_connection_t *conn);
-
 // Release the codec.  This can only be done after all outstanding requests
 // have been completed or cancelled.
 //
@@ -169,6 +160,15 @@ void h1_codec_connection_free(h1_codec_connection_t *conn);
 //
 int h1_codec_connection_rx_data(h1_codec_connection_t *conn, qd_buffer_list_t *data, uintmax_t len);
 
+// Notify the codec that the endpoint closed the connection.  For server-facing
+// connections it is safe to resume calling h1_codec_connection_rx_data() for
+// the h1_codec_connection once the connection to the server is reestablished.
+// Client-facing connections cannot be resumed after the connection has been
+// closed. In the client case the application must cancel all outstanding
+// requests and then call h1_codec_connection_free() instead.
+//
+void h1_codec_connection_rx_closed(h1_codec_connection_t *conn);
+
 void h1_codec_request_state_set_context(h1_codec_request_state_t *hrs, void *context);
 void *h1_codec_request_state_get_context(const h1_codec_request_state_t *hrs);
 h1_codec_connection_t *h1_codec_request_state_get_connection(const h1_codec_request_state_t *hrs);
diff --git a/src/adaptors/http1/http1_adaptor.c b/src/adaptors/http1/http1_adaptor.c
index 9eb177f..931d268 100644
--- a/src/adaptors/http1/http1_adaptor.c
+++ b/src/adaptors/http1/http1_adaptor.c
@@ -182,11 +182,9 @@ void qdr_http1_close_connection(qdr_http1_connection_t *hconn, const char *error
                "[C%"PRIu64"] Connection closing: %s", hconn->conn_id, error);
     }
 
-    qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
-           "[C%"PRIu64"] Initiating close of connection", hconn->conn_id);
-
     if (hconn->raw_conn) {
-        hconn->close_connection = true;
+        qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+               "[C%"PRIu64"] Initiating close of connection", hconn->conn_id);
         pn_raw_connection_close(hconn->raw_conn);
     }
 
diff --git a/src/adaptors/http1/http1_client.c b/src/adaptors/http1/http1_client.c
index 031135b..33412da 100644
--- a/src/adaptors/http1/http1_client.c
+++ b/src/adaptors/http1/http1_client.c
@@ -355,10 +355,10 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
     qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) context;
     qd_log_source_t *log = qdr_http1_adaptor->log;
 
-    qd_log(log, QD_LOG_DEBUG, "RAW CONNECTION EVENT %s\n", pn_event_type_name(pn_event_type(e)));
-
     if (!hconn) return;
 
+    qd_log(log, QD_LOG_TRACE, "[C%"PRIu64"] HTTP client proactor event %s", hconn->conn_id, pn_event_type_name(pn_event_type(e)));
+
     switch (pn_event_type(e)) {
 
     case PN_RAW_CONNECTION_CONNECTED: {
@@ -409,7 +409,7 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
     case PN_RAW_CONNECTION_NEED_READ_BUFFERS: {
         qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Need read buffers", hconn->conn_id);
         // @TODO(kgiusti): backpressure if no credit
-        if (hconn->client.reply_to_addr && !hconn->close_connection /* && hconn->in_link_credit > 0 */) {
+        if (hconn->client.reply_to_addr /* && hconn->in_link_credit > 0 */) {
             int granted = qda_raw_conn_grant_read_buffers(hconn->raw_conn);
             qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] %d read buffers granted",
                    hconn->conn_id, granted);
@@ -451,7 +451,8 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
     if (hconn) {
         _client_request_t *hreq = (_client_request_t*) DEQ_HEAD(hconn->requests);
         if (hreq) {
-            qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP is client request complete????", hconn->conn_id);
+            qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP is client request msg-id=%"PRIu64" complete????",
+                   hconn->conn_id, hreq->base.msg_id);
             qd_log(log, QD_LOG_DEBUG, "   codec=%s req-dlv=%p resp-dlv=%d req_msg=%p %s",
                    hreq->codec_completed ? "Done" : "Not Done",
                    (void*)hreq->request_dlv,
@@ -467,6 +468,9 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
     _client_request_t *hreq = (_client_request_t *)DEQ_HEAD(hconn->requests);
     if (hreq) {
         if (hreq->cancelled) {
+            qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+                   "[C%"PRIu64"][L%"PRIu64"] HTTP client request msg-id=%"PRIu64" cancelled",
+                       hconn->conn_id, hconn->out_link_id, hreq->base.msg_id);
             need_close = true;
         } else {
             // Can we retire the current outgoing response messages?
@@ -477,8 +481,8 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
                    DEQ_IS_EMPTY(rmsg->out_data.fifo)) {
                 // response message fully received and forwarded to client
                 qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
-                       "[C%"PRIu64"][L%"PRIu64"] HTTP client settling response, dispo=0x%"PRIx64,
-                       hconn->conn_id, hconn->out_link_id, rmsg->dispo);
+                       "[C%"PRIu64"][L%"PRIu64"] HTTP client request msg-id=%"PRIu64" settling response, dispo=0x%"PRIx64,
+                       hconn->conn_id, hconn->out_link_id, hreq->base.msg_id, rmsg->dispo);
                 qdr_delivery_remote_state_updated(qdr_http1_adaptor->core,
                                                   rmsg->dlv,
                                                   rmsg->dispo,
@@ -495,7 +499,8 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
                 DEQ_IS_EMPTY(hreq->responses) &&
                 hreq->request_settled) {
 
-                qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP request completed!", hconn->conn_id);
+                qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP request msg-id=%"PRIu64" completed!",
+                       hconn->conn_id, hreq->base.msg_id);
 
                 need_close = hreq->close_on_complete;
                 _client_request_free(hreq);
@@ -512,8 +517,8 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
             if (hreq->request_msg && hconn->in_link_credit > 0) {
 
                 qd_log(hconn->adaptor->log, QD_LOG_TRACE,
-                       "[C%"PRIu64"][L%"PRIu64"] Delivering request to router",
-                       hconn->conn_id, hconn->in_link_id);
+                       "[C%"PRIu64"][L%"PRIu64"] Delivering next request msg-id=%"PRIu64" to router",
+                       hconn->conn_id, hconn->in_link_id, hreq->base.msg_id);
 
                 hconn->in_link_credit -= 1;
                 hreq->request_dlv = qdr_link_deliver(hconn->in_link, hreq->request_msg, 0, false, 0, 0, 0, 0);
@@ -614,10 +619,6 @@ static int _client_rx_request_cb(h1_codec_request_state_t *hrs,
     h1_codec_connection_t    *h1c = h1_codec_request_state_get_connection(hrs);
     qdr_http1_connection_t *hconn = (qdr_http1_connection_t*)h1_codec_connection_get_context(h1c);
 
-    qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
-           "[C%"PRIu64"] HTTP request received: method=%s target=%s version=%"PRIi32".%"PRIi32,
-           hconn->conn_id, method, target, version_major, version_minor);
-
     _client_request_t *creq = new__client_request_t();
     ZERO(creq);
     creq->base.start = qd_timer_now();
@@ -627,6 +628,10 @@ static int _client_rx_request_cb(h1_codec_request_state_t *hrs,
     creq->close_on_complete = (version_minor == 0);
     DEQ_INIT(creq->responses);
 
+    qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+           "[C%"PRIu64"] HTTP request received: msg-id=%"PRIu64" method=%s target=%s version=%"PRIi32".%"PRIi32,
+           hconn->conn_id, creq->base.msg_id, method, target, version_major, version_minor);
+
     creq->request_props = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, 0);
     qd_compose_start_map(creq->request_props);
     {
@@ -761,8 +766,8 @@ static int _client_rx_headers_done_cb(h1_codec_request_state_t *hrs, bool has_bo
         hconn->in_link_credit -= 1;
 
         qd_log(hconn->adaptor->log, QD_LOG_TRACE,
-               "[C%"PRIu64"][L%"PRIu64"] Delivering request to router",
-               hconn->conn_id, hconn->in_link_id);
+               "[C%"PRIu64"][L%"PRIu64"] Delivering request msg-id=%"PRIu64" to router",
+               hconn->conn_id, hconn->in_link_id, hreq->base.msg_id);
 
         hreq->request_dlv = qdr_link_deliver(hconn->in_link, hreq->request_msg, 0, false, 0, 0, 0, 0);
         qdr_delivery_set_context(hreq->request_dlv, (void*) hreq);
@@ -811,8 +816,8 @@ static void _client_rx_done_cb(h1_codec_request_state_t *hrs)
     qd_message_t             *msg = hreq->request_msg ? hreq->request_msg : qdr_delivery_message(hreq->request_dlv);
 
     qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
-           "[C%"PRIu64"][L%"PRIu64"] HTTP request receive complete.",
-           hconn->conn_id, hconn->in_link_id);
+           "[C%"PRIu64"][L%"PRIu64"] HTTP request msg-id=%"PRIu64" receive complete.",
+           hconn->conn_id, hconn->in_link_id, hreq->base.msg_id);
 
     if (!qd_message_receive_complete(msg)) {
         qd_message_set_receive_complete(msg);
@@ -838,8 +843,9 @@ static void _client_request_complete_cb(h1_codec_request_state_t *lib_rs, bool c
         uint64_t in_octets, out_octets;
         h1_codec_request_state_counters(lib_rs, &in_octets, &out_octets);
         qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
-               "[C%"PRIu64"] HTTP request/response %s. Octets read: %"PRIu64" written: %"PRIu64,
+               "[C%"PRIu64"] HTTP request msg-id=%"PRIu64" %s. Octets read: %"PRIu64" written: %"PRIu64,
                hreq->base.hconn->conn_id,
+               hreq->base.msg_id,
                cancelled ? "cancelled!" : "codec done",
                in_octets, out_octets);
     }
@@ -899,8 +905,8 @@ void qdr_http1_client_core_link_flow(qdr_http1_adaptor_t    *adaptor,
             hconn->in_link_credit -= 1;
 
             qd_log(hconn->adaptor->log, QD_LOG_TRACE,
-                   "[C%"PRIu64"][L%"PRIu64"] Delivering request to router",
-                   hconn->conn_id, hconn->in_link_id);
+                   "[C%"PRIu64"][L%"PRIu64"] Delivering next request msg-id=%"PRIu64" to router",
+                   hconn->conn_id, hconn->in_link_id, hreq->base.msg_id);
 
             hreq->request_dlv = qdr_link_deliver(hconn->in_link, hreq->request_msg, 0, false, 0, 0, 0, 0);
             qdr_delivery_set_context(hreq->request_dlv, (void*) hreq);
@@ -923,30 +929,37 @@ void qdr_http1_client_core_delivery_update(qdr_http1_adaptor_t      *adaptor,
     assert(dlv == hreq->request_dlv);
 
     qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
-           "[C%"PRIu64"][L%"PRIu64"] HTTP request delivery update, outcome=0x%"PRIx64"%s",
-           hconn->conn_id, hconn->in_link_id, disp, settled ? " settled" : "");
+           "[C%"PRIu64"][L%"PRIu64"] HTTP request msg-id=%"PRIu64" delivery update, outcome=0x%"PRIx64"%s",
+           hconn->conn_id, hconn->in_link_id, hreq->base.msg_id, disp, settled ? " settled" : "");
 
     if (disp && disp != PN_RECEIVED && hreq->request_dispo == 0) {
         // terminal disposition
         hreq->request_dispo = disp;
         if (disp != PN_ACCEPTED) {
-            // no response message is going to arrive.  Now what?  For now fake
-            // a response from the server by using the codec to write an error
-            // response on the behalf of the server.
-            qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
-                   "[C%"PRIu64"][L%"PRIu64"] HTTP request failure, outcome=0x%"PRIx64,
-                   hconn->conn_id, hconn->in_link_id, disp);
 
-            _client_response_msg_t *rmsg = new__client_response_msg_t();
-            ZERO(rmsg);
-            DEQ_INIT(rmsg->out_data.fifo);
-            DEQ_INSERT_TAIL(hreq->responses, rmsg);
+            qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
+                   "[C%"PRIu64"][L%"PRIu64"] HTTP request msg-id=%"PRIu64" failure, outcome=0x%"PRIx64,
+                   hconn->conn_id, hconn->in_link_id, hreq->base.msg_id, disp);
+
+            if (DEQ_IS_EMPTY(hreq->responses)) {
+                // best effort attempt to send an error to the client
+                // if nothing has been sent back so far
+                _client_response_msg_t *rmsg = new__client_response_msg_t();
+                ZERO(rmsg);
+                DEQ_INIT(rmsg->out_data.fifo);
+                DEQ_INSERT_TAIL(hreq->responses, rmsg);
+
+                if (disp == PN_REJECTED) {
+                    qdr_http1_error_response(&hreq->base, 400, "Bad Request");
+                } else {
+                    // total guess as to what the proper error code should be
+                    qdr_http1_error_response(&hreq->base, 503, "Service Unavailable");
+                }
+                hreq->close_on_complete = true;  // trust nothing
 
-            if (disp == PN_REJECTED) {
-                qdr_http1_error_response(&hreq->base, 400, "Bad Request");
             } else {
-                // total guess as to what the proper error code should be
-                qdr_http1_error_response(&hreq->base, 503, "Service Unavailable");
+                // partial response already sent - punt:
+                qdr_http1_close_connection(hconn, "HTTP request failed");
             }
         }
     }
@@ -1102,20 +1115,7 @@ static uint64_t _encode_response_message(_client_request_t *hreq,
                                          _client_response_msg_t *rmsg)
 {
     qdr_http1_connection_t *hconn = hreq->base.hconn;
-    qd_message_t *msg = qdr_delivery_message(rmsg->dlv);
-    qd_message_depth_status_t status = qd_message_check_depth(msg, QD_DEPTH_BODY);
-
-    if (status == QD_MESSAGE_DEPTH_INCOMPLETE)
-        return 0;
-
-    if (status == QD_MESSAGE_DEPTH_INVALID) {
-        qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
-               "[C%"PRIu64"][L%"PRIu64"] body data depth check failed",
-               hconn->conn_id, hconn->out_link_id);
-        return PN_REJECTED;
-    }
-
-    assert(status == QD_MESSAGE_DEPTH_OK);
+    qd_message_t             *msg = qdr_delivery_message(rmsg->dlv);
 
     if (!rmsg->headers_encoded) {
         rmsg->headers_encoded = true;
@@ -1159,6 +1159,9 @@ static uint64_t _encode_response_message(_client_request_t *hreq,
             return PN_ACCEPTED;
 
         case QD_MESSAGE_STREAM_DATA_INCOMPLETE:
+            qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+                   "[C%"PRIu64"][L%"PRIu64"] body data need more",
+                   hconn->conn_id, hconn->out_link_id);
             return 0;  // wait for more
 
         case QD_MESSAGE_STREAM_DATA_INVALID:
@@ -1184,6 +1187,9 @@ uint64_t qdr_http1_client_core_link_deliver(qdr_http1_adaptor_t    *adaptor,
                                             bool                    settled)
 {
     qd_message_t        *msg = qdr_delivery_message(delivery);
+    if (qd_message_is_discard(msg))
+        return 0;
+
     _client_request_t  *hreq = (_client_request_t*) qdr_delivery_get_context(delivery);
     if (!hreq) {
         // new delivery - look for corresponding request via correlation_id
@@ -1196,6 +1202,7 @@ uint64_t qdr_http1_client_core_link_deliver(qdr_http1_adaptor_t    *adaptor,
                    "[C%"PRIu64"][L%"PRIu64"] Malformed HTTP/1.x message",
                    hconn->conn_id, link->identity);
             qd_message_set_send_complete(msg);
+            qd_message_set_discard(msg, true);
             qdr_http1_close_connection(hconn, "Malformed response message");
             return PN_REJECTED;
 
@@ -1207,6 +1214,7 @@ uint64_t qdr_http1_client_core_link_deliver(qdr_http1_adaptor_t    *adaptor,
                 qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
                        "[C%"PRIu64"][L%"PRIu64"] Discarding malformed message.", hconn->conn_id, link->identity);
                 qd_message_set_send_complete(msg);
+                qd_message_set_discard(msg, true);
                 qdr_http1_close_connection(hconn, "Cannot correlate response message");
                 return PN_REJECTED;
             }
@@ -1219,6 +1227,9 @@ uint64_t qdr_http1_client_core_link_deliver(qdr_http1_adaptor_t    *adaptor,
             qdr_delivery_set_context(delivery, hreq);
             qdr_delivery_incref(delivery, "referenced by HTTP1 adaptor");
             DEQ_INSERT_TAIL(hreq->responses, rmsg);
+            qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+                   "[C%"PRIu64"][L%"PRIu64"] HTTP received response for msg-id=%"PRIu64,
+                   hconn->conn_id, hconn->out_link_id, hreq->base.msg_id);
             break;
         }
     }
@@ -1238,6 +1249,7 @@ uint64_t qdr_http1_client_core_link_deliver(qdr_http1_adaptor_t    *adaptor,
             } else {
                 // The response was bad.  There's not much that can be done to
                 // recover, so for now I punt...
+                qd_message_set_discard(msg, true);
                 qdr_http1_close_connection(hconn, "Cannot parse response message");
             }
         }
@@ -1272,7 +1284,7 @@ static void _client_response_msg_free(_client_request_t *req, _client_response_m
 //
 static void _write_pending_response(_client_request_t *hreq)
 {
-    if (hreq && !hreq->cancelled && !hreq->base.hconn->close_connection) {
+    if (hreq && !hreq->cancelled) {
         assert(DEQ_PREV(&hreq->base) == 0);  // must preserve order
         _client_response_msg_t *rmsg = DEQ_HEAD(hreq->responses);
         if (rmsg && rmsg->out_data.write_ptr) {
diff --git a/src/adaptors/http1/http1_codec.c b/src/adaptors/http1/http1_codec.c
index 4940618..36d7ac1 100644
--- a/src/adaptors/http1/http1_codec.c
+++ b/src/adaptors/http1/http1_codec.c
@@ -107,6 +107,7 @@ struct h1_codec_request_state_t {
     bool no_body_method;    // true if request method is either HEAD or CONNECT
     bool request_complete;  // true when request message done encoding/decoding
     bool response_complete; // true when response message done encoding/decoding
+    bool close_expected;    // if true do not signal request_complete cb until closed
 };
 DEQ_DECLARE(h1_codec_request_state_t, h1_codec_request_state_list_t);
 ALLOC_DECLARE(h1_codec_request_state_t);
@@ -239,39 +240,6 @@ h1_codec_connection_t *h1_codec_connection(h1_codec_config_t *config, void *cont
 }
 
 
-// The connection has closed.  If this is a connection to a server this may
-// simply be the end of the response message.  Mark the in-flight response as
-// completed.
-//
-void h1_codec_connection_closed(h1_codec_connection_t *conn)
-{
-    if (conn && conn->config.type == HTTP1_CONN_SERVER) {
-
-        // is decoding a response in progress
-        struct decoder_t *decoder = &conn->decoder;
-        h1_codec_request_state_t *hrs = decoder->hrs;
-        if (hrs && hrs->request_complete) {
-            // the corresponding request msg is complete
-            if (!hrs->response_complete) {
-                hrs->response_complete = true;
-                conn->config.rx_done(hrs);
-            }
-            conn->config.request_complete(hrs, false);
-            decoder_reset(decoder);
-            h1_codec_request_state_free(hrs);
-            if (hrs == conn->encoder.hrs)
-                encoder_reset(&conn->encoder);
-        }
-
-        // since the underlying connection is gone discard all remaining
-        // incoming data
-        decoder_reset(decoder);
-        qd_buffer_list_free_buffers(&conn->decoder.incoming);
-        decoder->read_ptr = NULL_I_PTR;
-    }
-}
-
-
 // Free the connection
 //
 void h1_codec_connection_free(h1_codec_connection_t *conn)
@@ -835,6 +803,17 @@ static bool process_headers_done(h1_codec_connection_t *conn, struct decoder_t *
                 has_body = false;
             }
         }
+
+        // In certain scenarios an HTTP server will close the connection to
+        // indicate the end of a response message. This may happen even if
+        // the request message has a known length (Content-Length or
+        // Transfer-Encoding).  In these circumstances do NOT signal that
+        // the request is complete (call request_complete() callback) until
+        // the connection closes.  Otherwise the user may start sending the
+        // next request message before the HTTP server closes the TCP
+        // connection.  (see RFC 7230, section 6.3 "Persistence")
+        hrs->close_expected = decoder->hdr_conn_close
+            || (decoder->is_http10 && !decoder->hdr_conn_keep_alive);
     }
 
     decoder->error = conn->config.rx_headers_done(decoder->hrs, has_body);
@@ -1257,25 +1236,12 @@ static bool parse_done(h1_codec_connection_t *conn, struct decoder_t *decoder)
     // signal the message receive is complete
     conn->config.rx_done(hrs);
 
-    bool close_expected = false;
     if (is_response) {
-        // Informational 1xx response codes are NOT teriminal - further responses are allowed!
+        // Informational 1xx response codes are NOT terminal - further responses are allowed!
         if (IS_INFO_RESPONSE(hrs->response_code)) {
             hrs->response_code = 0;
         } else {
             hrs->response_complete = true;
-
-            // In certain scenarios an HTTP server will close the connection to
-            // indicate the end of a response message. This may happen even if
-            // the request message has a known length (Content-Length or
-            // Transfer-Encoding).  In these circumstances do NOT signal that
-            // the request is complete (call request_complete() callback) until
-            // the connection closes.  Otherwise the user may start sending the
-            // next request message before the HTTP server closes the TCP
-            // connection.  (see RFC7230, section Persistence)
-
-            close_expected = decoder->hdr_conn_close
-                || (decoder->is_http10 && !decoder->hdr_conn_keep_alive);
         }
     } else {
         hrs->request_complete = true;
@@ -1283,7 +1249,7 @@ static bool parse_done(h1_codec_connection_t *conn, struct decoder_t *decoder)
 
     decoder_reset(decoder);
 
-    if (!close_expected) {
+    if (!hrs->close_expected) {
         if (hrs->request_complete && hrs->response_complete) {
             conn->config.request_complete(hrs, false);
             h1_codec_request_state_free(hrs);
@@ -1356,6 +1322,50 @@ int h1_codec_connection_rx_data(h1_codec_connection_t *conn, qd_buffer_list_t *d
 }
 
 
+// The read channel of the connection has closed.  If this is a connection to a
+// server this may simply be the end of the response message.  If a message is
+// currently being decoded see if it is valid to complete.
+//
+void h1_codec_connection_rx_closed(h1_codec_connection_t *conn)
+{
+    if (conn && conn->config.type == HTTP1_CONN_SERVER) {
+
+        // terminate the in progress decode
+
+        struct decoder_t *decoder = &conn->decoder;
+        h1_codec_request_state_t *hrs = decoder->hrs;
+        if (hrs) {
+            // consider the response valid if length is unspecified since in
+            // this case the server must close the connection to complete the
+            // message body
+            if (decoder->state == HTTP1_MSG_STATE_BODY
+                && !decoder->is_chunked
+                && !decoder->hdr_content_length) {
+
+                if (!hrs->response_complete) {
+                    hrs->response_complete = true;
+                    conn->config.rx_done(hrs);
+                }
+            }
+        }
+
+        decoder_reset(decoder);
+        // since the underlying connection is gone discard all remaining
+        // incoming data
+        qd_buffer_list_free_buffers(&conn->decoder.incoming);
+        decoder->read_ptr = NULL_I_PTR;
+
+        // complete any "done" requests
+        hrs = DEQ_HEAD(conn->hrs_queue);
+        while (hrs && hrs->response_complete && hrs->request_complete) {
+            conn->config.request_complete(hrs, false);
+            h1_codec_request_state_free(hrs);
+            hrs = DEQ_HEAD(conn->hrs_queue);
+        }
+    }
+}
+
+
 void h1_codec_request_state_set_context(h1_codec_request_state_t *hrs, void *context)
 {
     hrs->context = context;
diff --git a/src/adaptors/http1/http1_private.h b/src/adaptors/http1/http1_private.h
index c6580d5..e321a1c 100644
--- a/src/adaptors/http1/http1_private.h
+++ b/src/adaptors/http1/http1_private.h
@@ -183,7 +183,6 @@ struct qdr_http1_connection_t {
     // flags
     //
     bool trace;
-    bool close_connection;
 };
 ALLOC_DECLARE(qdr_http1_connection_t);
 
diff --git a/src/adaptors/http1/http1_server.c b/src/adaptors/http1/http1_server.c
index 282b5d9..92ea0d1 100644
--- a/src/adaptors/http1/http1_server.c
+++ b/src/adaptors/http1/http1_server.c
@@ -69,7 +69,6 @@ typedef struct _server_request_t {
     uint64_t        request_dispo;   // set by adaptor during encode
     bool            request_settled; // set by adaptor
     bool            request_acked;   // true if dispo sent to core
-    bool            request_encoded; // true when encoding done
     bool            headers_encoded; // True when header encode done
 
     qdr_http1_out_data_fifo_t out_data;  // encoded request written to raw conn
@@ -79,6 +78,7 @@ typedef struct _server_request_t {
     bool codec_completed;     // Request and Response HTTP msgs OK
     bool cancelled;
     bool close_on_complete;   // close the conn when this request is complete
+    bool response_complete;   // true when server response message decoded
 } _server_request_t;
 ALLOC_DECLARE(_server_request_t);
 ALLOC_DEFINE(_server_request_t);
@@ -125,6 +125,7 @@ static void _server_response_msg_free(_server_request_t *req, _server_response_m
 static void _server_request_free(_server_request_t *hreq);
 static void _write_pending_request(_server_request_t *req);
 static void _cancel_request(_server_request_t *req);
+static bool _process_requests(qdr_http1_connection_t *hconn);
 
 
 ////////////////////////////////////////////////////////
@@ -409,6 +410,8 @@ static void _do_reconnect(void *context)
             qdr_http1_connection_free(hconn);
             return;
         }
+
+        _process_requests(hconn);
     }
 
     // lock out core activation
@@ -432,10 +435,10 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
     qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) context;
     qd_log_source_t *log = qdr_http1_adaptor->log;
 
-    qd_log(log, QD_LOG_DEBUG, "RAW CONNECTION EVENT %s\n", pn_event_type_name(pn_event_type(e)));
-
     if (!hconn) return;
 
+    qd_log(log, QD_LOG_TRACE, "[C%"PRIu64"] HTTP server proactor event %s", hconn->conn_id, pn_event_type_name(pn_event_type(e)));
+
     switch (pn_event_type(e)) {
 
     case PN_RAW_CONNECTION_CONNECTED: {
@@ -447,32 +450,41 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
     case PN_RAW_CONNECTION_CLOSED_READ: {
         // notify the codec so it can complete the current response
         // message (response body terminated on connection closed)
-        h1_codec_connection_closed(hconn->http_conn);
+        h1_codec_connection_rx_closed(hconn->http_conn);
+
+        // if the response for the current request has not fully arrived cancel
+        // the request
+        _server_request_t *hreq = (_server_request_t*) DEQ_HEAD(hconn->requests);
+        if (hreq) {
+            if (hreq->base.out_http1_octets > 0) {  // req msg written to server
+                if (!hreq->response_complete) {
+                    _cancel_request(hreq);
+                }
+            }
+        }
+        pn_raw_connection_close(hconn->raw_conn);
+        break;
     }
-    // fall through
+
     case PN_RAW_CONNECTION_CLOSED_WRITE: {
-        qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Closed for %s", hconn->conn_id,
-               pn_event_type(e) == PN_RAW_CONNECTION_CLOSED_READ
-               ? "reading" : "writing");
+        // cancel the current request if the request has not been fully written
+        // to the raw connection
+        _server_request_t *hreq = (_server_request_t*) DEQ_HEAD(hconn->requests);
+        if (hreq) {
+            if (hreq->base.out_http1_octets > 0) {  // req msg written to server
+                if (!DEQ_IS_EMPTY(hreq->out_data.fifo)) {
+                    _cancel_request(hreq);
+                }
+            }
+        }
         pn_raw_connection_close(hconn->raw_conn);
         break;
     }
     case PN_RAW_CONNECTION_DISCONNECTED: {
-        pn_raw_connection_set_context(hconn->raw_conn, 0);
-        hconn->close_connection = false;
-
         qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] Connection closed", hconn->conn_id);
 
-
-        // if the current request was not completed, cancel it.  it's ok if
-        // there are outstanding *response* deliveries in flight as long as the
-        // response(s) have been completely received from the server
-        // (request_complete == true).
-
-        _server_request_t *hreq = (_server_request_t*) DEQ_HEAD(hconn->requests);
-        if (hreq && !hreq->codec_completed && hreq->base.out_http1_octets > 0) {
-            _cancel_request(hreq);
-        }
+        pn_raw_connection_set_context(hconn->raw_conn, 0);
+        _process_requests(hconn);
 
         //
         // reconnect to the server. Leave the links intact so pending requests
@@ -514,11 +526,9 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
         qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Need read buffers", hconn->conn_id);
         // @TODO(kgiusti): backpressure if no credit
         // if (hconn->in_link_credit > 0 */)
-        if (!hconn->close_connection) {
-            int granted = qda_raw_conn_grant_read_buffers(hconn->raw_conn);
-            qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] %d read buffers granted",
-                   hconn->conn_id, granted);
-        }
+        int granted = qda_raw_conn_grant_read_buffers(hconn->raw_conn);
+        qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] %d read buffers granted",
+               hconn->conn_id, granted);
         break;
     }
     case PN_RAW_CONNECTION_WAKE: {
@@ -568,136 +578,149 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
         qdr_http1_connection_free(hconn);
 
     } else {
+        bool need_close = _process_requests(hconn);
 
-        bool need_close = false;
-        _server_request_t *hreq = (_server_request_t*) DEQ_HEAD(hconn->requests);
-        if (hreq) {
-            // remove me:
-            qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP is server request complete????", hconn->conn_id);
-            qd_log(log, QD_LOG_DEBUG, "   codec_completed=%s cancelled=%s",
-                   hreq->codec_completed ? "Complete" : "Not Complete",
-                   hreq->cancelled ? "Cancelled" : "Not Cancelled");
-            qd_log(log, QD_LOG_DEBUG, "   Req: dlv=%p dispo=%"PRIu64" settled=%d acked=%d",
-                   (void*) hreq->request_dlv, hreq->request_dispo, hreq->request_settled,
-                   hreq->request_acked);
-            qd_log(log, QD_LOG_DEBUG, "   Req: out_data=%d pton=%d resp-count=%d",
-                   (int) DEQ_SIZE(hreq->out_data.fifo),
-                   qdr_http1_out_data_buffers_outstanding(&hreq->out_data),
-                   (int) DEQ_SIZE(hreq->responses));
-
-            // Check for completed or cancelled requests
-
-            if (hreq->cancelled) {
-
-                // request:  have to wait until all buffers returned from proton
-                // before we can release the request delivery...
-                if (qdr_http1_out_data_buffers_outstanding(&hreq->out_data))
-                    return;
-
-                if (hreq->request_dlv) {
-                    // let the message drain... (TODO@(kgiusti) is this necessary?
-                    if (!qdr_delivery_receive_complete(hreq->request_dlv))
-                        return;
-
-                    uint64_t dispo = hreq->request_dispo ? hreq->request_dispo : PN_MODIFIED;
-                    qdr_delivery_remote_state_updated(qdr_http1_adaptor->core,
-                                                      hreq->request_dlv,
-                                                      dispo,
-                                                      true,   // settled
-                                                      0,      // error
-                                                      0,      // dispo data
-                                                      false);
-                    qdr_delivery_set_context(hreq->request_dlv, 0);
-                    qdr_delivery_decref(qdr_http1_adaptor->core, hreq->request_dlv, "HTTP1 adaptor request cancelled");
-                    hreq->request_dlv = 0;
-                }
+        if (need_close) {
+            qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Closing connection!", hconn->conn_id);
+            qdr_http1_close_connection(hconn, "HTTP Request requires connection close");
+        }
+    }
+}
 
-                _server_response_msg_t *rmsg = DEQ_HEAD(hreq->responses);
-                while (rmsg) {
-                    if (rmsg->dlv) {
-                        qd_message_set_receive_complete(qdr_delivery_message(rmsg->dlv));
-                        qdr_delivery_set_aborted(rmsg->dlv, true);
-                    }
-                    _server_response_msg_free(hreq, rmsg);
-                    rmsg = DEQ_HEAD(hreq->responses);
-                }
 
-                // The state of the connection to the server will be unknown if
-                // this request was not completed.
-                if (!hreq->codec_completed && hreq->base.out_http1_octets > 0)
-                    need_close = true;
+// See if the current request can be completed and the next pending request
+// started. Return true if the connection must be closed before starting the
+// next request.
+static bool _process_requests(qdr_http1_connection_t *hconn)
+{
+    bool              need_close = false;
+    _server_request_t *next_hreq = 0;
+    _server_request_t      *hreq = (_server_request_t*) DEQ_HEAD(hconn->requests);
 
-                _server_request_free(hreq);
+    if (!hreq)
+        return need_close;
 
-            } else {
+    qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+           "[C%"PRIu64"] Processing current HTTP request msg-id=%"PRIu64", state=%s",
+           hconn->conn_id, hreq->base.msg_id,
+           hreq->codec_completed ? "codec complete"
+           : hreq->cancelled ? "request cancelled"
+           : "in-progress");
 
-                // Can the request disposition be updated?  Disposition can be
-                // updated after the entire encoded request has been written to the
-                // server.
-                if (!hreq->request_acked &&
-                    hreq->request_encoded &&
-                    DEQ_SIZE(hreq->out_data.fifo) == 0) {
-
-                    qdr_delivery_remote_state_updated(qdr_http1_adaptor->core,
-                                                      hreq->request_dlv,
-                                                      hreq->request_dispo,
-                                                      false,   // settled
-                                                      0,      // error
-                                                      0,      // dispo data
-                                                      false);
-                    hreq->request_acked = true;
-                }
+    if (hreq->cancelled) {
 
-                // Can we settle request?  Settle the request delivery after all
-                // response messages have been received from the server
-                // (codec_complete).  Note that the responses may not have finished
-                // being delivered to the core (lack of credit, etc.)
-                //
-                if (!hreq->request_settled &&
-                    hreq->request_acked &&  // implies out_data done
-                    hreq->codec_completed) {
-
-                    qdr_delivery_remote_state_updated(qdr_http1_adaptor->core,
-                                                      hreq->request_dlv,
-                                                      hreq->request_dispo,
-                                                      true,   // settled
-                                                      0,      // error
-                                                      0,      // dispo data
-                                                      false);
-                    // can now release the delivery
-                    qdr_delivery_set_context(hreq->request_dlv, 0);
-                    qdr_delivery_decref(qdr_http1_adaptor->core, hreq->request_dlv, "HTTP1 adaptor request settled");
-                    hreq->request_dlv = 0;
-
-                    hreq->request_settled = true;
-                }
+        // clean up the request message delivery
+        if (hreq->request_dlv) {
+            qd_message_set_discard(qdr_delivery_message(hreq->request_dlv), true);
+
+            if (!hreq->request_acked || !hreq->request_settled) {
+
+                if (hreq->request_dispo == 0)
+                    hreq->request_dispo = (hreq->base.out_http1_octets > 0
+                                           ? PN_MODIFIED : PN_RELEASED);
+
+                qdr_delivery_remote_state_updated(qdr_http1_adaptor->core,
+                                                  hreq->request_dlv,
+                                                  hreq->request_dispo,
+                                                  true,   // settled
+                                                  0,      // error
+                                                  0,      // dispo data
+                                                  false);
+                hreq->request_acked = hreq->request_settled = true;
+            }
+            qdr_delivery_set_context(hreq->request_dlv, 0);
+            qdr_delivery_decref(qdr_http1_adaptor->core, hreq->request_dlv, "HTTP1 adaptor request cancelled");
+            hreq->request_dlv = 0;
+        }
 
-                // Has the entire request/response completed?  It is complete after
-                // the request message has been settled and all responses have been
-                // delivered to the core.
-                //
-                if (hreq->request_acked &&
-                    hreq->request_settled &&
-                    DEQ_SIZE(hreq->responses) == 0) {
-
-                    qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP request completed!", hconn->conn_id);
-                    _server_request_free(hreq);
-
-                    // coverity ignores the fact that _server_request_free() calls
-                    // the base cleanup which removes hreq from hconn->requests.
-                    // coverity[use_after_free]
-                    hreq = (_server_request_t*) DEQ_HEAD(hconn->requests);
-                    if (hreq)
-                        _write_pending_request(hreq);
-                }
+        // drop in flight responses
+        _server_response_msg_t *rmsg = DEQ_HEAD(hreq->responses);
+        while (rmsg) {
+            if (rmsg->dlv) {
+                qd_message_set_receive_complete(qdr_delivery_message(rmsg->dlv));
+                qdr_delivery_set_aborted(rmsg->dlv, true);
             }
+            _server_response_msg_free(hreq, rmsg);
+            rmsg = DEQ_HEAD(hreq->responses);
         }
 
-        if (need_close) {
-            qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Closing connection!", hconn->conn_id);
-            qdr_http1_close_connection(hconn, "Request cancelled");
+        // have to wait until all buffers returned from proton
+        // before we can release the request
+        if (qdr_http1_out_data_buffers_outstanding(&hreq->out_data))
+            return false;
+
+        // it is safe to keep the connection up if this request has never been
+        // written to the connection, otherwise the state of the connection is
+        // unknown so close it
+
+        if (hreq->base.out_http1_octets > 0)
+            need_close = true;
+        else
+            next_hreq = (_server_request_t*) DEQ_NEXT(&hreq->base);
+
+        qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE, "[C%"PRIu64"] HTTP request msg-id=%"PRIu64" cancelled",
+               hconn->conn_id, hreq->base.msg_id);
+        _server_request_free(hreq);
+
+        if (hconn->out_link)
+            qdr_link_flow(qdr_http1_adaptor->core, hconn->out_link, 1, false);
+
+
+    } else if (hreq->codec_completed) {
+
+        // The request message has been fully encoded and the response msg(s)
+        // have been completely received.  The terminal disposition for the
+        // request message delivery can be set now since the server is done
+        // responding.  The request disposition can be settled after all the
+        // response messages have been delivered to the core.
+
+        // DEQ_IS_EMPTY(hreq->out_data.fifo) ==> request message fully written to raw conn
+        // DEQ_IS_EMPTY(hreq->responses)     ==> all response messages delivered to the core
+        if (!hreq->request_acked || (!hreq->request_settled
+                                     && DEQ_IS_EMPTY(hreq->responses))) {
+
+            assert(hreq->request_dlv);
+            assert(hreq->request_dispo == PN_ACCEPTED);
+            hreq->request_settled = DEQ_IS_EMPTY(hreq->responses);
+            qdr_delivery_remote_state_updated(qdr_http1_adaptor->core,
+                                              hreq->request_dlv,
+                                              hreq->request_dispo,
+                                              hreq->request_settled,
+                                              0,      // error
+                                              0,      // dispo data
+                                              false);
+            hreq->request_acked = true;
+            if (hreq->request_settled) {
+                qdr_delivery_set_context(hreq->request_dlv, 0);
+                qdr_delivery_decref(qdr_http1_adaptor->core, hreq->request_dlv, "HTTP1 adaptor request settled");
+                hreq->request_dlv = 0;
+            }
+        }
+
+        if (hreq->request_acked && hreq->request_settled && DEQ_SIZE(hreq->out_data.fifo) == 0) {
+            qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP request msg-id=%"PRIu64" completed!",
+                   hconn->conn_id, hreq->base.msg_id);
+
+            if (hreq->close_on_complete)
+                need_close = true;
+            else
+                next_hreq = (_server_request_t*) DEQ_NEXT(&hreq->base);
+
+            _server_request_free(hreq);
+
+            if (hconn->out_link)
+                qdr_link_flow(qdr_http1_adaptor->core, hconn->out_link, 1, false);
         }
     }
+
+    if (next_hreq) {
+        qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+               "[C%"PRIu64"] starting new HTTP request msg-id=%"PRIu64,
+               hconn->conn_id, next_hreq->base.msg_id);
+        _write_pending_request(next_hreq);
+    }
+
+    return need_close;
 }
 
 
@@ -773,8 +796,8 @@ static int _server_rx_response_cb(h1_codec_request_state_t *hrs,
     assert(hreq && hreq == (_server_request_t*) DEQ_HEAD(hconn->requests));
 
     qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
-           "[C%"PRIu64"][L%"PRIu64"] HTTP response received: status=%d phrase=%s version=%"PRIi32".%"PRIi32,
-           hconn->conn_id, hconn->in_link_id, status_code, reason_phrase ? reason_phrase : "<NONE>",
+           "[C%"PRIu64"][L%"PRIu64"] HTTP msg_id=%"PRIu64" response received: status=%d phrase=%s version=%"PRIi32".%"PRIi32,
+           hconn->conn_id, hconn->in_link_id, hreq->base.msg_id, status_code, reason_phrase ? reason_phrase : "<NONE>",
            version_major, version_minor);
 
     _server_response_msg_t *rmsg = new__server_response_msg_t();
@@ -799,6 +822,7 @@ static int _server_rx_response_cb(h1_codec_request_state_t *hrs,
         }
     }
 
+    hreq->response_complete = false;
     return 0;
 }
 
@@ -885,8 +909,8 @@ static int _server_rx_headers_done_cb(h1_codec_request_state_t *hrs, bool has_bo
         hconn->in_link_credit -= 1;
 
         qd_log(hconn->adaptor->log, QD_LOG_TRACE,
-               "[C%"PRIu64"][L%"PRIu64"] Delivering response to router addr=%s",
-               hconn->conn_id, hconn->in_link_id, hreq->base.response_addr);
+               "[C%"PRIu64"][L%"PRIu64"] Delivering msg-id=%"PRIu64" response to router addr=%s",
+               hconn->conn_id, hconn->in_link_id, hreq->base.msg_id, hreq->base.response_addr);
 
         qd_iterator_t *addr = qd_message_field_iterator(rmsg->msg, QD_FIELD_TO);
         assert(addr);
@@ -941,9 +965,10 @@ static void _server_rx_done_cb(h1_codec_request_state_t *hrs)
     qd_message_t *msg = rmsg->msg ? rmsg->msg : qdr_delivery_message(rmsg->dlv);
 
     qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
-           "[C%"PRIu64"][L%"PRIu64"] HTTP response receive complete.",
-           hconn->conn_id, hconn->in_link_id);
+           "[C%"PRIu64"][L%"PRIu64"] HTTP response message msg-id=%"PRIu64" decoding complete.",
+           hconn->conn_id, hconn->in_link_id, hreq->base.msg_id);
 
+    hreq->response_complete = true;
     rmsg->rx_complete = true;
 
     if (!qd_message_receive_complete(msg)) {
@@ -1108,7 +1133,6 @@ static _server_request_t *_create_request_context(qdr_http1_connection_t *hconn,
     reply_to = (char*) qd_iterator_copy(reply_to_itr);
     qd_iterator_free(reply_to_itr);
 
-    assert(reply_to && strlen(reply_to));  // remove me
     if (!reply_to) {
         qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
                "[C%"PRIu64"][L%"PRIu64"] Rejecting message no reply-to.",
@@ -1132,7 +1156,7 @@ static _server_request_t *_create_request_context(qdr_http1_connection_t *hconn,
     DEQ_INSERT_TAIL(hconn->requests, &hreq->base);
 
     qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
-           "[C%"PRIu64"][L%"PRIu64"] New HTTP Request msg_id=%"PRIu64" reply-to=%s.",
+           "[C%"PRIu64"][L%"PRIu64"] New HTTP Request msg-id=%"PRIu64" reply-to=%s.",
            hconn->conn_id, hconn->out_link_id, msg_id, reply_to);
     return hreq;
 }
@@ -1271,12 +1295,13 @@ static uint64_t _encode_request_message(_server_request_t *hreq)
 
     if (!hreq->headers_encoded) {
         uint64_t outcome = _send_request_headers(hreq, msg);
+        hreq->headers_encoded = true;
         if (outcome) {
             qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
-                   "[C%"PRIu64"][L%"PRIu64"] Rejecting malformed message.", hconn->conn_id, hconn->out_link_id);
+                   "[C%"PRIu64"][L%"PRIu64"] Rejecting malformed message msg-id=%"PRIu64,
+                   hconn->conn_id, hconn->out_link_id, hreq->base.msg_id);
             return outcome;
         }
-        hreq->headers_encoded = true;
     }
 
     qd_message_stream_data_t *stream_data = 0;
@@ -1299,17 +1324,15 @@ static uint64_t _encode_request_message(_server_request_t *hreq)
         }
 
         case QD_MESSAGE_STREAM_DATA_FOOTER_OK:
+            qd_message_stream_data_release(stream_data);
             break;
 
         case QD_MESSAGE_STREAM_DATA_NO_MORE:
             // indicate this message is complete
-            qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
-                   "[C%"PRIu64"][L%"PRIu64"] request message encoding completed",
-                   hconn->conn_id, hconn->out_link_id);
             return PN_ACCEPTED;
 
         case QD_MESSAGE_STREAM_DATA_INCOMPLETE:
-            qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+            qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
                    "[C%"PRIu64"][L%"PRIu64"] body data need more",
                    hconn->conn_id, hconn->out_link_id);
             return 0;  // wait for more
@@ -1334,9 +1357,11 @@ uint64_t qdr_http1_server_core_link_deliver(qdr_http1_adaptor_t    *adaptor,
                                             qdr_delivery_t         *delivery,
                                             bool                    settled)
 {
-    qd_message_t         *msg = qdr_delivery_message(delivery);
-    _server_request_t   *hreq = (_server_request_t*) qdr_delivery_get_context(delivery);
+    qd_message_t *msg = qdr_delivery_message(delivery);
+    if (qd_message_is_discard(msg))
+        return 0;
 
+    _server_request_t *hreq = (_server_request_t*) qdr_delivery_get_context(delivery);
     if (!hreq) {
         // new delivery - create new request:
         switch (qd_message_check_depth(msg, QD_DEPTH_PROPERTIES)) {
@@ -1348,6 +1373,7 @@ uint64_t qdr_http1_server_core_link_deliver(qdr_http1_adaptor_t    *adaptor,
                    "[C%"PRIu64"][L%"PRIu64"] Malformed HTTP/1.x message",
                    hconn->conn_id, link->identity);
             qd_message_set_send_complete(msg);
+            qd_message_set_discard(msg, true);
             qdr_link_flow(qdr_http1_adaptor->core, link, 1, false);
             return PN_REJECTED;
 
@@ -1357,6 +1383,7 @@ uint64_t qdr_http1_server_core_link_deliver(qdr_http1_adaptor_t    *adaptor,
                 qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
                        "[C%"PRIu64"][L%"PRIu64"] Discarding malformed message.", hconn->conn_id, link->identity);
                 qd_message_set_send_complete(msg);
+                qd_message_set_discard(msg, true);
                 qdr_link_flow(qdr_http1_adaptor->core, link, 1, false);
                 return PN_REJECTED;
             }
@@ -1368,21 +1395,28 @@ uint64_t qdr_http1_server_core_link_deliver(qdr_http1_adaptor_t    *adaptor,
         }
     }
 
-    if (!hreq->request_dispo)
+    if (!hreq->request_dispo) {
         hreq->request_dispo = _encode_request_message(hreq);
-
-    if (hreq->request_dispo && qd_message_receive_complete(msg)) {
-
-        qd_message_set_send_complete(msg);
-        qdr_link_flow(qdr_http1_adaptor->core, link, 1, false);
-
-        if (hreq->request_dispo == PN_ACCEPTED) {
-            hreq->request_encoded = true;
-            h1_codec_tx_done(hreq->base.lib_rs, &hreq->close_on_complete);
-
-        } else {
-            // mapping to HTTP request failed:
-            _cancel_request(hreq);
+        if (hreq->request_dispo) {
+            qd_message_set_send_complete(msg);
+            if (hreq->request_dispo == PN_ACCEPTED) {
+                h1_codec_tx_done(hreq->base.lib_rs, &hreq->close_on_complete);
+                qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+                       "[C%"PRIu64"][L%"PRIu64"] HTTP request message msg-id=%"PRIu64" encoding complete",
+                       hconn->conn_id, link->identity, hreq->base.msg_id);
+            } else {
+                // message invalid
+                qd_message_set_discard(msg, true);
+                _cancel_request(hreq);
+
+                // returning a terminal disposition will cause the delivery to be updated and settled,
+                // so drop our reference
+                qdr_delivery_set_context(hreq->request_dlv, 0);
+                qdr_delivery_decref(qdr_http1_adaptor->core, hreq->request_dlv, "malformed HTTP1 request, delivery released");
+                hreq->request_dlv = 0;
+                hreq->request_acked = hreq->request_settled = true;
+                return hreq->request_dispo;
+            }
         }
     }
 
@@ -1435,12 +1469,13 @@ static void _server_request_free(_server_request_t *hreq)
 
 static void _write_pending_request(_server_request_t *hreq)
 {
-    if (hreq && !hreq->cancelled && !hreq->base.hconn->close_connection) {
+    if (hreq && !hreq->cancelled) {
         assert(DEQ_PREV(&hreq->base) == 0);  // preserve order!
         uint64_t written = qdr_http1_write_out_data(hreq->base.hconn, &hreq->out_data);
         hreq->base.out_http1_octets += written;
-        qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] %"PRIu64" octets written",
-               hreq->base.hconn->conn_id, written);
+        if (written)
+            qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] %"PRIu64" octets written",
+                   hreq->base.hconn->conn_id, written);
     }
 }
 
@@ -1457,6 +1492,11 @@ void qdr_http1_server_conn_cleanup(qdr_http1_connection_t *hconn)
 
 static void _cancel_request(_server_request_t *hreq)
 {
+    qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+           "[C%"PRIu64"][L%"PRIu64"] Cancelling HTTP Request msg-id=%"PRIu64,
+           hreq->base.hconn->conn_id, hreq->base.hconn->out_link_id,
+           hreq->base.msg_id);
+
     if (!hreq->base.lib_rs) {
         // never even got to encoding it - manually mark it cancelled
         hreq->cancelled = true;
diff --git a/tests/system_tests_http1_adaptor.py b/tests/system_tests_http1_adaptor.py
index 4d1d4f7..237adca 100644
--- a/tests/system_tests_http1_adaptor.py
+++ b/tests/system_tests_http1_adaptor.py
@@ -40,10 +40,11 @@ except ImportError:
     from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
     from httplib import HTTPConnection, HTTPException
 
+from proton import Message
 from proton.handlers import MessagingHandler
 from proton.reactor import Container
 from system_test import TestCase, unittest, main_module, Qdrouterd, QdManager
-from system_test import TIMEOUT, Logger
+from system_test import TIMEOUT, Logger, AsyncTestSender, AsyncTestReceiver
 
 
 class RequestMsg(object):
@@ -304,7 +305,12 @@ class ThreadedTestClient(object):
                                                                     op,
                                                                     req.target)})
                     self._logger.log("TestClient getting %s response" % op)
-                    rsp = client.getresponse()
+                    try:
+                        rsp = client.getresponse()
+                    except HTTPException as exc:
+                        self._logger.log("TestClient response failed: %s" % exc)
+                        self.error = str(exc)
+                        return
                     self._logger.log("TestClient response %s received" % op)
                     if val:
                         try:
@@ -1472,10 +1478,11 @@ class Http1AdaptorBadEndpointsTest(TestCase):
 
         cls.http_server_port = cls.tester.get_port()
         cls.http_listener_port = cls.tester.get_port()
+        cls.http_fake_port = cls.tester.get_port()
 
         config = [
             ('router', {'mode': 'standalone',
-                        'id': 'TestBadEnpoints',
+                        'id': 'TestBadEndpoints',
                         'allowUnsettledMulticast': 'yes'}),
             ('listener', {'role': 'normal',
                           'port': cls.tester.get_port()}),
@@ -1485,6 +1492,9 @@ class Http1AdaptorBadEndpointsTest(TestCase):
             ('httpListener', {'port': cls.http_listener_port,
                               'protocolVersion': 'HTTP1',
                               'address': 'testServer'}),
+            ('httpListener', {'port': cls.http_fake_port,
+                              'protocolVersion': 'HTTP1',
+                              'address': 'fakeServer'}),
             ('address', {'prefix': 'closest',   'distribution': 'closest'}),
             ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
         ]
@@ -1522,6 +1532,161 @@ class Http1AdaptorBadEndpointsTest(TestCase):
         self.assertIsNone(error)
         self.assertEqual(1, count)
 
+    def test_02_bad_request_message(self):
+        """
+        Test various improperly constructed request messages
+        """
+        server = TestServer(server_port=self.http_server_port,
+                            client_port=self.http_listener_port,
+                            tests={})
+
+        body_filler = "?" * 1024 * 300  # Q2
+
+        msg = Message(body="NOMSGID " + body_filler)
+        ts = AsyncTestSender(address=self.INT_A.listener,
+                             target="testServer",
+                             message=msg)
+        ts.wait()
+        self.assertEqual(1, ts.rejected);
+
+        msg = Message(body="NO REPLY TO " + body_filler)
+        msg.id = 1
+        ts = AsyncTestSender(address=self.INT_A.listener,
+                             target="testServer",
+                             message=msg)
+        ts.wait()
+        self.assertEqual(1, ts.rejected);
+
+        msg = Message(body="NO SUBJECT " + body_filler)
+        msg.id = 1
+        msg.reply_to = "amqp://fake/reply_to"
+        ts = AsyncTestSender(address=self.INT_A.listener,
+                             target="testServer",
+                             message=msg)
+        ts.wait()
+        self.assertEqual(1, ts.rejected);
+
+        msg = Message(body="NO APP PROPERTIES " + body_filler)
+        msg.id = 1
+        msg.reply_to = "amqp://fake/reply_to"
+        msg.subject = "GET"
+        ts = AsyncTestSender(address=self.INT_A.listener,
+                             target="testServer",
+                             message=msg)
+        ts.wait()
+        self.assertEqual(1, ts.rejected);
+
+        # TODO: fix body parsing (returns NEED_MORE)
+        # msg = Message(body="INVALID BODY " + body_filler)
+        # msg.id = 1
+        # msg.reply_to = "amqp://fake/reply_to"
+        # msg.subject = "GET"
+        # msg.properties = {"http:target": "/Some/target"}
+        # ts = AsyncTestSender(address=self.INT_A.listener,
+        #                      target="testServer",
+        #                      message=msg)
+        # ts.wait()
+        # self.assertEqual(1, ts.rejected);
+
+        server.wait()
+
+        # verify router is still sane:
+        count, error = http1_ping(self.http_server_port,
+                                  self.http_listener_port)
+        self.assertIsNone(error)
+        self.assertEqual(1, count)
+
+    def test_03_bad_response_message(self):
+        """
+        Test various improperly constructed response messages
+        """
+        DUMMY_TESTS = {
+            "GET": [
+                (RequestMsg("GET", "/GET/test_03_bad_response_message",
+                            headers={"Content-Length": "000"}),
+                 None,
+                 None,
+                ),
+            ]
+        }
+
+        body_filler = "?" * 1024 * 300  # Q2
+
+        # fake server
+        rx = AsyncTestReceiver(self.INT_A.listener,
+                               source="fakeServer")
+
+        # no correlation id:
+        client = ThreadedTestClient(DUMMY_TESTS,
+                                    self.http_fake_port)
+        req = rx.queue.get(timeout=TIMEOUT)
+        resp = Message(body="NO CORRELATION ID " + body_filler)
+        resp.to = req.reply_to
+        ts = AsyncTestSender(address=self.INT_A.listener,
+                             target=req.reply_to,
+                             message=resp)
+        ts.wait()
+        self.assertEqual(1, ts.rejected);
+        client.wait()
+        self.assertIsNotNone(client.error)
+
+        # missing application properties
+        client = ThreadedTestClient(DUMMY_TESTS,
+                                    self.http_fake_port)
+        req = rx.queue.get(timeout=TIMEOUT)
+
+        resp = Message(body="NO APPLICATION PROPS " + body_filler)
+        resp.to = req.reply_to
+        resp.correlation_id = req.id
+        ts = AsyncTestSender(address=self.INT_A.listener,
+                             target=req.reply_to,
+                             message=resp)
+        ts.wait()
+        self.assertEqual(1, ts.rejected);
+        client.wait()
+        self.assertIsNotNone(client.error)
+
+        # no status application property
+        client = ThreadedTestClient(DUMMY_TESTS,
+                                    self.http_fake_port)
+        req = rx.queue.get(timeout=TIMEOUT)
+        resp = Message(body="MISSING STATUS HEADER " + body_filler)
+        resp.to = req.reply_to
+        resp.correlation_id = req.id
+        resp.properties = {"stuff": "value"}
+        ts = AsyncTestSender(address=self.INT_A.listener,
+                             target=req.reply_to,
+                             message=resp)
+        ts.wait()
+        self.assertEqual(1, ts.rejected);
+        client.wait()
+        self.assertIsNotNone(client.error)
+
+        # TODO: fix body parsing (returns NEED_MORE)
+        # # invalid body format
+        # client = ThreadedTestClient(DUMMY_TESTS,
+        #                             self.http_fake_port)
+        # req = rx.queue.get(timeout=TIMEOUT)
+        # resp = Message(body="INVALID BODY FORMAT " + body_filler)
+        # resp.to = req.reply_to
+        # resp.correlation_id = req.id
+        # resp.properties = {"http:status": 200}
+        # ts = AsyncTestSender(address=self.INT_A.listener,
+        #                      target=req.reply_to,
+        #                      message=resp)
+        # ts.wait()
+        # self.assertEqual(1, ts.rejected);
+        # client.wait()
+        # self.assertIsNotNone(client.error)
+
+        rx.stop()
+
+        # verify router is still sane:
+        count, error = http1_ping(self.http_server_port,
+                                  self.http_listener_port)
+        self.assertIsNone(error)
+        self.assertEqual(1, count)
+
 
 if __name__ == '__main__':
     unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message