qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kgiu...@apache.org
Subject [qpid-dispatch] branch master updated: DISPATCH-1226: fix race during link detach handling
Date Wed, 19 Dec 2018 14:18:17 GMT
This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/master by this push:
     new b952edc  DISPATCH-1226: fix race during link detach handling
b952edc is described below

commit b952edc093113f9b9ecada9df46322d09fdd0e87
Author: Kenneth Giusti <kgiusti@apache.org>
AuthorDate: Tue Dec 18 09:52:47 2018 -0500

    DISPATCH-1226: fix race during link detach handling
    
    This closes #427
---
 src/router_core/connections.c         | 70 +++++++++++++++++++++--------------
 src/router_core/router_core_private.h |  3 +-
 2 files changed, 44 insertions(+), 29 deletions(-)

diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index a1717a8..54a27be 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -32,7 +32,8 @@ static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action,
boo
 static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *action, bool
discard);
 static void qdr_link_inbound_second_attach_CT(qdr_core_t *core, qdr_action_t *action, bool
discard);
 static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdr_link_delete_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_link_detach_sent_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_link_detach_sent(qdr_link_t *link);
 
 ALLOC_DEFINE(qdr_connection_t);
 ALLOC_DEFINE(qdr_connection_work_t);
@@ -217,7 +218,7 @@ int qdr_connection_process(qdr_connection_t *conn)
 
     qdr_link_ref_t *ref;
     qdr_link_t     *link;
-    bool            free_link;
+    bool            detach_sent;
 
     int event_count = 0;
 
@@ -264,7 +265,7 @@ int qdr_connection_process(qdr_connection_t *conn)
     for (int priority = QDR_MAX_PRIORITY; priority >= 0; -- priority) {
         do {
             qdr_link_work_t *link_work;
-            free_link = false;
+            detach_sent = false;
 
             ref = DEQ_HEAD(links_with_work[priority]);
             if (ref) {
@@ -325,12 +326,11 @@ int qdr_connection_process(qdr_connection_t *conn)
                         break;
 
                     case QDR_LINK_WORK_FIRST_DETACH :
-                        core->detach_handler(core->user_context, link, link_work->error,
true, link_work->close_link);
-                        break;
-
                     case QDR_LINK_WORK_SECOND_DETACH :
-                        core->detach_handler(core->user_context, link, link_work->error,
false, link_work->close_link);
-                        free_link = true;
+                        core->detach_handler(core->user_context, link, link_work->error,
+                                             link_work->work_type == QDR_LINK_WORK_FIRST_DETACH,
+                                             link_work->close_link);
+                        detach_sent = true;
                         break;
                     }
 
@@ -352,10 +352,12 @@ int qdr_connection_process(qdr_connection_t *conn)
                     event_count++;
                 }
 
-                if (free_link)
-                    qdr_link_delete(link);
+                if (detach_sent) {
+                    // let the core thread know so it can clean up
+                    qdr_link_detach_sent(link);
+                }
             }
-        } while (free_link || link);
+        } while (detach_sent || link);
     }
 
     return event_count;
@@ -528,9 +530,11 @@ void qdr_link_detach(qdr_link_t *link, qd_detach_type_t dt, qdr_error_t
*error)
 }
 
 
-void qdr_link_delete(qdr_link_t *link)
+/* let the core thread know that a dispatch has been sent by the I/O thread
+ */
+static void qdr_link_detach_sent(qdr_link_t *link)
 {
-    qdr_action_t *action = qdr_action(qdr_link_delete_CT, "link_delete");
+    qdr_action_t *action = qdr_action(qdr_link_detach_sent_CT, "link_detach_sent");
 
     action->args.connection.link = link;
     qdr_action_enqueue(link->core, action);
@@ -911,9 +915,6 @@ void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, qdr_error_t
     qdr_link_work_t *work = new_qdr_link_work_t();
     ZERO(work);
     work->work_type  = ++link->detach_count == 1 ? QDR_LINK_WORK_FIRST_DETACH : QDR_LINK_WORK_SECOND_DETACH;
-    if (work->work_type == QDR_LINK_WORK_SECOND_DETACH) {
-        link->detach_received = true;
-    }
     work->close_link = close;
 
     if (error)
@@ -1538,10 +1539,11 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t
*action, b
     qd_detach_type_t  dt    = action->args.connection.dt;
     qdr_address_t    *addr  = link->owning_addr;
 
-    //
-    // Bump the detach count to track half and full detaches
-    //
-    link->detach_count++;
+    if (link->detach_received)
+        return;
+
+    link->detach_received = true;
+    ++link->detach_count;
 
     if (link->core_endpoint) {
         qdrc_endpoint_do_detach_CT(core, link->core_endpoint, error);
@@ -1570,7 +1572,7 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t
*action, b
             //
             // If the link is completely detached, release its resources
             //
-            if (link->detach_count == 2) {
+            if (link->detach_send_done) {
                 qdr_link_cleanup_CT(core, conn, link);
                 free_qdr_link_t(link);
             }
@@ -1657,13 +1659,19 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t
*action, b
         // Handle the disposition of any deliveries that remain on the link
         //
         qdr_link_cleanup_deliveries_CT(core, conn, link);
-        
+
         //
         // If the detach occurred via protocol, send a detach back.
         //
-        if (dt != QD_LOST)
+        if (dt != QD_LOST) {
             qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NONE, dt == QD_CLOSED);
-    } else {
+        } else {
+            // no detach can be sent out because the connection was lost
+            qdr_link_cleanup_CT(core, conn, link);
+            free_qdr_link_t(link);
+        }
+    } else if (link->detach_send_done) {  // detach count indicates detach has been scheduled
+        // I/O thread is finished sending detach, ok to free link now
         qdr_link_cleanup_CT(core, conn, link);
         free_qdr_link_t(link);
     }
@@ -1680,16 +1688,22 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t
*action, b
 }
 
 
-static void qdr_link_delete_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+/* invoked on core thread to signal that the I/O thread has sent the detach
+ */
+static void qdr_link_detach_sent_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
 {
     if (discard)
         return;
 
     qdr_link_t *link = action->args.connection.link;
 
-    if (link && link->conn) {
-        qdr_link_cleanup_CT(core, link->conn, link);
-        free_qdr_link_t(link);
+    if (link) {
+        link->detach_send_done = true;
+        if (link->conn && link->detach_received) {
+            // link is fully detached
+            qdr_link_cleanup_CT(core, link->conn, link);
+            free_qdr_link_t(link);
+        }
     }
 }
 
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 7d6f2a3..1cc21e3 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -441,7 +441,8 @@ struct qdr_link_t {
     bool                     strip_annotations_out;
     bool                     drain_mode;
     bool                     stalled_outbound;  ///< Indicates that this link is stalled
on outbound buffer backpressure
-    bool                     detach_received;
+    bool                     detach_received;   ///< True on core receipt of inbound attach
+    bool                     detach_send_done;  ///< True once the detach has been sent
by the I/O thread
     bool                     edge;              ///< True if this link is in an edge-connection
     char                    *strip_prefix;
     char                    *insert_prefix;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message