qpid-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (Jira)" <j...@apache.org>
Subject [jira] [Commented] (DISPATCH-1266) Improve router's handling of unsettled multicast deliveries
Date Mon, 26 Aug 2019 14:45:00 GMT

    [ https://issues.apache.org/jira/browse/DISPATCH-1266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16915859#comment-16915859
] 

ASF GitHub Bot commented on DISPATCH-1266:
------------------------------------------

kgiusti commented on pull request #554: DISPATCH-1266: Fix unsettled multicast forwarding
URL: https://github.com/apache/qpid-dispatch/pull/554#discussion_r317637876
 
 

 ##########
 File path: src/router_core/delivery.c
 ##########
 @@ -558,81 +572,386 @@ void qdr_delivery_decref_CT(qdr_core_t *core, qdr_delivery_t *dlv,
const char *l
 }
 
 
+// the remote endpoint has change the state (disposition) or settlement for the
+// delivery.  Update the local state/settlement accordingly.
+//
 static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
 {
-    qdr_delivery_t *dlv        = action->args.delivery.delivery;
-    qdr_delivery_t *peer       = qdr_delivery_first_peer_CT(dlv);
-    bool            push       = false;
-    bool            peer_moved = false;
-    bool            dlv_moved  = false;
-    uint64_t        disp       = action->args.delivery.disposition;
-    bool            settled    = action->args.delivery.settled;
-    qdr_error_t    *error      = action->args.delivery.error;
-    bool error_unassigned      = true;
+    if (discard)
+        return;
+
+    qdr_delivery_t *dlv      = action->args.delivery.delivery;
+    qdr_delivery_t *peer     = qdr_delivery_first_peer_CT(dlv);
+    uint64_t        new_disp = action->args.delivery.disposition;
+    bool            settled  = action->args.delivery.settled;
+    qdr_error_t    *error    = action->args.delivery.error;
+    bool free_error          = true;
+
+    if (dlv->multicast) {
+        //
+        // remote state change for *inbound* multicast delivery,
+        // update downstream *outbound* peers
+        //
+        qdr_delivery_mcast_update_CT(core, dlv, new_disp, settled);
 
-    qdr_link_t *dlv_link  = qdr_delivery_link(dlv);
-    qdr_link_t *peer_link = qdr_delivery_link(peer);
+    } else if (peer && peer->multicast) {
+        //
+        // remote state change for an *outbound* delivery to a multicast address,
+        // propagate upstream to *inbound* delivery (peer)
+        //
+        // coverity[swapped_arguments]
+        qdr_delivery_mcast_peer_update_CT(core, peer, dlv, new_disp, settled);
+
+    } else {
+        //
+        // Unicast forwarding
+        //
+        free_error = !qdr_delivery_ucast_update_CT(core, dlv, peer, new_disp, settled, error);
+    }
 
     //
-    // Logic:
+    // Release the action reference, possibly freeing the delivery
     //
-    // If disposition has changed and there is a peer link, set the disposition of the peer
-    // If settled, the delivery must be unlinked and freed.
-    // If settled and there is a peer, the peer shall be settled and unlinked.  It shall
not
-    //   be freed until the connection-side thread settles the PN delivery.
+    qdr_delivery_decref_CT(core, dlv, "qdr_update_delivery_CT - remove from action");
+
+    if (free_error)
+        qdr_error_free(error);
+}
+
+
+// The remote delivery state (disposition) and/or remote settlement for an
+// unicast delivery has changed.  Propagate the changes to its peer delivery.
+//
+// returns true if ownership of error parameter is taken (do not free it)
+//
+static bool qdr_delivery_ucast_update_CT(qdr_core_t *core, qdr_delivery_t *dlv,
+                                         qdr_delivery_t *peer, uint64_t new_disp, bool settled,
+                                         qdr_error_t *error)
+{
+    bool push           = false;
+    bool peer_moved     = false;
+    bool dlv_moved      = false;
+    bool error_assigned = false;
+    qdr_link_t *dlink   = qdr_delivery_link(dlv);
+
+    assert(!dlv->multicast);
+    assert(!peer || !peer->multicast);
+
+    if (peer)
+        qdr_delivery_incref(peer, "qdr_delivery_ucast_update_CT - prevent peer from being
freed");
+
+    //
+    // Non-multicast Logic:
+    //
+    // If disposition has changed and there is a peer link, set the disposition
+    // of the peer
+    // If remote settled, the delivery must be unlinked and freed.
+    // If remote settled and there is a peer, the peer shall be settled and
+    // unlinked.  It shall not be freed until the connection-side thread
+    // settles the PN delivery.
     //
-    if (disp != dlv->disposition) {
+    if (new_disp != dlv->remote_disposition) {
         //
-        // Disposition has changed, propagate the change to the peer delivery.
+        // Remote disposition has changed, propagate the change to the peer
+        // delivery local disposition.
         //
-        dlv->disposition = disp;
+        dlv->remote_disposition = new_disp;
         if (peer) {
-            peer->disposition = disp;
+            peer->disposition = new_disp;
             peer->error       = error;
             push = true;
-            error_unassigned = false;
+            error_assigned = true;
             qdr_delivery_copy_extension_state(dlv, peer, false);
         }
     }
 
     if (settled) {
         if (peer) {
             peer->settled = true;
-            if (peer_link) {
+            if (qdr_delivery_link(peer)) {
                 peer_moved = qdr_delivery_settled_CT(core, peer);
-                if (peer_moved)
-                    push = true;
             }
             qdr_delivery_unlink_peers_CT(core, dlv, peer);
         }
 
-        if (dlv_link)
+        if (dlink)
             dlv_moved = qdr_delivery_settled_CT(core, dlv);
     }
 
     //
     // If the delivery's link has a core endpoint, notify the endpoint of the update
     //
-    if (dlv_link && dlv_link->core_endpoint)
-        qdrc_endpoint_do_update_CT(core, dlv_link->core_endpoint, dlv, settled);
+    if (dlink && dlink->core_endpoint)
+        qdrc_endpoint_do_update_CT(core, dlink->core_endpoint, dlv, settled);
 
-    if (push)
+    if (push || peer_moved)
         qdr_delivery_push_CT(core, peer);
 
     //
-    // Release the action reference, possibly freeing the delivery
-    //
-    qdr_delivery_decref_CT(core, dlv, "qdr_update_delivery_CT - remove from action");
-
-    //
-    // Release the unsettled references if the deliveries were moved
+    // Release the unsettled references if the deliveries were moved/settled
     //
     if (dlv_moved)
-        qdr_delivery_decref_CT(core, dlv, "qdr_update_delivery_CT - removed from unsettled
(1)");
+        qdr_delivery_decref_CT(core, dlv, "qdr_delivery_ucast_update CT - dlv removed from
unsettled");
     if (peer_moved)
-        qdr_delivery_decref_CT(core, peer, "qdr_update_delivery_CT - removed from unsettled
(2)");
-    if (error_unassigned)
-        qdr_error_free(error);
+        qdr_delivery_decref_CT(core, peer, "qdr_delivery_ucast_update_CT - peer removed from
unsettled");
+    if (peer)
+        qdr_delivery_decref_CT(core, peer, "qdr_delivery_ucast_update_CT - allow free of
peer");
+
+    return error_assigned;
+}
+
+
+// The remote delivery state (disposition) and/or remote settlement for an
+// incoming multicast delivery has changed.  Propagate the changes "downstream"
+// to the outbound peers.  Once all peers have settled then settle the in_dlv
+//
+void qdr_delivery_mcast_update_CT(qdr_core_t *core, qdr_delivery_t *in_dlv,
+                                  uint64_t new_disp, bool settled)
+{
+    if (!in_dlv)
+        return;
+
+    bool update_disp = new_disp && in_dlv->remote_disposition != new_disp;
+
+    assert(in_dlv->multicast);  // expect in_dlv to be the inbound delivery
+
+    qd_log(core->log, QD_LOG_TRACE,
+           "Remote updated mcast delivery (%p) disp=0x%"PRIx64" settled=%s",
+           in_dlv, new_disp, (settled) ? "True" : "False");
+
+    if (update_disp)
+        in_dlv->remote_disposition = new_disp;
+
+    qdr_delivery_t *out_peer = qdr_delivery_first_peer_CT(in_dlv);
+    while (out_peer) {
+        bool push  = false;
+        bool moved  = false;
+        bool unlink = false;
+
+        //
+        // AMQP 1.0 allows the sender to specify a disposition
+        // so forward it along
+        //
+        if (update_disp && out_peer->disposition != new_disp) {
+            out_peer->disposition = new_disp;
+            push = true;
+            // extension state/error ignored, not sure how
+            // that can be supported for mcast...
+        }
+
+        //
+        // the sender has settled
+        //
+        if (settled) {
+            out_peer->settled = true;
+            if (qdr_delivery_link(out_peer)) {
+                moved = qdr_delivery_settled_CT(core, out_peer);
+            }
+            unlink = true;
+        }
+
+        if (push || moved) {
+            qdr_delivery_push_CT(core, out_peer);
+        }
+
+        if (moved) {
+            qdr_delivery_decref_CT(core, out_peer,
+                                   "qdr_delivery_mcast_update_CT - removed out_peer from
unsettled");
+        }
+
+        qd_log(core->log, QD_LOG_TRACE,
+               "Updating mcast delivery (%p) out peer (%p) updated disp=%s settled=%s",
+               in_dlv, out_peer, (push) ? "True" : "False",
+               (unlink) ? "True" : "False");
+
+        if (unlink) {
+            qdr_delivery_unlink_peers_CT(core, in_dlv, out_peer);  // may free out_peer!
+        }
+
+        out_peer = qdr_delivery_next_peer_CT(in_dlv);
+    }
+
+    if (settled) {
+        assert(qdr_delivery_peer_count_CT(in_dlv) == 0);
+        in_dlv->settled = true;
+        if (qdr_delivery_settled_CT(core, in_dlv)) {
+            qdr_delivery_decref_CT(core, in_dlv,
+                                   "qdr_delivery_mcast_update CT - in_dlv removed from unsettled");
+        }
+    }
+}
+
+
+// An outgoing peer delivery of an incoming multicast delivery has settled.
+// Settle the inbound delivery after all of its outbound deliveries
+// have been settled.
+//
+// Note: this call may free either in_dlv or out_peer by unlinking them. The
+// caller must increment the reference count for these deliveries if they are
+// to be referenced after this call.
+//
+// moved: set to true if in_dlv has been removed from the unsettled list
+// return: true if in_dlv terminal state was set
 
 Review comment:
   has been settled
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Improve router's handling of unsettled multicast deliveries
> -----------------------------------------------------------
>
>                 Key: DISPATCH-1266
>                 URL: https://issues.apache.org/jira/browse/DISPATCH-1266
>             Project: Qpid Dispatch
>          Issue Type: Improvement
>          Components: Documentation, Management Agent, Routing Engine, Tests
>    Affects Versions: 1.5.0
>            Reporter: Ken Giusti
>            Assignee: Ken Giusti
>            Priority: Major
>
> The current implementation of multicast forwarding will immediately settle the delivery
on ingress to the router and forward the delivery as pre-settled.
> This means the settlement state sent back to the producer provides no indication of actual
delivery state.  And the credit is replenished immediately rather than gate on the settlement
coming from the consumer.  
> Improve this behavior by settling of the inbound delivery after all outbound deliveries
have settled. 



--
This message was sent by Atlassian Jira
(v8.3.2#803003)

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org


Mime
View raw message