qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kgiu...@apache.org
Subject [qpid-dispatch] branch master updated: DISPATCH-1690: clean up last inbound message on link close
Date Mon, 29 Jun 2020 13:38:21 GMT
This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/master by this push:
     new 40c1687  DISPATCH-1690: clean up last inbound message on link close
40c1687 is described below

commit 40c1687bedf839841259e054653629b2909293d9
Author: Kenneth Giusti <kgiusti@apache.org>
AuthorDate: Thu Jun 18 17:32:22 2020 -0400

    DISPATCH-1690: clean up last inbound message on link close
    
    If an incoming link fails/closes before the current inbound message
    has been forwarded to the core the current inbound message is not
    freed.  This patch adds a weak reference (safe pointer) in the link to
    the current inbound message.  This reference is cleared once the
    message has been delivered to the core.  Should the link close prior
    to delivery this weak reference is used to clean up the message.
    
    This closes #766
---
 include/qpid/dispatch/container.h |  2 ++
 src/container.c                   | 17 +++++++++++++++++
 src/message.c                     |  1 +
 src/router_node.c                 |  2 ++
 tests/lsan.supp                   |  2 +-
 5 files changed, 23 insertions(+), 1 deletion(-)

diff --git a/include/qpid/dispatch/container.h b/include/qpid/dispatch/container.h
index 54f3d7c..6364ff0 100644
--- a/include/qpid/dispatch/container.h
+++ b/include/qpid/dispatch/container.h
@@ -234,6 +234,8 @@ void qd_link_q3_block(qd_link_t *link);
 void qd_link_q3_unblock(qd_link_t *link);
 uint64_t qd_link_link_id(const qd_link_t *link);
 void qd_link_set_link_id(qd_link_t *link, uint64_t link_id);
+struct qd_message_t;
+void qd_link_set_incoming_msg(qd_link_t *link, struct qd_message_t *msg);
 
 qd_session_t *qd_session(pn_session_t *pn_ssn);
 void qd_session_cleanup(qd_connection_t *qd_conn);
diff --git a/src/container.c b/src/container.c
index 01d878b..c535988 100644
--- a/src/container.c
+++ b/src/container.c
@@ -59,6 +59,7 @@ struct qd_link_t {
     qd_direction_t              direction;
     void                       *context;
     qd_node_t                  *node;
+    qd_alloc_safe_ptr_t         incoming_msg;  // DISPATCH-1690: for cleanup
     pn_snd_settle_mode_t        remote_snd_settle_mode;
     qd_link_ref_list_t          ref_list;
     bool                        q2_limit_unbounded;
@@ -348,6 +349,11 @@ static void cleanup_link(qd_link_t *link)
             link->pn_link = 0;
         }
         link->pn_sess = 0;
+
+        // cleanup any inbound message that has not been forwarded
+        qd_message_t *msg = link->incoming_msg.ptr;
+        if (msg && qd_alloc_sequence(msg) == link->incoming_msg.seq)
+            qd_message_free(msg);
     }
 }
 
@@ -1230,3 +1236,14 @@ void qd_session_cleanup(qd_connection_t *qd_conn)
         pn_ssn = pn_session_next(pn_ssn, 0);
     }
 }
+
+
+void qd_link_set_incoming_msg(qd_link_t *link, qd_message_t *msg)
+{
+    if (msg) {
+        link->incoming_msg.ptr = (void*) msg;
+        link->incoming_msg.seq = qd_alloc_sequence(msg);
+    } else {
+        qd_nullify_safe_ptr(&link->incoming_msg);
+    }
+}
diff --git a/src/message.c b/src/message.c
index 447691b..2ce6890 100644
--- a/src/message.c
+++ b/src/message.c
@@ -1312,6 +1312,7 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
         pn_record_def(record, PN_DELIVERY_CTX, PN_WEAKREF);
         pn_record_set(record, PN_DELIVERY_CTX, (void*) msg);
         msg->content->max_message_size = qd_connection_max_message_size(qdc);
+        qd_link_set_incoming_msg(qdl, (qd_message_t*) msg);
     }
 
     //
diff --git a/src/router_node.c b/src/router_node.c
index 33809e7..cc04ea3 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -482,6 +482,7 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link)
                                                    dtag.size,
                                                    pn_disposition_type(pn_delivery_remote(pnd)),
                                                    pn_disposition_data(pn_delivery_remote(pnd)));
+        qd_link_set_incoming_msg(link, (qd_message_t*) 0);  // msg no longer exclusive to
qd_link
         qdr_node_connect_deliveries(link, delivery, pnd);
         qdr_delivery_decref(router->router_core, delivery, "release protection of return
from deliver_to_routed_link");
         return next_delivery;
@@ -714,6 +715,7 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link)
     //
 
     if (delivery) {
+        qd_link_set_incoming_msg(link, (qd_message_t*) 0);  // msg no longer exclusive to
qd_link
         qdr_node_connect_deliveries(link, delivery, pnd);
         qdr_delivery_decref(router->router_core, delivery, "release protection of return
from deliver");
     } else {
diff --git a/tests/lsan.supp b/tests/lsan.supp
index cbe34c9..b76496c 100644
--- a/tests/lsan.supp
+++ b/tests/lsan.supp
@@ -42,7 +42,7 @@ leak:qd_container_register_node_type
 leak:^qdr_send_to2$
 
 # DISPATCH-1661, DISPATCH-1662
-leak:^qdr_terminus$
+# leak:^qdr_terminus$
 
 # Ubuntu 18.04 (Bionic)
 leak:qdr_link_issue_credit_CT


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message