From dev-return-96747-archive-asf-public=cust-asf.ponee.io@qpid.apache.org Mon Aug 26 14:45:02 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id 915C6180663 for ; Mon, 26 Aug 2019 16:45:02 +0200 (CEST) Received: (qmail 23213 invoked by uid 500); 26 Aug 2019 14:45:01 -0000 Mailing-List: contact dev-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list dev@qpid.apache.org Received: (qmail 23200 invoked by uid 99); 26 Aug 2019 14:45:01 -0000 Received: from mailrelay1-us-west.apache.org (HELO mailrelay1-us-west.apache.org) (209.188.14.139) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 26 Aug 2019 14:45:01 +0000 Received: from jira-he-de.apache.org (static.172.67.40.188.clients.your-server.de [188.40.67.172]) by mailrelay1-us-west.apache.org (ASF Mail Server at mailrelay1-us-west.apache.org) with ESMTP id 2229CE3063 for ; Mon, 26 Aug 2019 14:45:01 +0000 (UTC) Received: from jira-he-de.apache.org (localhost.localdomain [127.0.0.1]) by jira-he-de.apache.org (ASF Mail Server at jira-he-de.apache.org) with ESMTP id 55D3C782234 for ; Mon, 26 Aug 2019 14:45:00 +0000 (UTC) Date: Mon, 26 Aug 2019 14:45:00 +0000 (UTC) From: "ASF GitHub Bot (Jira)" To: dev@qpid.apache.org Message-ID: In-Reply-To: References: Subject: [jira] [Commented] (DISPATCH-1266) Improve router's handling of unsettled multicast deliveries MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 7bit X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 [ https://issues.apache.org/jira/browse/DISPATCH-1266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16915859#comment-16915859 ] ASF GitHub Bot commented on DISPATCH-1266: ------------------------------------------ kgiusti commented on pull request #554: DISPATCH-1266: Fix unsettled multicast forwarding URL: https://github.com/apache/qpid-dispatch/pull/554#discussion_r317637876 ########## File path: src/router_core/delivery.c ########## @@ -558,81 +572,386 @@ void qdr_delivery_decref_CT(qdr_core_t *core, qdr_delivery_t *dlv, const char *l } +// the remote endpoint has change the state (disposition) or settlement for the +// delivery. Update the local state/settlement accordingly. +// static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard) { - qdr_delivery_t *dlv = action->args.delivery.delivery; - qdr_delivery_t *peer = qdr_delivery_first_peer_CT(dlv); - bool push = false; - bool peer_moved = false; - bool dlv_moved = false; - uint64_t disp = action->args.delivery.disposition; - bool settled = action->args.delivery.settled; - qdr_error_t *error = action->args.delivery.error; - bool error_unassigned = true; + if (discard) + return; + + qdr_delivery_t *dlv = action->args.delivery.delivery; + qdr_delivery_t *peer = qdr_delivery_first_peer_CT(dlv); + uint64_t new_disp = action->args.delivery.disposition; + bool settled = action->args.delivery.settled; + qdr_error_t *error = action->args.delivery.error; + bool free_error = true; + + if (dlv->multicast) { + // + // remote state change for *inbound* multicast delivery, + // update downstream *outbound* peers + // + qdr_delivery_mcast_update_CT(core, dlv, new_disp, settled); - qdr_link_t *dlv_link = qdr_delivery_link(dlv); - qdr_link_t *peer_link = qdr_delivery_link(peer); + } else if (peer && peer->multicast) { + // + // remote state change for an *outbound* delivery to a multicast address, + // propagate upstream to *inbound* delivery (peer) + // + // coverity[swapped_arguments] + qdr_delivery_mcast_peer_update_CT(core, peer, dlv, new_disp, settled); + + } else { + // + // Unicast forwarding + // + free_error = !qdr_delivery_ucast_update_CT(core, dlv, peer, new_disp, settled, error); + } // - // Logic: + // Release the action reference, possibly freeing the delivery // - // If disposition has changed and there is a peer link, set the disposition of the peer - // If settled, the delivery must be unlinked and freed. - // If settled and there is a peer, the peer shall be settled and unlinked. It shall not - // be freed until the connection-side thread settles the PN delivery. + qdr_delivery_decref_CT(core, dlv, "qdr_update_delivery_CT - remove from action"); + + if (free_error) + qdr_error_free(error); +} + + +// The remote delivery state (disposition) and/or remote settlement for an +// unicast delivery has changed. Propagate the changes to its peer delivery. +// +// returns true if ownership of error parameter is taken (do not free it) +// +static bool qdr_delivery_ucast_update_CT(qdr_core_t *core, qdr_delivery_t *dlv, + qdr_delivery_t *peer, uint64_t new_disp, bool settled, + qdr_error_t *error) +{ + bool push = false; + bool peer_moved = false; + bool dlv_moved = false; + bool error_assigned = false; + qdr_link_t *dlink = qdr_delivery_link(dlv); + + assert(!dlv->multicast); + assert(!peer || !peer->multicast); + + if (peer) + qdr_delivery_incref(peer, "qdr_delivery_ucast_update_CT - prevent peer from being freed"); + + // + // Non-multicast Logic: + // + // If disposition has changed and there is a peer link, set the disposition + // of the peer + // If remote settled, the delivery must be unlinked and freed. + // If remote settled and there is a peer, the peer shall be settled and + // unlinked. It shall not be freed until the connection-side thread + // settles the PN delivery. // - if (disp != dlv->disposition) { + if (new_disp != dlv->remote_disposition) { // - // Disposition has changed, propagate the change to the peer delivery. + // Remote disposition has changed, propagate the change to the peer + // delivery local disposition. // - dlv->disposition = disp; + dlv->remote_disposition = new_disp; if (peer) { - peer->disposition = disp; + peer->disposition = new_disp; peer->error = error; push = true; - error_unassigned = false; + error_assigned = true; qdr_delivery_copy_extension_state(dlv, peer, false); } } if (settled) { if (peer) { peer->settled = true; - if (peer_link) { + if (qdr_delivery_link(peer)) { peer_moved = qdr_delivery_settled_CT(core, peer); - if (peer_moved) - push = true; } qdr_delivery_unlink_peers_CT(core, dlv, peer); } - if (dlv_link) + if (dlink) dlv_moved = qdr_delivery_settled_CT(core, dlv); } // // If the delivery's link has a core endpoint, notify the endpoint of the update // - if (dlv_link && dlv_link->core_endpoint) - qdrc_endpoint_do_update_CT(core, dlv_link->core_endpoint, dlv, settled); + if (dlink && dlink->core_endpoint) + qdrc_endpoint_do_update_CT(core, dlink->core_endpoint, dlv, settled); - if (push) + if (push || peer_moved) qdr_delivery_push_CT(core, peer); // - // Release the action reference, possibly freeing the delivery - // - qdr_delivery_decref_CT(core, dlv, "qdr_update_delivery_CT - remove from action"); - - // - // Release the unsettled references if the deliveries were moved + // Release the unsettled references if the deliveries were moved/settled // if (dlv_moved) - qdr_delivery_decref_CT(core, dlv, "qdr_update_delivery_CT - removed from unsettled (1)"); + qdr_delivery_decref_CT(core, dlv, "qdr_delivery_ucast_update CT - dlv removed from unsettled"); if (peer_moved) - qdr_delivery_decref_CT(core, peer, "qdr_update_delivery_CT - removed from unsettled (2)"); - if (error_unassigned) - qdr_error_free(error); + qdr_delivery_decref_CT(core, peer, "qdr_delivery_ucast_update_CT - peer removed from unsettled"); + if (peer) + qdr_delivery_decref_CT(core, peer, "qdr_delivery_ucast_update_CT - allow free of peer"); + + return error_assigned; +} + + +// The remote delivery state (disposition) and/or remote settlement for an +// incoming multicast delivery has changed. Propagate the changes "downstream" +// to the outbound peers. Once all peers have settled then settle the in_dlv +// +void qdr_delivery_mcast_update_CT(qdr_core_t *core, qdr_delivery_t *in_dlv, + uint64_t new_disp, bool settled) +{ + if (!in_dlv) + return; + + bool update_disp = new_disp && in_dlv->remote_disposition != new_disp; + + assert(in_dlv->multicast); // expect in_dlv to be the inbound delivery + + qd_log(core->log, QD_LOG_TRACE, + "Remote updated mcast delivery (%p) disp=0x%"PRIx64" settled=%s", + in_dlv, new_disp, (settled) ? "True" : "False"); + + if (update_disp) + in_dlv->remote_disposition = new_disp; + + qdr_delivery_t *out_peer = qdr_delivery_first_peer_CT(in_dlv); + while (out_peer) { + bool push = false; + bool moved = false; + bool unlink = false; + + // + // AMQP 1.0 allows the sender to specify a disposition + // so forward it along + // + if (update_disp && out_peer->disposition != new_disp) { + out_peer->disposition = new_disp; + push = true; + // extension state/error ignored, not sure how + // that can be supported for mcast... + } + + // + // the sender has settled + // + if (settled) { + out_peer->settled = true; + if (qdr_delivery_link(out_peer)) { + moved = qdr_delivery_settled_CT(core, out_peer); + } + unlink = true; + } + + if (push || moved) { + qdr_delivery_push_CT(core, out_peer); + } + + if (moved) { + qdr_delivery_decref_CT(core, out_peer, + "qdr_delivery_mcast_update_CT - removed out_peer from unsettled"); + } + + qd_log(core->log, QD_LOG_TRACE, + "Updating mcast delivery (%p) out peer (%p) updated disp=%s settled=%s", + in_dlv, out_peer, (push) ? "True" : "False", + (unlink) ? "True" : "False"); + + if (unlink) { + qdr_delivery_unlink_peers_CT(core, in_dlv, out_peer); // may free out_peer! + } + + out_peer = qdr_delivery_next_peer_CT(in_dlv); + } + + if (settled) { + assert(qdr_delivery_peer_count_CT(in_dlv) == 0); + in_dlv->settled = true; + if (qdr_delivery_settled_CT(core, in_dlv)) { + qdr_delivery_decref_CT(core, in_dlv, + "qdr_delivery_mcast_update CT - in_dlv removed from unsettled"); + } + } +} + + +// An outgoing peer delivery of an incoming multicast delivery has settled. +// Settle the inbound delivery after all of its outbound deliveries +// have been settled. +// +// Note: this call may free either in_dlv or out_peer by unlinking them. The +// caller must increment the reference count for these deliveries if they are +// to be referenced after this call. +// +// moved: set to true if in_dlv has been removed from the unsettled list +// return: true if in_dlv terminal state was set Review comment: has been settled ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org > Improve router's handling of unsettled multicast deliveries > ----------------------------------------------------------- > > Key: DISPATCH-1266 > URL: https://issues.apache.org/jira/browse/DISPATCH-1266 > Project: Qpid Dispatch > Issue Type: Improvement > Components: Documentation, Management Agent, Routing Engine, Tests > Affects Versions: 1.5.0 > Reporter: Ken Giusti > Assignee: Ken Giusti > Priority: Major > > The current implementation of multicast forwarding will immediately settle the delivery on ingress to the router and forward the delivery as pre-settled. > This means the settlement state sent back to the producer provides no indication of actual delivery state. And the credit is replenished immediately rather than gate on the settlement coming from the consumer. > Improve this behavior by settling of the inbound delivery after all outbound deliveries have settled. -- This message was sent by Atlassian Jira (v8.3.2#803003) --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org For additional commands, e-mail: dev-help@qpid.apache.org