qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tr...@apache.org
Subject qpid-dispatch git commit: DISPATCH_179 - Link-routing implemented for attach, flow, transfer.
Date Mon, 07 Mar 2016 22:22:21 GMT
Repository: qpid-dispatch
Updated Branches:
  refs/heads/tross-DISPATCH-179-1 3c84c0be4 -> bb14ecfb6


DISPATCH_179 - Link-routing implemented for attach, flow, transfer.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/bb14ecfb
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/bb14ecfb
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/bb14ecfb

Branch: refs/heads/tross-DISPATCH-179-1
Commit: bb14ecfb6d070e41ac4d4dfe0eeb779f0b38117d
Parents: 3c84c0b
Author: Ted Ross <tross@redhat.com>
Authored: Mon Mar 7 17:21:15 2016 -0500
Committer: Ted Ross <tross@redhat.com>
Committed: Mon Mar 7 17:21:15 2016 -0500

----------------------------------------------------------------------
 include/qpid/dispatch/router_core.h   |  2 +-
 src/router_core/connections.c         |  8 ++++
 src/router_core/forwarder.c           |  2 +-
 src/router_core/router_core_private.h |  4 ++
 src/router_core/transfer.c            | 71 +++++++++++++++++++-----------
 src/router_node.c                     |  4 +-
 6 files changed, 61 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bb14ecfb/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 06669f5..4dcc167 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -494,7 +494,7 @@ qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, qd_message_t *msg,
qd_field_i
 qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg,
                                     qd_field_iterator_t *ingress, qd_field_iterator_t *addr,
                                     bool settled, qd_bitmask_t *link_exclusion);
-qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *msg);
+qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *msg, bool
settled);
 
 void qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit);
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bb14ecfb/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 1aca79f..ab8f6e1 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -957,6 +957,14 @@ static void qdr_link_inbound_second_attach_CT(qdr_core_t *core, qdr_action_t
*ac
     qdr_terminus_t   *source = action->args.connection.source;
     qdr_terminus_t   *target = action->args.connection.target;
 
+    //
+    // Handle attach-routed links
+    //
+    if (link->connected_link) {
+        qdr_link_outbound_second_attach_CT(core, link->connected_link, source, target);
+        return;
+    }
+
     if (link->link_direction == QD_INCOMING) {
         //
         // Handle incoming link cases

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bb14ecfb/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 1c0f7ec..0b4562b 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -351,7 +351,7 @@ bool qdr_forward_link_balanced_CT(qdr_core_t     *core,
                                   qdr_terminus_t *target)
 {
     qdr_connection_ref_t *conn_ref = DEQ_HEAD(addr->conns);
-    qdr_connection_t     *conn;
+    qdr_connection_t     *conn     = 0;
 
     //
     // Check for locally connected containers that can handle this link attach.

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bb14ecfb/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 2877677..f44b1c5 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -236,6 +236,8 @@ struct qdr_link_t {
     int                      capacity;
     int                      incremental_credit_CT;
     int                      incremental_credit;
+    bool                     drain_mode;
+    int                      credit_to_core; ///< Number of the available credits incrementally
given to the core
     uint64_t                 total_deliveries;
 };
 
@@ -552,6 +554,8 @@ void qdr_post_link_lost_CT(qdr_core_t *core, int link_maskbit);
 void qdr_post_general_work_CT(qdr_core_t *core, qdr_general_work_t *work);
 void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr, bool was_local);
 
+qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *peer, qdr_link_t
*link, qd_message_t *msg);
+void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv);
 void qdr_connection_activate_CT(qdr_core_t *core, qdr_connection_t *conn);
 
 void qdr_connection_enqueue_work_CT(qdr_core_t            *core,

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bb14ecfb/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index fcc33be..9484489 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -77,31 +77,19 @@ qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg,
 }
 
 
-qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *msg)
+qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *msg, bool
settled)
 {
-    // TODO - Implement this.  Bypass the CT?
+    qdr_action_t   *action = qdr_action(qdr_link_deliver_CT, "link_deliver");
+    qdr_delivery_t *dlv    = new_qdr_delivery_t();
 
-    //
-    // We might wish to run link-routed transfers and updates through the core in order to
-    // track the number of outstanding deliveries and to have the ability to intervene in
-    // flow control.
-    //
-    // Use case: Quiescing a broker.  To do this, all inbound links to the broker shall be
-    // idled by preventing the propagation of flow credit out of the broker.  This will dry
-    // the transfer of inbound deliveries, allow all existing deliveries to be settled, and
-    // allow the router to know when it is safe to detach the inbound links.  Outbound links
-    // can also be detached after all deliveries are settled and "drained" indications are
-    // received.
-    //
-    // Waypoint disconnect procedure:
-    //   1) Block flow-credit propagation for link outbound to waypoint.
-    //   2) Wait for the number of unsettled outbound deliveries to go to zero.
-    //   3) Detach the outbound link.
-    //   4) Wait for inbound link to be drained with zero unsettled deliveries.
-    //   5) Detach inbound link.
-    //
+    ZERO(dlv);
+    dlv->link    = link;
+    dlv->msg     = msg;
+    dlv->settled = settled;
 
-    return 0;
+    action->args.connection.delivery = dlv;
+    qdr_action_enqueue(link->core, action);
+    return dlv;
 }
 
 
@@ -127,6 +115,7 @@ void qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int
credit)
         sys_mutex_unlock(conn->work_lock);
 
         if (dlv) {
+            link->credit_to_core--;
             core->deliver_handler(core->user_context, link, dlv, dlv->settled);
             if (dlv->settled)
                 qdr_delivery_free(dlv);
@@ -160,6 +149,17 @@ void qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link,
int credit)
 void qdr_link_flow(qdr_core_t *core, qdr_link_t *link, int credit, bool drain_mode)
 {
     qdr_action_t *action = qdr_action(qdr_link_flow_CT, "link_flow");
+
+    //
+    // Compute the number of credits now available that we haven't yet given
+    // incrementally to the router core.  i.e. convert absolute credit to
+    // incremental credit.
+    //
+    credit -= link->credit_to_core;
+    if (credit < 0)
+        credit = 0;
+    link->credit_to_core += credit;
+
     action->args.connection.link   = link;
     action->args.connection.credit = credit;
     action->args.connection.drain  = drain_mode;
@@ -278,13 +278,16 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action,
bool discar
         return;
 
     qdr_link_t *link = action->args.connection.link;
-    int  credit = action->args.connection.credit;
-    //bool drain  = action->args.connection.drain;
-    bool activate = false;
+    int  credit      = action->args.connection.credit;
+    bool drain       = action->args.connection.drain;
+    bool activate    = false;
 
     //
-    // TODO - If this is a link-routed link, propagate the flow data downrange.
+    // If this is an attach-routed link, propagate the flow data downrange.
+    // Note that the credit value is incremental.
     //
+    if (link->connected_link)
+        qdr_link_issue_credit_CT(core, link->connected_link, credit);
 
     //
     // Handle the replenishing of credit outbound
@@ -298,6 +301,11 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action,
bool discar
         sys_mutex_unlock(link->conn->work_lock);
     }
 
+    //
+    // Record the drain mode for the link
+    //
+    link->drain_mode = drain;
+
     if (activate)
         qdr_connection_activate_CT(core, link->conn);
 }
@@ -315,6 +323,17 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action,
bool dis
     bool            presettled   = dlv->settled;
 
     //
+    // If this is an attach-routed link, put the delivery directly onto the peer link
+    //
+    if (link->connected_link) {
+        qdr_delivery_t *peer = qdr_forward_new_delivery_CT(core, dlv, link->connected_link,
dlv->msg);
+        qdr_forward_deliver_CT(core, link->connected_link, peer);
+        qd_message_free(dlv->msg);
+        dlv->msg = 0;
+        return;
+    }
+
+    //
     // NOTE: The link->undelivered list does not need to be protected by the
     //       connection's work lock for incoming links.  This protection is only
     //       needed for outgoing links.

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bb14ecfb/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index a2b6e2d..27b54c0 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -222,7 +222,7 @@ static void router_rx_handler(void* context, qd_link_t *link, pn_delivery_t
*pnd
     // Handle the link-routed case
     //
     if (qdr_link_is_routed(rlink)) {
-        // TODO - Add Link-Route Forwarding here
+        qdr_link_deliver_to_routed_link(rlink, msg, pn_delivery_settled(pnd));
         return;
     }
 
@@ -417,7 +417,7 @@ static int router_link_flow_handler(void* context, qd_link_t *link)
     if (!rlink)
         return 0;
 
-    qdr_link_flow(router->router_core, rlink, pn_link_credit(pnlink), pn_link_get_drain(pnlink));
+    qdr_link_flow(router->router_core, rlink, pn_link_remote_credit(pnlink), pn_link_get_drain(pnlink));
 
     return 0;
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message