qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tr...@apache.org
Subject qpid-dispatch git commit: DISPATCH-179 - Delivery disposition and settlement now propagate across the core.
Date Wed, 27 Jan 2016 23:07:45 GMT
Repository: qpid-dispatch
Updated Branches:
  refs/heads/tross-DISPATCH-179-1 54c923d63 -> ccf4c7086


DISPATCH-179 - Delivery disposition and settlement now propagate across the core.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/ccf4c708
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/ccf4c708
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/ccf4c708

Branch: refs/heads/tross-DISPATCH-179-1
Commit: ccf4c708668dc0aa8e3fc97f5f86115715a1ecb6
Parents: 54c923d
Author: Ted Ross <tross@redhat.com>
Authored: Wed Jan 27 18:06:59 2016 -0500
Committer: Ted Ross <tross@redhat.com>
Committed: Wed Jan 27 18:06:59 2016 -0500

----------------------------------------------------------------------
 include/qpid/dispatch/router_core.h |  1 +
 src/router_core/forwarder.c         | 13 +++--
 src/router_core/transfer.c          | 81 +++++++++++++++++++++++++++-----
 src/router_node.c                   | 16 +++++--
 4 files changed, 91 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ccf4c708/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 5f63b0d..c081b6e 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -527,6 +527,7 @@ void qdr_connection_handlers(qdr_core_t                *core,
  * Delivery functions
  ******************************************************************************
  */
+void qdr_delivery_free(qdr_delivery_t *delivery);
 void qdr_delivery_update_disposition(qdr_core_t *core, qdr_delivery_t *delivery, uint64_t disp);
 void qdr_delivery_settle(qdr_core_t *core, qdr_delivery_t *delivery);
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ccf4c708/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 30a6d9c..90e210f 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -54,13 +54,18 @@ qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *pe
 
     ZERO(dlv);
     dlv->link    = link;
-    dlv->peer    = peer;
     dlv->msg     = qd_message_copy(msg);
     dlv->settled = !peer || peer->settled;
     dlv->tag     = core->next_tag++;
 
-    if (peer && peer->peer == 0)
-        peer->peer = dlv;  // TODO - make this a back-list for multicast tracking
+    //
+    // Create peer linkage only if the delivery is not settled
+    //
+    if (!dlv->settled) {
+        dlv->peer = peer;
+        if (peer && peer->peer == 0)
+            peer->peer = dlv;  // TODO - make this a back-list for multicast tracking
+    }
 
     return dlv;
 }
@@ -244,7 +249,7 @@ int qdr_forward_closest_CT(qdr_core_t      *core,
             //
             // If the incoming delivery is not settled, it should be accepted and settled here.
             //
-            if (in_delivery) {
+            if (in_delivery && !in_delivery->settled) {
                 in_delivery->disposition = PN_ACCEPTED;
                 in_delivery->settled     = true;
                 qdr_delivery_push_CT(core, in_delivery);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ccf4c708/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 5c44a7f..af88530 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -116,15 +116,19 @@ void qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit)
         dlv = DEQ_HEAD(link->undelivered);
         if (dlv) {
             DEQ_REMOVE_HEAD(link->undelivered);
-            DEQ_INSERT_TAIL(link->unsettled, dlv);
+            if (!dlv->settled)
+                DEQ_INSERT_TAIL(link->unsettled, dlv);
             credit--;
             offer = DEQ_SIZE(link->undelivered);
         } else
             drained = true;
         sys_mutex_unlock(conn->work_lock);
 
-        if (dlv)
+        if (dlv) {
             core->deliver_handler(core->user_context, link, dlv, dlv->settled);
+            if (dlv->settled)
+                qdr_delivery_free(dlv);
+        }
     }
 
     if (drained)
@@ -143,6 +147,8 @@ void qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit)
     qdr_delivery_ref_t *ref = DEQ_HEAD(updated_deliveries);
     while (ref) {
         core->delivery_update_handler(core->user_context, ref->dlv, ref->dlv->disposition, ref->dlv->settled);
+        if (ref->dlv->settled)
+            qdr_delivery_free(ref->dlv);
         qdr_del_delivery_ref(&updated_deliveries, ref);
         ref = DEQ_HEAD(updated_deliveries);
     }
@@ -173,6 +179,14 @@ void qdr_send_to2(qdr_core_t *core, qd_message_t *msg, const char *addr, bool ex
 }
 
 
+void qdr_delivery_free(qdr_delivery_t *delivery)
+{
+    if (delivery->msg)
+        qd_message_free(delivery->msg);
+    free_qdr_delivery_t(delivery);
+}
+
+
 void qdr_delivery_update_disposition(qdr_core_t *core, qdr_delivery_t *delivery, uint64_t disposition)
 {
     qdr_action_t *action = qdr_action(qdr_update_delivery_CT, "update_delivery");
@@ -259,7 +273,8 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis
     qdr_delivery_t *dlv          = action->args.connection.delivery;
     qd_bitmask_t   *link_exclude = action->args.connection.link_exclusion;
     qdr_link_t     *link         = dlv->link;
-    int             count        = 0;
+    int             fanout       = 0;
+    bool            presettled   = dlv->settled;
 
     //
     // NOTE: The link->undelivered list does not need to be protected by the
@@ -272,11 +287,11 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis
         if (!addr && dlv->to_addr)
             qd_hash_retrieve(core->addr_hash, dlv->to_addr, (void**) &addr);
         if (addr)
-            count = qdr_forward_message_CT(core, addr, dlv->msg, dlv, false,
-                                           link->link_type == QD_LINK_CONTROL, link_exclude);
+            fanout = qdr_forward_message_CT(core, addr, dlv->msg, dlv, false,
+                                            link->link_type == QD_LINK_CONTROL, link_exclude);
     }
 
-    if (count == 0) {
+    if (fanout == 0) {
         if (link->owning_addr) {
             //
             // Message was not delivered and the link is not anonymous.
@@ -288,20 +303,24 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis
             //
             // TODO - Release the delivery
             //
+            printf("TODO fanout == 0\n");
         }
-    } else if (count == 1) {
-        if (dlv->settled)
+    } else if (fanout == 1) {
+        if (presettled) {
             //
             // The delivery was pre-settled.  Issue replacement credit now that it's
             // been forwarded.
             //
             qdr_link_issue_credit_CT(core, link, 1);
-        else
+            assert(!dlv->peer);
+            qdr_delivery_free(dlv);
+        } else
             DEQ_INSERT_TAIL(link->unsettled, dlv);
     } else {
         //
-        // The count is greater than one.  Do something!  TODO
+        // The fanout is greater than one.  Do something!  TODO
         //
+        printf("TODO fanout > 1\n");
     }
 }
 
@@ -334,15 +353,55 @@ static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
 static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
 {
     qdr_delivery_t *dlv     = action->args.delivery.delivery;
+    qdr_delivery_t *peer    = dlv->peer;
+    bool            push    = false;
     uint64_t        disp    = action->args.delivery.disposition;
-    //    bool            settled = action->args.delivery.settled;
+    bool            settled = action->args.delivery.settled;
+
+    //
+    // Logic:
+    //
+    // If disposition has changed and there is a peer link, set the disposition of the peer
+    // If settled, the delivery must be unlinked and freed.
+    // If settled and there is a peer, the peer shall be settled and unlinked.  It shall not
+    //   be freed until the connection-side thread settles the PN delivery.
+    //
 
     if (disp != dlv->disposition) {
         //
         // Disposition has changed, propagate the change to the peer delivery.
         //
         dlv->disposition = disp;
+        if (peer) {
+            peer->disposition = disp;
+            push = true;
+        }
+    }
+
+    if (settled) {
+        if (peer) {
+            peer->settled = true;
+            push = true;
+            peer->peer = 0;
+            dlv->peer  = 0;
+            if (peer->link) {
+                DEQ_REMOVE(peer->link->unsettled, peer);
+                if (peer->link->link_direction == QD_INCOMING)
+                    qdr_link_issue_credit_CT(core, peer->link, 1);
+            }
+        }
+
+        if (dlv->link) {
+            DEQ_REMOVE(dlv->link->unsettled, dlv);
+            if (dlv->link->link_direction == QD_INCOMING)
+                qdr_link_issue_credit_CT(core, dlv->link, 1);
+        }
+
+        qdr_delivery_free(dlv);
     }
+
+    if (push)
+        qdr_delivery_push_CT(core, peer);
 }
 
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ccf4c708/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index de09c5b..b16b862 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -264,7 +264,7 @@ static void router_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd
     //
     qd_message_depth_t  validation_depth = anonymous_link ? QD_DEPTH_PROPERTIES : QD_DEPTH_MESSAGE_ANNOTATIONS;
     bool                valid_message    = qd_message_check(msg, validation_depth);
-    qdr_delivery_t     *delivery;
+    qdr_delivery_t     *delivery         = 0;
 
     if (valid_message) {
         qd_parsed_field_t   *in_ma        = qd_message_message_annotations(msg);
@@ -299,8 +299,12 @@ static void router_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd
             delivery = qdr_link_deliver(rlink, msg, ingress_iter, pn_delivery_settled(pnd), link_exclusions);
 
         if (delivery) {
-            pn_delivery_set_context(pnd, delivery);
-            qdr_delivery_set_context(delivery, pnd);
+            if (pn_delivery_settled(pnd))
+                pn_delivery_settle(pnd);
+            else {
+                pn_delivery_set_context(pnd, delivery);
+                qdr_delivery_set_context(delivery, pnd);
+            }
         } else {
             //
             // The message is now and will always be unroutable because there is no address.
@@ -680,8 +684,10 @@ static void qd_router_link_deliver(void *context, qdr_link_t *link, qdr_delivery
     pn_delivery(plink, pn_dtag(tag, tag_length));
     pn_delivery_t *pdlv = pn_link_current(plink);
 
-    pn_delivery_set_context(pdlv, dlv);
-    qdr_delivery_set_context(dlv, pdlv);
+    if (!settled) {
+        pn_delivery_set_context(pdlv, dlv);
+        qdr_delivery_set_context(dlv, pdlv);
+    }
 
     qd_message_send(qdr_delivery_message(dlv), qlink, qdr_link_strip_annotations_out(link));
     if (settled)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message