qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tr...@apache.org
Subject [1/3] qpid-dispatch git commit: DISPATCH-179 - Added outgoing delivery linkage
Date Fri, 15 Jan 2016 18:54:52 GMT
Repository: qpid-dispatch
Updated Branches:
  refs/heads/tross-DISPATCH-179-1 59cc7b53d -> 6d9dc9e9d


DISPATCH-179 - Added outgoing delivery linkage


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/f0bfea6f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/f0bfea6f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/f0bfea6f

Branch: refs/heads/tross-DISPATCH-179-1
Commit: f0bfea6f30f7f9f9b3c104c74e1324d89f698fd7
Parents: 59cc7b5
Author: Ted Ross <tross@redhat.com>
Authored: Wed Jan 13 13:50:27 2016 -0500
Committer: Ted Ross <tross@redhat.com>
Committed: Wed Jan 13 13:50:27 2016 -0500

----------------------------------------------------------------------
 include/qpid/dispatch/router_core.h   | 15 +++++-
 src/router_core/connections.c         | 27 +++++++----
 src/router_core/forwarder.c           | 30 +++++++++---
 src/router_core/router_core.c         | 17 ++++---
 src/router_core/router_core_private.h | 17 +++++--
 src/router_core/transfer.c            | 74 ++++++++++++++++++++++++++++++
 src/router_node.c                     | 47 ++++++++++++++++++-
 7 files changed, 201 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f0bfea6f/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 20840cd..b0119eb 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -482,12 +482,18 @@ qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg,
                                     qd_field_iterator_t *ingress, qd_field_iterator_t *addr, bool settled);
 qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *msg);
 
+void qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit);
+
 typedef void (*qdr_link_first_attach_t)  (void *context, qdr_connection_t *conn, qdr_link_t *link, 
                                           qdr_terminus_t *source, qdr_terminus_t *target);
 typedef void (*qdr_link_second_attach_t) (void *context, qdr_link_t *link,
                                           qdr_terminus_t *source, qdr_terminus_t *target);
 typedef void (*qdr_link_detach_t)        (void *context, qdr_link_t *link, qdr_error_t *error);
 typedef void (*qdr_link_flow_t)          (void *context, qdr_link_t *link);
+typedef void (*qdr_link_offer_t)         (void *context, qdr_link_t *link, int delivery_count);
+typedef void (*qdr_link_drained_t)       (void *context, qdr_link_t *link);
+typedef void (*qdr_link_push_t)          (void *context, qdr_link_t *link);
+typedef void (*qdr_link_deliver_t)       (void *context, qdr_link_t *link, qdr_delivery_t *delivery);
 
 void qdr_connection_handlers(qdr_core_t                *core,
                              void                      *context,
@@ -495,7 +501,11 @@ void qdr_connection_handlers(qdr_core_t                *core,
                              qdr_link_first_attach_t    first_attach,
                              qdr_link_second_attach_t   second_attach,
                              qdr_link_detach_t          detach,
-                             qdr_link_flow_t            flow);
+                             qdr_link_flow_t            flow,
+                             qdr_link_offer_t           offer,
+                             qdr_link_drained_t         drained,
+                             qdr_link_push_t            push,
+                             qdr_link_deliver_t         deliver);
 
 /**
  ******************************************************************************
@@ -506,11 +516,14 @@ void qdr_delivery_set_context(qdr_delivery_t *delivery, void *context);
 void *qdr_delivery_get_context(qdr_delivery_t *delivery);
 uint64_t qdr_delivery_disposition(const qdr_delivery_t *delivery);
 bool qdr_delivery_is_settled(const qdr_delivery_t *delivery);
+void qdr_delivery_tag(const qdr_delivery_t *delivery, const char **tag, int *length);
+qd_message_t *qdr_delivery_message(const qdr_delivery_t *delivery);
 
 void qdr_delivery_update_disposition(qdr_delivery_t *delivery);
 void qdr_delivery_update_flow(qdr_delivery_t *delivery);
 void qdr_delivery_process(qdr_delivery_t *delivery);
 
+
 /**
  ******************************************************************************
  * Management functions

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f0bfea6f/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 697b6ce..6adaa7e 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -118,6 +118,7 @@ int qdr_connection_process(qdr_connection_t *conn)
 
     sys_mutex_lock(conn->work_lock);
     DEQ_MOVE(conn->work_list, work_list);
+    // TODO - Grab the list of links with deliveries
     sys_mutex_unlock(conn->work_lock);
 
     int event_count = DEQ_SIZE(work_list);
@@ -146,6 +147,8 @@ int qdr_connection_process(qdr_connection_t *conn)
         work = DEQ_HEAD(work_list);
     }
 
+    // TODO - Invoke the push handler for each link with deliveries
+
     return event_count;
 }
 
@@ -268,7 +271,11 @@ void qdr_connection_handlers(qdr_core_t                *core,
                              qdr_link_first_attach_t    first_attach,
                              qdr_link_second_attach_t   second_attach,
                              qdr_link_detach_t          detach,
-                             qdr_link_flow_t            flow)
+                             qdr_link_flow_t            flow,
+                             qdr_link_offer_t           offer,
+                             qdr_link_drained_t         drained,
+                             qdr_link_push_t            push,
+                             qdr_link_deliver_t         deliver)
 {
     core->user_context          = context;
     core->activate_handler      = activate;
@@ -276,6 +283,10 @@ void qdr_connection_handlers(qdr_core_t                *core,
     core->second_attach_handler = second_attach;
     core->detach_handler        = detach;
     core->flow_handler          = flow;
+    core->offer_handler         = offer;
+    core->drained_handler       = drained;
+    core->push_handler          = push;
+    core->deliver_handler       = deliver;
 }
 
 
@@ -699,7 +710,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
                     // to do an address lookup for deliveries that arrive on this link.
                     //
                     link->owning_addr = addr;
-                    qdr_add_link_ref(&addr->inlinks, link);
+                    qdr_add_link_ref(&addr->inlinks, link, QDR_LINK_LIST_CLASS_ADDRESS);
                     qdr_link_outbound_second_attach_CT(core, link, source, target);
                 }
             }
@@ -747,7 +758,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
                 // to do an address lookup for deliveries that arrive on this link.
                 //
                 link->owning_addr = addr;
-                qdr_add_link_ref(&addr->rlinks, link);
+                qdr_add_link_ref(&addr->rlinks, link, QDR_LINK_LIST_CLASS_ADDRESS);
                 if (DEQ_SIZE(addr->rlinks) == 1) {
                     const char *key = (const char*) qd_hash_key_by_handle(addr->hash_handle);
                     if (key && *key == 'M')
@@ -764,7 +775,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
 
         case QD_LINK_CONTROL:
             link->owning_addr = core->hello_addr;
-            qdr_add_link_ref(&core->hello_addr->rlinks, link);
+            qdr_add_link_ref(&core->hello_addr->rlinks, link, QDR_LINK_LIST_CLASS_ADDRESS);
             core->control_links_by_mask_bit[conn->mask_bit] = link;
             qdr_link_outbound_second_attach_CT(core, link, source, target);
             break;
@@ -836,7 +847,7 @@ static void qdr_link_inbound_second_attach_CT(qdr_core_t *core, qdr_action_t *ac
 
         case QD_LINK_CONTROL:
             link->owning_addr = core->hello_addr;
-            qdr_add_link_ref(&core->hello_addr->rlinks, link);
+            qdr_add_link_ref(&core->hello_addr->rlinks, link, QDR_LINK_LIST_CLASS_ADDRESS);
             core->control_links_by_mask_bit[conn->mask_bit] = link;
             break;
 
@@ -885,7 +896,7 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, b
         switch (link->link_type) {
         case QD_LINK_ENDPOINT:
             if (addr)
-                qdr_del_link_ref(&addr->inlinks, link);
+                qdr_del_link_ref(&addr->inlinks, link, QDR_LINK_LIST_CLASS_ADDRESS);
             break;
 
         case QD_LINK_WAYPOINT:
@@ -904,7 +915,7 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, b
         switch (link->link_type) {
         case QD_LINK_ENDPOINT:
             if (addr) {
-                qdr_del_link_ref(&addr->rlinks, link);
+                qdr_del_link_ref(&addr->rlinks, link, QDR_LINK_LIST_CLASS_ADDRESS);
                 was_local = true;
             }
             break;
@@ -913,7 +924,7 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, b
             break;
 
         case QD_LINK_CONTROL:
-            qdr_del_link_ref(&core->hello_addr->rlinks, link);
+            qdr_del_link_ref(&core->hello_addr->rlinks, link, QDR_LINK_LIST_CLASS_ADDRESS);
             core->control_links_by_mask_bit[conn->mask_bit] = 0;
             qdr_post_link_lost_CT(core, conn->mask_bit);
             break;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f0bfea6f/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 85759b5..4fedce8 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -47,7 +47,7 @@ struct qdr_forwarder_t {
 //==================================================================================
 
 
-qdr_delivery_t *qdr_forward_new_delivery(qdr_delivery_t *peer, qdr_link_t *link, qd_message_t *msg)
+qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *peer, qdr_link_t *link, qd_message_t *msg)
 {
     qdr_delivery_t *dlv = new_qdr_delivery_t();
 
@@ -56,6 +56,7 @@ qdr_delivery_t *qdr_forward_new_delivery(qdr_delivery_t *peer, qdr_link_t *link,
     dlv->peer    = peer;
     dlv->msg     = qd_message_copy(msg);
     dlv->settled = !peer || peer->settled;
+    dlv->tag     = core->next_tag++;
 
     if (peer->peer == 0)
         peer->peer = dlv;  // TODO - make this a back-list for multicast tracking
@@ -64,6 +65,24 @@ qdr_delivery_t *qdr_forward_new_delivery(qdr_delivery_t *peer, qdr_link_t *link,
 }
 
 
+void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv)
+{
+    sys_mutex_lock(link->conn->work_lock);
+    DEQ_INSERT_TAIL(link->undelivered, dlv);
+    sys_mutex_unlock(link->conn->work_lock);
+
+    //
+    // If the link isn't already on the links_with_deliveries list, put it there.
+    //
+    qdr_add_link_ref(&link->conn->links_with_deliveries, link, QDR_LINK_LIST_CLASS_DELIVERY);
+
+    //
+    // Activate the outgoing connection for later processing.
+    //
+    qdr_connection_activate_CT(core, link->conn);
+}
+
+
 int qdr_forward_multicast_CT(qdr_core_t      *core,
                              qdr_address_t   *addr,
                              qd_message_t    *msg,
@@ -80,9 +99,8 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
     qdr_link_ref_t *link_ref = DEQ_HEAD(addr->rlinks);
     while (link_ref) {
         qdr_link_t     *out_link     = link_ref->link;
-        qdr_delivery_t *out_delivery = qdr_forward_new_delivery(in_delivery, out_link, msg);
-        DEQ_INSERT_TAIL(out_link->undelivered, out_delivery); // TODO - check locking on this list
-        qdr_connection_activate_CT(core, out_link->conn);
+        qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg);
+        qdr_forward_deliver_CT(core, out_link, out_delivery);
         fanout++;
         link_ref = DEQ_NEXT(link_ref);
     }
@@ -148,8 +166,8 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
                 core->control_links_by_mask_bit[link_bit] :
                 core->data_links_by_mask_bit[link_bit];
             if (dest_link) {
-                qdr_delivery_t *out_delivery = qdr_forward_new_delivery(in_delivery, dest_link, msg);
-                DEQ_INSERT_TAIL(dest_link->undelivered, out_delivery); // TODO - check locking on this list
+                qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, dest_link, msg);
+                qdr_forward_deliver_CT(core, dest_link, out_delivery);
                 fanout++;
                 addr->deliveries_transit++;
                 qdr_connection_activate_CT(core, dest_link->conn);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f0bfea6f/src/router_core/router_core.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index 359a192..479ba01 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -188,22 +188,25 @@ qdr_address_t *qdr_add_local_address_CT(qdr_core_t *core, char aclass, const cha
 }
 
 
-void qdr_add_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link)
+void qdr_add_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link, int cls)
 {
+    if (link->ref[cls] != 0)
+        return;
+
     qdr_link_ref_t *ref = new_qdr_link_ref_t();
     DEQ_ITEM_INIT(ref);
     ref->link = link;
-    link->ref = ref;
+    link->ref[cls] = ref;
     DEQ_INSERT_TAIL(*ref_list, ref);
 }
 
 
-void qdr_del_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link)
+void qdr_del_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link, int cls)
 {
-    if (link->ref) {
-        DEQ_REMOVE(*ref_list, link->ref);
-        free_qdr_link_ref_t(link->ref);
-        link->ref = 0;
+    if (link->ref[cls]) {
+        DEQ_REMOVE(*ref_list, link->ref[cls]);
+        free_qdr_link_ref_t(link->ref[cls]);
+        link->ref[cls] = 0;
     }
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f0bfea6f/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 8d80bf5..0ef3db4 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -173,11 +173,15 @@ struct qdr_delivery_t {
     qd_field_iterator_t *origin;
     uint64_t             disposition;
     bool                 settled;
+    uint64_t             tag;
 };
 
 ALLOC_DECLARE(qdr_delivery_t);
 DEQ_DECLARE(qdr_delivery_t, qdr_delivery_list_t);
 
+#define QDR_LINK_LIST_CLASS_ADDRESS  0
+#define QDR_LINK_LIST_CLASS_DELIVERY 1
+#define QDR_LINK_LIST_CLASSES        2
 
 struct qdr_link_t {
     DEQ_LINKS(qdr_link_t);
@@ -189,7 +193,7 @@ struct qdr_link_t {
     char                *name;
     qdr_address_t       *owning_addr;     ///< [ref] Address record that owns this link
     qdr_link_t          *connected_link;  ///< [ref] If this is a link-route, reference the connected link
-    qdr_link_ref_t      *ref;             ///< Pointer to a containing reference object (TODO - check this!)
+    qdr_link_ref_t      *ref[QDR_LINK_LIST_CLASSES];  ///< Pointers to containing reference objects
     qdr_delivery_list_t  undelivered;     ///< Deliveries to be forwarded or sent
     qdr_delivery_list_t  unsettled;       ///< Unsettled deliveries
     bool                 strip_annotations_in;
@@ -266,8 +270,8 @@ DEQ_DECLARE(qdr_address_t, qdr_address_list_t);
 qdr_address_t *qdr_address_CT(qdr_core_t *core, qd_address_semantics_t semantics);
 qdr_address_t *qdr_add_local_address_CT(qdr_core_t *core, char aclass, const char *addr, qd_address_semantics_t semantics);
 
-void qdr_add_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link);
-void qdr_del_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link);
+void qdr_add_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link, int cls);
+void qdr_del_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link, int cls);
 
 void qdr_add_node_ref(qdr_router_ref_list_t *ref_list, qdr_node_t *rnode);
 void qdr_del_node_ref(qdr_router_ref_list_t *ref_list, qdr_node_t *rnode);
@@ -336,6 +340,7 @@ struct qdr_connection_t {
     qdr_link_list_t             links;
     qdr_connection_work_list_t  work_list;
     sys_mutex_t                *work_lock;
+    qdr_link_ref_list_t         links_with_deliveries;
 };
 
 ALLOC_DECLARE(qdr_connection_t);
@@ -382,6 +387,10 @@ struct qdr_core_t {
     qdr_link_second_attach_t   second_attach_handler;
     qdr_link_detach_t          detach_handler;
     qdr_link_flow_t            flow_handler;
+    qdr_link_offer_t           offer_handler;
+    qdr_link_drained_t         drained_handler;
+    qdr_link_push_t            push_handler;
+    qdr_link_deliver_t         deliver_handler;
 
     const char *router_area;
     const char *router_id;
@@ -400,6 +409,8 @@ struct qdr_core_t {
     qdr_link_t          **control_links_by_mask_bit;
     qdr_link_t          **data_links_by_mask_bit;
 
+    uint64_t              next_tag;
+
     qdr_forwarder_t      *forwarders[QD_SEMANTICS_LINK_BALANCED + 1];
 };
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f0bfea6f/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 1c56f13..c459628 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -75,10 +75,65 @@ qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg,
 qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *msg)
 {
     // TODO - Implement this.  Bypass the CT?
+
+    //
+    // We might wish to run link-routed transfers and updates through the core in order to
+    // track the number of outstanding deliveries and to have the ability to intervene in
+    // flow control.
+    //
+    // Use case: Quiescing a broker.  To do this, all inbound links to the broker shall be
+    // idled by preventing the propagation of flow credit out of the broker.  This will dry
+    // the transfer of inbound deliveries, allow all existing deliveries to be settled, and
+    // allow the router to know when it is safe to detach the inbound links.  Outbound links
+    // can also be detached after all deliveries are settled and "drained" indications are
+    // received.
+    //
+    // Waypoint disconnect procedure:
+    //   1) Block flow-credit propagation for link outbound to waypoint.
+    //   2) Wait for the number of unsettled outbound deliveries to go to zero.
+    //   3) Detach the outbound link.
+    //   4) Wait for inbound link to be drained with zero unsettled deliveries.
+    //   5) Detach inbound link.
+    //
+
     return 0;
 }
 
 
+void qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit)
+{
+    qdr_connection_t *conn = link->conn;
+    qdr_delivery_t   *dlv;
+    bool              drained = false;
+    int               offer;
+
+    while (credit > 0 && !drained) {
+        sys_mutex_lock(conn->work_lock);
+        dlv = DEQ_HEAD(link->undelivered);
+        if (dlv) {
+            DEQ_REMOVE_HEAD(link->undelivered);
+            DEQ_INSERT_TAIL(link->unsettled, dlv);
+            credit--;
+            offer = DEQ_SIZE(link->undelivered);
+        } else
+            drained = true;
+        sys_mutex_unlock(conn->work_lock);
+
+        if (dlv)
+            core->deliver_handler(core->user_context, link, dlv);
+    }
+
+    if (drained)
+        core->drained_handler(core->user_context, link);
+    else
+        core->offer_handler(core->user_context, link, offer);
+
+    //
+    // TODO - handle disposition/settlement updates
+    //
+}
+
+
 void qdr_send_to1(qdr_core_t *core, qd_message_t *msg, qd_field_iterator_t *addr, bool exclude_inprocess, bool control)
 {
     qdr_action_t *action = qdr_action(qdr_send_to_CT, "send_to");
@@ -127,6 +182,19 @@ bool qdr_delivery_is_settled(const qdr_delivery_t *delivery)
 }
 
 
+void qdr_delivery_tag(const qdr_delivery_t *delivery, const char **tag, int *length)
+{
+    *tag    = (const char*) &delivery->tag;
+    *length = sizeof(uint64_t);
+}
+
+
+qd_message_t *qdr_delivery_message(const qdr_delivery_t *delivery)
+{
+    return delivery->msg;
+}
+
+
 //==================================================================================
 // In-Thread Functions
 //==================================================================================
@@ -140,6 +208,12 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis
     qdr_link_t     *link  = dlv->link;
     int             count = 0;
 
+    //
+    // NOTE: The link->undelivered list does not need to be protected by the
+    //       connection's work lock for incoming links.  This protection is only
+    //       needed for outgoing links.
+    //
+
     if (DEQ_IS_EMPTY(link->undelivered)) {
         qdr_address_t *addr = link->owning_addr;
         if (!addr && dlv->to_addr) {

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f0bfea6f/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index 1c7b0dd..c6c3403 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -622,6 +622,47 @@ static void qd_router_link_flow(void *context, qdr_link_t *link)
 }
 
 
+static void qd_router_link_offer(void *context, qdr_link_t *link, int delivery_count)
+{
+}
+
+
+static void qd_router_link_drained(void *context, qdr_link_t *link)
+{
+}
+
+
+static void qd_router_link_push(void *context, qdr_link_t *link)
+{
+    qd_router_t *router      = (qd_router_t*) context;
+    qd_link_t   *qlink       = (qd_link_t*) qdr_link_get_context(link);
+    pn_link_t   *plink       = qd_link_pn(qlink);
+    int          link_credit = pn_link_credit(plink);
+
+    qdr_link_process_deliveries(router->router_core, link, link_credit);
+}
+
+
+static void qd_router_link_deliver(void *context, qdr_link_t *link, qdr_delivery_t *dlv)
+{
+    qd_link_t  *qlink = (qd_link_t*) qdr_link_get_context(link);
+    pn_link_t  *plink = qd_link_pn(qlink);
+    const char *tag;
+    int         tag_length;
+
+    qdr_delivery_tag(dlv, &tag, &tag_length);
+
+    pn_delivery(plink, pn_dtag(tag, tag_length));
+    pn_delivery_t *pdlv = pn_link_current(plink);
+
+    pn_delivery_set_context(pdlv, dlv);
+    qdr_delivery_set_context(dlv, pdlv);
+
+    qd_message_send(qdr_delivery_message(dlv), qlink, qdr_link_strip_annotations_out(link));
+    pn_link_advance(plink);
+}
+
+
 void qd_router_setup_late(qd_dispatch_t *qd)
 {
     qd->router->router_core = qdr_core(qd, qd->router->router_area, qd->router->router_id);
@@ -631,7 +672,11 @@ void qd_router_setup_late(qd_dispatch_t *qd)
                             qd_router_link_first_attach,
                             qd_router_link_second_attach,
                             qd_router_link_detach,
-                            qd_router_link_flow);
+                            qd_router_link_flow,
+                            qd_router_link_offer,
+                            qd_router_link_drained,
+                            qd_router_link_push,
+                            qd_router_link_deliver);
 
     qd_router_python_setup(qd->router);
     qd_timer_schedule(qd->router->timer, 1000);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message