qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kgiu...@apache.org
Subject svn commit: r1673546 - in /qpid/dispatch/trunk: include/qpid/dispatch/ src/
Date Tue, 14 Apr 2015 20:03:33 GMT
Author: kgiusti
Date: Tue Apr 14 20:03:33 2015
New Revision: 1673546

URL: http://svn.apache.org/r1673546
Log:
DISPATCH-135: move qd_delivery_t to qd_router_delivery_t

Added:
    qpid/dispatch/trunk/src/router_delivery.c   (with props)
Modified:
    qpid/dispatch/trunk/include/qpid/dispatch/container.h
    qpid/dispatch/trunk/include/qpid/dispatch/message.h
    qpid/dispatch/trunk/include/qpid/dispatch/router.h
    qpid/dispatch/trunk/src/CMakeLists.txt
    qpid/dispatch/trunk/src/container.c
    qpid/dispatch/trunk/src/message.c
    qpid/dispatch/trunk/src/router_forwarders.c
    qpid/dispatch/trunk/src/router_node.c
    qpid/dispatch/trunk/src/router_private.h
    qpid/dispatch/trunk/src/waypoint.c

Modified: qpid/dispatch/trunk/include/qpid/dispatch/container.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/container.h?rev=1673546&r1=1673545&r2=1673546&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/container.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/container.h Tue Apr 14 20:03:33 2015
@@ -62,9 +62,8 @@ typedef enum {
 
 typedef struct qd_node_t     qd_node_t;
 typedef struct qd_link_t     qd_link_t;
-typedef struct qd_delivery_t qd_delivery_t;
 
-typedef void (*qd_container_delivery_handler_t)    (void *node_context, qd_link_t *link, qd_delivery_t *delivery);
+typedef void (*qd_container_delivery_handler_t)    (void *node_context, qd_link_t *link, pn_delivery_t *delivery);
 typedef int  (*qd_container_link_handler_t)        (void *node_context, qd_link_t *link);
 typedef int  (*qd_container_link_detach_handler_t) (void *node_context, qd_link_t *link, int closed);
 typedef void (*qd_container_node_handler_t)        (void *type_context, qd_node_t *node);
@@ -174,30 +173,6 @@ pn_terminus_t *qd_link_remote_target(qd_
 void qd_link_activate(qd_link_t *link);
 void qd_link_close(qd_link_t *link);
 bool qd_link_drain_changed(qd_link_t *link, bool *mode);
-
-/**
- * Important: qd_delivery must never be called twice in a row without an intervening pn_link_advance.
- *            The Disatch architecture provides a hook for discovering when an outgoing link is writable
- *            and has credit.  When a link is writable, a delivery is allocated, written, and advanced
- *            in one operation.  If a backlog of pending deliveries is created, an assertion will be
- *            thrown.
- */
-qd_delivery_t *qd_delivery(qd_link_t *link, pn_delivery_tag_t tag);
-void qd_delivery_set_undeliverable_LH(qd_delivery_t *delivery);
-void qd_delivery_free_LH(qd_delivery_t *delivery, uint64_t final_disposition);
-void qd_delivery_link_peers_LH(qd_delivery_t *left, qd_delivery_t *right);
-void qd_delivery_unlink_LH(qd_delivery_t *delivery);
-void qd_delivery_fifo_enter_LH(qd_delivery_t *delivery);
-bool qd_delivery_fifo_exit_LH(qd_delivery_t *delivery);
-qd_delivery_t *qd_delivery_peer(qd_delivery_t *delivery);
-void qd_delivery_set_context(qd_delivery_t *delivery, void *context);
-void *qd_delivery_context(qd_delivery_t *delivery);
-pn_delivery_t *qd_delivery_pn(qd_delivery_t *delivery);
-void qd_delivery_settle(qd_delivery_t *delivery);
-bool qd_delivery_settled(qd_delivery_t *delivery);
-bool qd_delivery_disp_changed(qd_delivery_t *delivery);
-uint64_t qd_delivery_disp(qd_delivery_t *delivery);
-qd_link_t *qd_delivery_link(qd_delivery_t *delivery);
 void qd_link_free_LH(qd_link_t *link);
 
 ///@}

Modified: qpid/dispatch/trunk/include/qpid/dispatch/message.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/message.h?rev=1673546&r1=1673545&r2=1673546&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/message.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/message.h Tue Apr 14 20:03:33 2015
@@ -199,7 +199,7 @@ void qd_message_set_ingress_annotation(q
  * @param delivery An incoming delivery from a link
  * @return A pointer to the complete message or 0 if the message is not yet complete.
  */
-qd_message_t *qd_message_receive(qd_delivery_t *delivery);
+qd_message_t *qd_message_receive(pn_delivery_t *delivery);
 
 /**
  * Send the message outbound on an outgoing link.

Modified: qpid/dispatch/trunk/include/qpid/dispatch/router.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/router.h?rev=1673546&r1=1673545&r2=1673546&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/router.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/router.h Tue Apr 14 20:03:33 2015
@@ -35,6 +35,7 @@
 typedef struct qd_router_t  qd_router_t;
 typedef struct qd_address_t qd_address_t;
 typedef uint8_t             qd_address_semantics_t;
+typedef struct qd_router_delivery_t qd_router_delivery_t;
 
 /**
  * @name Address fanout semantics
@@ -122,7 +123,7 @@ struct qd_router_forwarder_t {
     bool (*forward)(qd_router_forwarder_t *forwarder,
                     qd_router_t *router,
                     qd_message_t *msg,
-                    qd_delivery_t *delivery,
+                    qd_router_delivery_t *delivery,
                     qd_address_t *addr,
                     qd_field_iterator_t *ingress_iterator,
                     bool is_direct);

Modified: qpid/dispatch/trunk/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/CMakeLists.txt?rev=1673546&r1=1673545&r2=1673546&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/CMakeLists.txt (original)
+++ qpid/dispatch/trunk/src/CMakeLists.txt Tue Apr 14 20:03:33 2015
@@ -65,6 +65,7 @@ set(qpid_dispatch_SOURCES
   python_embedded.c
   router_agent.c
   router_config.c
+  router_delivery.c
   router_node.c
   router_forwarders.c
   router_pynode.c

Modified: qpid/dispatch/trunk/src/container.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/container.c?rev=1673546&r1=1673545&r2=1673546&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/container.c (original)
+++ qpid/dispatch/trunk/src/container.c Tue Apr 14 20:03:33 2015
@@ -49,21 +49,6 @@ DEQ_DECLARE(qd_node_t, qd_node_list_t);
 ALLOC_DECLARE(qd_node_t);
 ALLOC_DEFINE(qd_node_t);
 
-/** Encapsulates a proton message delivery */
-struct qd_delivery_t {
-    DEQ_LINKS(qd_delivery_t);
-    pn_delivery_t *pn_delivery;
-    qd_delivery_t *peer;
-    void          *context;
-    uint64_t       disposition;
-    qd_link_t     *link;
-    int            in_fifo;
-    bool           pending_delete;
-};
-ALLOC_DECLARE(qd_delivery_t);
-ALLOC_DEFINE(qd_delivery_t);
-DEQ_DECLARE(qd_delivery_t, qd_delivery_list_t);
-
 
 /** Encapsulates a proton link for sending and receiving messages */
 struct qd_link_t {
@@ -72,7 +57,6 @@ struct qd_link_t {
     void               *context;
     qd_node_t          *node;
     bool               drain_mode;
-    qd_delivery_list_t deliveries;
 };
 
 ALLOC_DECLARE(qd_link_t);
@@ -136,7 +120,6 @@ static void setup_outgoing_link(qd_conta
     link->context    = 0;
     link->node       = node;
     link->drain_mode = pn_link_get_drain(pn_link);
-    DEQ_INIT(link->deliveries);
 
     pn_link_set_context(pn_link, link);
     node->ntype->outgoing_handler(node->context, link);
@@ -182,7 +165,6 @@ static void setup_incoming_link(qd_conta
     link->context    = 0;
     link->node       = node;
     link->drain_mode = pn_link_get_drain(pn_link);
-    DEQ_INIT(link->deliveries);
 
     pn_link_set_context(pn_link, link);
     node->ntype->incoming_handler(node->context, link);
@@ -217,26 +199,11 @@ static void do_receive(pn_delivery_t *pn
 {
     pn_link_t     *pn_link  = pn_delivery_link(pnd);
     qd_link_t     *link     = (qd_link_t*) pn_link_get_context(pn_link);
-    qd_delivery_t *delivery = (qd_delivery_t*) pn_delivery_get_context(pnd);
 
     if (link) {
         qd_node_t *node = link->node;
         if (node) {
-            if (!delivery) {
-                delivery = new_qd_delivery_t();
-                DEQ_ITEM_INIT(delivery);
-                delivery->pn_delivery    = pnd;
-                delivery->peer           = 0;
-                delivery->context        = 0;
-                delivery->disposition    = 0;
-                delivery->link           = link;
-                delivery->in_fifo        = 0;
-                delivery->pending_delete = false;
-                DEQ_INSERT_TAIL(link->deliveries, delivery);
-                pn_delivery_set_context(pnd, delivery);
-            }
-
-            node->ntype->rx_handler(node->context, link, delivery);
+            node->ntype->rx_handler(node->context, link, pnd);
             return;
         }
     }
@@ -248,7 +215,6 @@ static void do_receive(pn_delivery_t *pn
     pn_link_flow(pn_link, 1);
     pn_delivery_update(pnd, PN_REJECTED);
     pn_delivery_settle(pnd);
-    if (delivery) delivery->pn_delivery = 0;
 }
 
 
@@ -256,12 +222,11 @@ static void do_updated(pn_delivery_t *pn
 {
     pn_link_t     *pn_link  = pn_delivery_link(pnd);
     qd_link_t     *link     = (qd_link_t*) pn_link_get_context(pn_link);
-    qd_delivery_t *delivery = (qd_delivery_t*) pn_delivery_get_context(pnd);
 
-    if (link && delivery) {
+    if (link) {
         qd_node_t *node = link->node;
         if (node)
-            node->ntype->disp_handler(node->context, link, delivery);
+            node->ntype->disp_handler(node->context, link, pnd);
     }
 }
 
@@ -682,7 +647,6 @@ qd_link_t *qd_link(qd_node_t *node, qd_c
     link->context    = node->context;
     link->node       = node;
     link->drain_mode = pn_link_get_drain(link->pn_link);
-    DEQ_INIT(link->deliveries);
 
     pn_link_set_context(link->pn_link, link);
 
@@ -695,11 +659,6 @@ qd_link_t *qd_link(qd_node_t *node, qd_c
 void qd_link_free_LH(qd_link_t *link)
 {
     if (!link) return;
-    qd_delivery_t *d = DEQ_HEAD(link->deliveries);
-    while (d) {
-        qd_delivery_free_LH(d, 0);  // removes itself from list
-        d = DEQ_HEAD(link->deliveries);
-    }
     free_qd_link_t(link);
 }
 
@@ -843,165 +802,3 @@ bool qd_link_drain_changed(qd_link_t *li
 }
 
 
-qd_delivery_t *qd_delivery(qd_link_t *link, pn_delivery_tag_t tag)
-{
-    pn_link_t *pnl = qd_link_pn(link);
-
-    //
-    // If there is a current delivery on this outgoing link, something
-    // is wrong with the delivey algorithm.  We assume that the current
-    // delivery ('pnd' below) is the one created by pn_delivery.  If it is
-    // not, then my understanding of how proton works is incorrect.
-    //
-    assert(!pn_link_current(pnl));
-
-    pn_delivery(pnl, tag);
-    pn_delivery_t *pnd = pn_link_current(pnl);
-
-    if (!pnd)
-        return 0;
-
-    qd_delivery_t *delivery = new_qd_delivery_t();
-    DEQ_ITEM_INIT(delivery);
-    delivery->pn_delivery    = pnd;
-    delivery->peer           = 0;
-    delivery->context        = 0;
-    delivery->disposition    = 0;
-    delivery->link           = link;
-    delivery->in_fifo        = 0;
-    delivery->pending_delete = false;
-    DEQ_INSERT_TAIL(link->deliveries, delivery);
-    pn_delivery_set_context(pnd, delivery);
-
-    return delivery;
-}
-
-// mark the delivery as 'undeliverable-here' so peers won't re-forward it to
-// us.
-void qd_delivery_set_undeliverable_LH(qd_delivery_t *delivery)
-{
-    if (delivery->pn_delivery) {
-        pn_disposition_t *dp = pn_delivery_local(delivery->pn_delivery);
-        if (dp) {
-            pn_disposition_set_undeliverable(dp, true);
-        }
-    }
-}
-
-void qd_delivery_free_LH(qd_delivery_t *delivery, uint64_t final_disposition)
-{
-    if (delivery->pn_delivery) {
-        if (final_disposition > 0)
-            pn_delivery_update(delivery->pn_delivery, final_disposition);
-        pn_delivery_set_context(delivery->pn_delivery, 0);
-        pn_delivery_settle(delivery->pn_delivery);
-        delivery->pn_delivery = 0;
-    }
-
-    //assert(!delivery->peer);
-    if (delivery->peer)
-        qd_delivery_unlink_LH(delivery);
-
-    if (delivery->link) {
-        DEQ_REMOVE(delivery->link->deliveries, delivery);
-        delivery->link = 0;
-    }
-    if (delivery->in_fifo)
-        delivery->pending_delete = true;
-    else {
-        free_qd_delivery_t(delivery);
-    }
-}
-
-
-void qd_delivery_link_peers_LH(qd_delivery_t *right, qd_delivery_t *left)
-{
-    right->peer = left;
-    left->peer  = right;
-}
-
-
-void qd_delivery_unlink_LH(qd_delivery_t *delivery)
-{
-    if (delivery->peer) {
-        delivery->peer->peer = 0;
-        delivery->peer       = 0;
-    }
-}
-
-
-void qd_delivery_fifo_enter_LH(qd_delivery_t *delivery)
-{
-    delivery->in_fifo++;
-}
-
-
-bool qd_delivery_fifo_exit_LH(qd_delivery_t *delivery)
-{
-    delivery->in_fifo--;
-    if (delivery->in_fifo == 0 && delivery->pending_delete) {
-        free_qd_delivery_t(delivery);
-        return false;
-    }
-
-    return true;
-}
-
-
-void qd_delivery_set_context(qd_delivery_t *delivery, void *context)
-{
-    delivery->context = context;
-}
-
-
-void *qd_delivery_context(qd_delivery_t *delivery)
-{
-    return delivery->context;
-}
-
-
-qd_delivery_t *qd_delivery_peer(qd_delivery_t *delivery)
-{
-    return delivery->peer;
-}
-
-
-pn_delivery_t *qd_delivery_pn(qd_delivery_t *delivery)
-{
-    return delivery->pn_delivery;
-}
-
-
-void qd_delivery_settle(qd_delivery_t *delivery)
-{
-    if (delivery->pn_delivery) {
-        pn_delivery_set_context(delivery->pn_delivery, 0);
-        pn_delivery_settle(delivery->pn_delivery);
-        delivery->pn_delivery = 0;
-    }
-}
-
-
-bool qd_delivery_settled(qd_delivery_t *delivery)
-{
-    return pn_delivery_settled(delivery->pn_delivery);
-}
-
-
-bool qd_delivery_disp_changed(qd_delivery_t *delivery)
-{
-    return delivery->disposition != pn_delivery_remote_state(delivery->pn_delivery);
-}
-
-
-uint64_t qd_delivery_disp(qd_delivery_t *delivery)
-{
-    delivery->disposition = pn_delivery_remote_state(delivery->pn_delivery);
-    return delivery->disposition;
-}
-
-
-qd_link_t *qd_delivery_link(qd_delivery_t *delivery)
-{
-    return delivery->link;
-}

Modified: qpid/dispatch/trunk/src/message.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/message.c?rev=1673546&r1=1673545&r2=1673546&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/message.c (original)
+++ qpid/dispatch/trunk/src/message.c Tue Apr 14 20:03:33 2015
@@ -666,13 +666,12 @@ void qd_message_set_ingress_annotation(q
     qd_compose_free(ingress_field);
 }
 
-qd_message_t *qd_message_receive(qd_delivery_t *delivery)
+qd_message_t *qd_message_receive(pn_delivery_t *delivery)
 {
-    pn_delivery_t    *pnd  = qd_delivery_pn(delivery);
-    qd_message_pvt_t *msg  = (qd_message_pvt_t*) qd_delivery_context(delivery);
-    pn_link_t        *link = pn_delivery_link(pnd);
+    pn_link_t        *link = pn_delivery_link(delivery);
     ssize_t           rc;
     qd_buffer_t      *buf;
+    qd_message_pvt_t *msg  = (qd_message_pvt_t*) pn_delivery_get_context(delivery);
 
     //
     // If there is no message associated with the delivery, this is the first time
@@ -681,7 +680,7 @@ qd_message_t *qd_message_receive(qd_deli
     //
     if (!msg) {
         msg = (qd_message_pvt_t*) qd_message();
-        qd_delivery_set_context(delivery, (void*) msg);
+        pn_delivery_set_context(delivery, (void*) msg);
     }
 
     //
@@ -714,7 +713,7 @@ qd_message_t *qd_message_receive(qd_deli
                 DEQ_REMOVE_TAIL(msg->content->buffers);
                 qd_buffer_free(buf);
             }
-            qd_delivery_set_context(delivery, 0);
+            pn_delivery_set_context(delivery, 0);
 
             char repr[qd_message_repr_len()];
             qd_log(log_source, QD_LOG_TRACE, "Received %s on link %s",

Added: qpid/dispatch/trunk/src/router_delivery.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_delivery.c?rev=1673546&view=auto
==============================================================================
--- qpid/dispatch/trunk/src/router_delivery.c (added)
+++ qpid/dispatch/trunk/src/router_delivery.c Tue Apr 14 20:03:33 2015
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/** Encapsulates a proton message delivery */
+
+#include <qpid/dispatch.h>
+#include "dispatch_private.h"
+#include "router_private.h"
+
+struct qd_router_delivery_t {
+    DEQ_LINKS(qd_router_delivery_t);
+    pn_delivery_t        *pn_delivery;
+    qd_router_delivery_t *peer;
+    void                 *context;
+    uint64_t              disposition;
+    qd_router_link_t     *link;
+    int                   in_fifo;
+    bool                  pending_delete;
+};
+ALLOC_DECLARE(qd_router_delivery_t);
+ALLOC_DEFINE(qd_router_delivery_t);
+
+// create a router delivery from a proton delivery received on rlink
+qd_router_delivery_t *qd_router_delivery(qd_router_link_t *rlink, pn_delivery_t *pnd)
+{
+    assert(pn_delivery_get_context(pnd) == 0);
+    qd_router_delivery_t *delivery = new_qd_router_delivery_t();
+    if (delivery) {
+        DEQ_ITEM_INIT(delivery);
+        delivery->pn_delivery    = pnd;
+        delivery->peer           = 0;
+        delivery->context        = 0;
+        delivery->disposition    = 0;
+        delivery->link           = rlink;
+        delivery->in_fifo        = 0;
+        delivery->pending_delete = false;
+        DEQ_INSERT_TAIL(rlink->deliveries, delivery);
+        pn_delivery_set_context(pnd, delivery);
+    }
+
+    return delivery;
+}
+
+
+// generate a new router delivery for rlink
+qd_router_delivery_t *qd_router_link_new_delivery(qd_router_link_t *rlink, pn_delivery_tag_t tag)
+{
+    qd_link_t *link = rlink->link;
+    pn_link_t *pnl = qd_link_pn(link);
+
+    //
+    // If there is a current delivery on this outgoing link, something
+    // is wrong with the delivey algorithm.  We assume that the current
+    // delivery ('pnd' below) is the one created by pn_delivery.  If it is
+    // not, then my understanding of how proton works is incorrect.
+    //
+    assert(!pn_link_current(pnl));
+
+    pn_delivery(pnl, tag);
+    pn_delivery_t *pnd = pn_link_current(pnl);
+
+    if (!pnd)
+        return 0;
+
+    return qd_router_delivery(rlink, pnd);
+}
+
+// mark the delivery as 'undeliverable-here' so peers won't re-forward it to
+// us.
+void qd_router_delivery_set_undeliverable_LH(qd_router_delivery_t *delivery)
+{
+    if (delivery->pn_delivery) {
+        pn_disposition_t *dp = pn_delivery_local(delivery->pn_delivery);
+        if (dp) {
+            pn_disposition_set_undeliverable(dp, true);
+        }
+    }
+}
+
+void qd_router_delivery_free_LH(qd_router_delivery_t *delivery, uint64_t final_disposition)
+{
+    if (delivery->pn_delivery) {
+        if (final_disposition > 0)
+            pn_delivery_update(delivery->pn_delivery, final_disposition);
+        pn_delivery_set_context(delivery->pn_delivery, 0);
+        pn_delivery_settle(delivery->pn_delivery);
+        delivery->pn_delivery = 0;
+    }
+
+    if (delivery->peer)
+        qd_router_delivery_unlink_LH(delivery);
+
+    if (delivery->link) {
+        DEQ_REMOVE(delivery->link->deliveries, delivery);
+        delivery->link = 0;
+    }
+    if (delivery->in_fifo)
+        delivery->pending_delete = true;
+    else {
+        free_qd_router_delivery_t(delivery);
+    }
+}
+
+
+void qd_router_delivery_link_peers_LH(qd_router_delivery_t *right, qd_router_delivery_t *left)
+{
+    right->peer = left;
+    left->peer  = right;
+}
+
+
+void qd_router_delivery_unlink_LH(qd_router_delivery_t *delivery)
+{
+    if (delivery->peer) {
+        delivery->peer->peer = 0;
+        delivery->peer       = 0;
+    }
+}
+
+
+void qd_router_delivery_fifo_enter_LH(qd_router_delivery_t *delivery)
+{
+    delivery->in_fifo++;
+}
+
+
+bool qd_router_delivery_fifo_exit_LH(qd_router_delivery_t *delivery)
+{
+    delivery->in_fifo--;
+    if (delivery->in_fifo == 0 && delivery->pending_delete) {
+        free_qd_router_delivery_t(delivery);
+        return false;
+    }
+
+    return true;
+}
+
+
+void qd_router_delivery_set_context(qd_router_delivery_t *delivery, void *context)
+{
+    delivery->context = context;
+}
+
+
+void *qd_router_delivery_context(qd_router_delivery_t *delivery)
+{
+    return delivery->context;
+}
+
+
+qd_router_delivery_t *qd_router_delivery_peer(qd_router_delivery_t *delivery)
+{
+    return delivery->peer;
+}
+
+
+pn_delivery_t *qd_router_delivery_pn(qd_router_delivery_t *delivery)
+{
+    return delivery->pn_delivery;
+}
+
+
+void qd_router_delivery_settle(qd_router_delivery_t *delivery)
+{
+    if (delivery->pn_delivery) {
+        pn_delivery_set_context(delivery->pn_delivery, 0);
+        pn_delivery_settle(delivery->pn_delivery);
+        delivery->pn_delivery = 0;
+    }
+}
+
+
+bool qd_router_delivery_settled(qd_router_delivery_t *delivery)
+{
+    return pn_delivery_settled(delivery->pn_delivery);
+}
+
+
+bool qd_router_delivery_disp_changed(qd_router_delivery_t *delivery)
+{
+    return delivery->disposition != pn_delivery_remote_state(delivery->pn_delivery);
+}
+
+
+uint64_t qd_router_delivery_disp(qd_router_delivery_t *delivery)
+{
+    delivery->disposition = pn_delivery_remote_state(delivery->pn_delivery);
+    return delivery->disposition;
+}
+
+
+qd_router_link_t *qd_router_delivery_link(qd_router_delivery_t *delivery)
+{
+    return delivery->link;
+}

Propchange: qpid/dispatch/trunk/src/router_delivery.c
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/dispatch/trunk/src/router_forwarders.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_forwarders.c?rev=1673546&r1=1673545&r2=1673546&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_forwarders.c (original)
+++ qpid/dispatch/trunk/src/router_forwarders.c Tue Apr 14 20:03:33 2015
@@ -25,7 +25,7 @@
 
 
 static void forward_to_direct_subscribers_LH(qd_address_t *addr,
-                                             qd_delivery_t *delivery,
+                                             qd_router_delivery_t *delivery,
                                              qd_message_t *msg,
                                              int *fanout)
 {
@@ -42,7 +42,7 @@ static void forward_to_direct_subscriber
         (*fanout)++;
         if (*fanout == 1) {
             re->delivery = delivery;
-            qd_delivery_fifo_enter_LH(delivery);
+            qd_router_delivery_fifo_enter_LH(delivery);
         }
 
         addr->deliveries_egress++;
@@ -72,7 +72,7 @@ static void forward_to_direct_subscriber
 
 static void forward_to_remote_subscribers_LH(qd_router_t *router,
                                              qd_address_t *addr,
-                                             qd_delivery_t *delivery,
+                                             qd_router_delivery_t *delivery,
                                              qd_message_t *msg,
                                              int *fanout,
                                              qd_field_iterator_t *ingress_iter)
@@ -139,7 +139,7 @@ static void forward_to_remote_subscriber
                 (*fanout)++;
                 if (*fanout == 1) {
                     re->delivery = delivery;
-                    qd_delivery_fifo_enter_LH(delivery);
+                    qd_router_delivery_fifo_enter_LH(delivery);
                 }
 
                 addr->deliveries_transit++;
@@ -157,7 +157,7 @@ static void forward_to_remote_subscriber
 static bool forwarder_multicast_LH(qd_router_forwarder_t *forwarder,
                                    qd_router_t *router,
                                    qd_message_t *msg,
-                                   qd_delivery_t *delivery,
+                                   qd_router_delivery_t *delivery,
                                    qd_address_t *addr,
                                    qd_field_iterator_t *ingress_iter,
                                    bool is_direct)
@@ -184,7 +184,7 @@ static bool forwarder_multicast_LH(qd_ro
 static bool forwarder_anycast_closest_LH(qd_router_forwarder_t *forwarder,
                                          qd_router_t *router,
                                          qd_message_t *msg,
-                                         qd_delivery_t *delivery,
+                                         qd_router_delivery_t *delivery,
                                          qd_address_t *addr,
                                          qd_field_iterator_t *ingress_iter,
                                          bool is_direct)
@@ -207,7 +207,7 @@ static bool forwarder_anycast_closest_LH
 static bool forwarder_anycast_balanced_LH(qd_router_forwarder_t *forwarder,
                                           qd_router_t *router,
                                           qd_message_t *msg,
-                                          qd_delivery_t *delivery,
+                                          qd_router_delivery_t *delivery,
                                           qd_address_t *addr,
                                           qd_field_iterator_t *ingress_iter,
                                           bool is_direct)

Modified: qpid/dispatch/trunk/src/router_node.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_node.c?rev=1673546&r1=1673545&r2=1673546&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_node.c (original)
+++ qpid/dispatch/trunk/src/router_node.c Tue Apr 14 20:03:33 2015
@@ -400,9 +400,8 @@ void qd_router_link_free_LH(qd_router_li
     re = DEQ_HEAD(rlink->event_fifo);
     while (re) {
         DEQ_REMOVE_HEAD(rlink->event_fifo);
-        if (re->delivery && qd_delivery_fifo_exit_LH(re->delivery)) {
-            qd_delivery_unlink_LH(re->delivery);
-            qd_delivery_free_LH(re->delivery, re->disposition);
+        if (re->delivery && qd_router_delivery_fifo_exit_LH(re->delivery)) {
+            qd_router_delivery_unlink_LH(re->delivery);
         }
         free_qd_routed_event_t(re);
         re = DEQ_HEAD(rlink->event_fifo);
@@ -412,14 +411,19 @@ void qd_router_link_free_LH(qd_router_li
     while (re) {
         DEQ_REMOVE_HEAD(rlink->msg_fifo);
         if (re->delivery)
-            qd_delivery_fifo_exit_LH(re->delivery);
-        // we can't delete this delivery (it belongs to the receive link)
+            qd_router_delivery_fifo_exit_LH(re->delivery);
         if (re->message)
             qd_message_free(re->message);
         free_qd_routed_event_t(re);
         re = DEQ_HEAD(rlink->msg_fifo);
     }
 
+    qd_router_delivery_t *delivery = DEQ_HEAD(rlink->deliveries);
+    while (delivery) {
+        // this unlinks the delivery from the rlink:
+        qd_router_delivery_free_LH(delivery, PN_RELEASED);
+        delivery = DEQ_HEAD(rlink->deliveries);
+    }
     free_qd_router_link_t(rlink);
 }
 
@@ -430,7 +434,7 @@ void qd_router_link_free_LH(qd_router_li
 static int router_writable_link_handler(void* context, qd_link_t *link)
 {
     qd_router_t            *router = (qd_router_t*) context;
-    qd_delivery_t          *delivery;
+    qd_router_delivery_t          *delivery;
     qd_router_link_t       *rlink = (qd_router_link_t*) qd_link_get_context(link);
     pn_link_t              *pn_link = qd_link_pn(link);
     uint64_t                tag;
@@ -490,7 +494,7 @@ static int router_writable_link_handler(
         // Get a delivery for the send.  This will be the current delivery on the link.
         //
         tag++;
-        delivery = qd_delivery(link, pn_dtag((char*) &tag, 8));
+        delivery = qd_router_link_new_delivery(rlink, pn_dtag((char*) &tag, 8));
 
         //
         // Send the message
@@ -506,24 +510,23 @@ static int router_writable_link_handler(
         //
         sys_mutex_lock(router->lock);
         if (re->delivery) {
-            if (qd_delivery_fifo_exit_LH(re->delivery)) {
-                if (qd_delivery_settled(re->delivery)) {
-                    qd_link_t         *peer_link  = qd_delivery_link(re->delivery);
-                    qd_router_link_t  *peer_rlink = (qd_router_link_t*) qd_link_get_context(peer_link);
+            if (qd_router_delivery_fifo_exit_LH(re->delivery)) {
+                if (qd_router_delivery_settled(re->delivery)) {
+                    qd_router_link_t  *peer_rlink = qd_router_delivery_link(re->delivery);
                     qd_routed_event_t *return_re  = new_qd_routed_event_t();
                     DEQ_ITEM_INIT(return_re);
                     return_re->delivery    = re->delivery;
                     return_re->message     = 0;
                     return_re->settle      = true;
                     return_re->disposition = 0;
-                    qd_delivery_fifo_enter_LH(re->delivery);
+                    qd_router_delivery_fifo_enter_LH(re->delivery);
                     DEQ_INSERT_TAIL(peer_rlink->event_fifo, return_re);
-                    qd_link_activate(peer_link);
+                    qd_link_activate(peer_rlink->link);
                 } else
-                    qd_delivery_link_peers_LH(re->delivery, delivery);
+                    qd_router_delivery_link_peers_LH(re->delivery, delivery);
             }
         } else
-            qd_delivery_free_LH(delivery, 0);  // settle and free
+            qd_router_delivery_free_LH(delivery, 0);  // settle and free
         sys_mutex_unlock(router->lock);
 
         pn_link_advance(pn_link);
@@ -543,16 +546,16 @@ static int router_writable_link_handler(
 
         if (re->delivery) {
             if (re->disposition) {
-                pn_delivery_update(qd_delivery_pn(re->delivery), re->disposition);
+                pn_delivery_update(qd_router_delivery_pn(re->delivery), re->disposition);
                 event_count++;
             }
 
             sys_mutex_lock(router->lock);
 
-            bool ok = qd_delivery_fifo_exit_LH(re->delivery);
+            bool ok = qd_router_delivery_fifo_exit_LH(re->delivery);
             if (ok && re->settle) {
-                qd_delivery_unlink_LH(re->delivery);
-                qd_delivery_free_LH(re->delivery, re->disposition);
+                qd_router_delivery_unlink_LH(re->delivery);
+                qd_router_delivery_free_LH(re->delivery, re->disposition);
                 event_count++;
             }
 
@@ -692,7 +695,7 @@ static qd_field_iterator_t *router_annot
  * Note also that this function does not perform any message validation.  For link-routing,
  * there is no need to look into the transferred message.
  */
-static void router_link_route_delivery_LH(qd_router_link_t *peer_link, qd_delivery_t *delivery, qd_message_t *msg)
+static void router_link_route_delivery_LH(qd_router_link_t *peer_link, qd_router_delivery_t *delivery, qd_message_t *msg)
 {
     qd_routed_event_t *re = new_qd_routed_event_t();
 
@@ -707,7 +710,7 @@ static void router_link_route_delivery_L
     // Link the incoming delivery into the event for deferred processing
     //
     re->delivery = delivery;
-    qd_delivery_fifo_enter_LH(delivery);
+    qd_router_delivery_fifo_enter_LH(delivery);
 
     qd_link_activate(peer_link->link);
 }
@@ -716,7 +719,7 @@ static void router_link_route_delivery_L
 /**
  * Inbound Delivery Handler
  */
-static void router_rx_handler(void* context, qd_link_t *link, qd_delivery_t *delivery)
+static void router_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd)
 {
     qd_router_t      *router  = (qd_router_t*) context;
     pn_link_t        *pn_link = qd_link_pn(link);
@@ -732,7 +735,7 @@ static void router_rx_handler(void* cont
     //        no reason to wait for the whole message to be received before starting to
     //        send it.
     //
-    msg = qd_message_receive(delivery);
+    msg = qd_message_receive(pnd);
     if (!msg)
         return;
 
@@ -747,7 +750,7 @@ static void router_rx_handler(void* cont
     sys_mutex_lock(router->lock);
     qd_router_link_t *clink = rlink->connected_link;
     if (clink) {
-        router_link_route_delivery_LH(clink, delivery, msg);
+        router_link_route_delivery_LH(clink, qd_router_delivery(rlink, pnd), msg);
         sys_mutex_unlock(router->lock);
         return;
     }
@@ -772,6 +775,7 @@ static void router_rx_handler(void* cont
         bool                 free_iter = true;
         char                *to_override  = 0;
         bool                 forwarded = false;
+        qd_router_delivery_t *delivery = qd_router_delivery(rlink, pnd);
 
         //
         // Only respect the delivery annotations if the message came from another router.
@@ -879,18 +883,19 @@ static void router_rx_handler(void* cont
         if (!forwarded) {
             if (on_message)
                 // our local in-process handler will accept it:
-                qd_delivery_free_LH(delivery, PN_ACCEPTED);
+                qd_router_delivery_free_LH(delivery, PN_ACCEPTED);
             else {
                 // no one has accepted it, so inform sender
-                qd_delivery_set_undeliverable_LH(delivery);
-                qd_delivery_free_LH(delivery, PN_MODIFIED);
+                qd_router_delivery_set_undeliverable_LH(delivery);
+                qd_router_delivery_free_LH(delivery, PN_MODIFIED);
             }
         }
     } else {
         //
         // Message is invalid.  Reject the message.
         //
-        qd_delivery_free_LH(delivery, PN_REJECTED);
+        pn_delivery_update(pnd, PN_REJECTED);
+        pn_delivery_settle(pnd);
     }
 
     sys_mutex_unlock(router->lock);
@@ -909,22 +914,24 @@ static void router_rx_handler(void* cont
 /**
  * Delivery Disposition Handler
  */
-static void router_disposition_handler(void* context, qd_link_t *link, qd_delivery_t *delivery)
+static void router_disposition_handler(void* context, qd_link_t *link, pn_delivery_t *pnd)
 {
     qd_router_t   *router  = (qd_router_t*) context;
-    bool           changed = qd_delivery_disp_changed(delivery);
-    uint64_t       disp    = qd_delivery_disp(delivery);
-    bool           settled = qd_delivery_settled(delivery);
+    qd_router_delivery_t *delivery = (qd_router_delivery_t *)pn_delivery_get_context(pnd);
+    if (!delivery) return;
+
+    bool           changed = qd_router_delivery_disp_changed(delivery);
+    uint64_t       disp    = qd_router_delivery_disp(delivery);
+    bool           settled = qd_router_delivery_settled(delivery);
 
     sys_mutex_lock(router->lock);
-    qd_delivery_t *peer = qd_delivery_peer(delivery);
+    qd_router_delivery_t *peer = qd_router_delivery_peer(delivery);
     if (peer) {
         //
         // The case where this delivery has a peer.
         //
         if (changed || settled) {
-            qd_link_t         *peer_link = qd_delivery_link(peer);
-            qd_router_link_t  *prl       = (qd_router_link_t*) qd_link_get_context(peer_link);
+            qd_router_link_t  *peer_link = qd_router_delivery_link(peer);
             qd_routed_event_t *re        = new_qd_routed_event_t();
             DEQ_ITEM_INIT(re);
             re->delivery    = peer;
@@ -932,17 +939,17 @@ static void router_disposition_handler(v
             re->settle      = settled;
             re->disposition = changed ? disp : 0;
 
-            qd_delivery_fifo_enter_LH(peer);
-            DEQ_INSERT_TAIL(prl->event_fifo, re);
+            qd_router_delivery_fifo_enter_LH(peer);
+            DEQ_INSERT_TAIL(peer_link->event_fifo, re);
             if (settled) {
-                qd_delivery_unlink_LH(delivery);
-                qd_delivery_free_LH(delivery, 0);
+                qd_router_delivery_unlink_LH(delivery);
+                qd_router_delivery_free_LH(delivery, 0);
             }
 
-            qd_link_activate(peer_link);
+            qd_link_activate(peer_link->link);
         }
     } else if (settled)
-        qd_delivery_free_LH(delivery, 0);
+        qd_router_delivery_free_LH(delivery, 0);
 
     sys_mutex_unlock(router->lock);
 }
@@ -1012,6 +1019,7 @@ static void qd_router_attach_routed_link
         rlink->target         = 0;
         DEQ_INIT(rlink->event_fifo);
         DEQ_INIT(rlink->msg_fifo);
+        DEQ_INIT(rlink->deliveries);
         qd_link_set_context(link, rlink);
 
         sys_mutex_lock(la->router->lock);
@@ -1195,6 +1203,7 @@ static int router_incoming_link_handler(
     rlink->target         = 0;
     DEQ_INIT(rlink->event_fifo);
     DEQ_INIT(rlink->msg_fifo);
+    DEQ_INIT(rlink->deliveries);
 
     if (!is_router && r_tgt) {
         rlink->target = (char*) malloc(strlen(r_tgt) + 1);
@@ -1321,6 +1330,7 @@ static int router_outgoing_link_handler(
     rlink->target         = 0;
     DEQ_INIT(rlink->event_fifo);
     DEQ_INIT(rlink->msg_fifo);
+    DEQ_INIT(rlink->deliveries);
 
     qd_link_set_context(link, rlink);
     pn_terminus_copy(qd_link_source(link), qd_link_remote_source(link));
@@ -1678,6 +1688,7 @@ static void router_outbound_open_handler
     rlink->target         = 0;
     DEQ_INIT(rlink->event_fifo);
     DEQ_INIT(rlink->msg_fifo);
+    DEQ_INIT(rlink->deliveries);
 
     qd_link_set_context(receiver, rlink);
     qd_entity_cache_add(QD_ROUTER_LINK_TYPE, rlink);
@@ -1705,6 +1716,7 @@ static void router_outbound_open_handler
     rlink->target         = 0;
     DEQ_INIT(rlink->event_fifo);
     DEQ_INIT(rlink->msg_fifo);
+    DEQ_INIT(rlink->deliveries);
 
     //
     // Add the new outgoing link to the hello_address's list of links.

Modified: qpid/dispatch/trunk/src/router_private.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_private.h?rev=1673546&r1=1673545&r2=1673546&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_private.h (original)
+++ qpid/dispatch/trunk/src/router_private.h Tue Apr 14 20:03:33 2015
@@ -60,7 +60,7 @@ ENUM_DECLARE(qd_link_type);
 
 typedef struct qd_routed_event_t {
     DEQ_LINKS(struct qd_routed_event_t);
-    qd_delivery_t *delivery;
+    qd_router_delivery_t *delivery;
     qd_message_t  *message;
     bool           settle;
     uint64_t       disposition;
@@ -73,20 +73,22 @@ extern const char *QD_ROUTER_LINK_TYPE;
 ALLOC_DECLARE(qd_routed_event_t);
 DEQ_DECLARE(qd_routed_event_t, qd_routed_event_list_t);
 
+DEQ_DECLARE(qd_router_delivery_t, qd_router_delivery_list_t);
 
 struct qd_router_link_t {
     DEQ_LINKS(qd_router_link_t);
-    int                     mask_bit;        ///< Unique mask bit if this is an inter-router link
-    qd_link_type_t          link_type;
-    qd_direction_t          link_direction;
-    qd_address_t           *owning_addr;     ///< [ref] Address record that owns this link
-    qd_waypoint_t          *waypoint;        ///< [ref] Waypoint that owns this link
-    qd_link_t              *link;            ///< [own] Link pointer
-    qd_router_link_t       *connected_link;  ///< [ref] If this is a link-route, reference the connected link
-    qd_router_link_ref_t   *ref;             ///< Pointer to a containing reference object
-    char                   *target;          ///< Target address for incoming links
-    qd_routed_event_list_t  event_fifo;      ///< FIFO of outgoing delivery/link events (no messages)
-    qd_routed_event_list_t  msg_fifo;        ///< FIFO of outgoing message deliveries
+    int                       mask_bit;        ///< Unique mask bit if this is an inter-router link
+    qd_link_type_t            link_type;
+    qd_direction_t            link_direction;
+    qd_address_t             *owning_addr;     ///< [ref] Address record that owns this link
+    qd_waypoint_t            *waypoint;        ///< [ref] Waypoint that owns this link
+    qd_link_t                *link;            ///< [own] Link pointer
+    qd_router_link_t         *connected_link;  ///< [ref] If this is a link-route, reference the connected link
+    qd_router_link_ref_t     *ref;             ///< Pointer to a containing reference object
+    char                     *target;          ///< Target address for incoming links
+    qd_routed_event_list_t    event_fifo;      ///< FIFO of outgoing delivery/link events (no messages)
+    qd_routed_event_list_t    msg_fifo;        ///< FIFO of outgoing message deliveries
+    qd_router_delivery_list_t deliveries;      ///< [own] outstanding unsettled deliveries
 };
 
 ALLOC_DECLARE(qd_router_link_t);
@@ -290,4 +292,30 @@ qd_address_t *qd_router_address_lookup_L
                                           qd_field_iterator_t *addr_iter,
                                           bool *is_local, bool *is_direct);
 
+/**
+ * Important: qd_router_link_new_delivery must never be called twice in a row
+ *            without an intervening pn_link_advance.  The Dispatch architecture
+ *            provides a hook for discovering when an outgoing link is writable
+ *            and has credit.  When a link is writable, a delivery is
+ *            allocated, written, and advanced in one operation.  If a backlog
+ *            of pending deliveries is created, an assertion will be thrown.
+ */
+qd_router_delivery_t *qd_router_link_new_delivery(qd_router_link_t *link, pn_delivery_tag_t tag);
+qd_router_delivery_t *qd_router_delivery(qd_router_link_t *link, pn_delivery_t *pnd);
+void qd_router_delivery_set_undeliverable_LH(qd_router_delivery_t *delivery);
+void qd_router_delivery_free_LH(qd_router_delivery_t *delivery, uint64_t final_disposition);
+void qd_router_delivery_link_peers_LH(qd_router_delivery_t *left, qd_router_delivery_t *right);
+void qd_router_delivery_unlink_LH(qd_router_delivery_t *delivery);
+void qd_router_delivery_fifo_enter_LH(qd_router_delivery_t *delivery);
+bool qd_router_delivery_fifo_exit_LH(qd_router_delivery_t *delivery);
+qd_router_delivery_t *qd_router_delivery_peer(qd_router_delivery_t *delivery);
+void qd_router_delivery_set_context(qd_router_delivery_t *delivery, void *context);
+void *qd_router_delivery_context(qd_router_delivery_t *delivery);
+pn_delivery_t *qd_router_delivery_pn(qd_router_delivery_t *delivery);
+void qd_router_delivery_settle(qd_router_delivery_t *delivery);
+bool qd_router_delivery_settled(qd_router_delivery_t *delivery);
+bool qd_router_delivery_disp_changed(qd_router_delivery_t *delivery);
+uint64_t qd_router_delivery_disp(qd_router_delivery_t *delivery);
+qd_router_link_t *qd_router_delivery_link(qd_router_delivery_t *delivery);
+
 #endif

Modified: qpid/dispatch/trunk/src/waypoint.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/waypoint.c?rev=1673546&r1=1673545&r2=1673546&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/waypoint.c (original)
+++ qpid/dispatch/trunk/src/waypoint.c Tue Apr 14 20:03:33 2015
@@ -97,6 +97,8 @@ static void qd_waypoint_visit_sink_LH(qd
         rlink->target         = 0;
         DEQ_INIT(rlink->event_fifo);
         DEQ_INIT(rlink->msg_fifo);
+        DEQ_INIT(rlink->deliveries);
+
         qd_entity_cache_add(QD_ROUTER_LINK_TYPE, rlink);
         DEQ_INSERT_TAIL(router->links, rlink);
         qd_link_set_context(wp->out_link, rlink);
@@ -170,6 +172,8 @@ static void qd_waypoint_visit_source_LH(
         rlink->target         = 0;
         DEQ_INIT(rlink->event_fifo);
         DEQ_INIT(rlink->msg_fifo);
+        DEQ_INIT(rlink->deliveries);
+
         qd_entity_cache_add(QD_ROUTER_LINK_TYPE, rlink);
         DEQ_INSERT_TAIL(router->links, rlink);
         qd_link_set_context(wp->in_link, rlink);



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message