qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tr...@apache.org
Subject [14/50] [abbrv] qpid-dispatch git commit: DISPATCH-179 - WIP for routed attaches.
Date Fri, 18 Mar 2016 23:06:32 GMT
DISPATCH-179 - WIP for routed attaches.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/3c84c0be
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/3c84c0be
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/3c84c0be

Branch: refs/heads/master
Commit: 3c84c0be46cd3dda34b1aa904c4149afdcd06ca9
Parents: f970827
Author: Ted Ross <tross@redhat.com>
Authored: Fri Mar 4 17:19:19 2016 -0500
Committer: Ted Ross <tross@redhat.com>
Committed: Fri Mar 4 17:19:19 2016 -0500

----------------------------------------------------------------------
 src/router_core/connections.c         | 16 ++-----
 src/router_core/forwarder.c           | 74 +++++++++++++++++++++++++++---
 src/router_core/router_core_private.h |  7 ++-
 3 files changed, 78 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/3c84c0be/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 923114f..1aca79f 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -352,9 +352,9 @@ void qdr_connection_activate_CT(qdr_core_t *core, qdr_connection_t *conn)
 }
 
 
-static void qdr_connection_enqueue_work_CT(qdr_core_t            *core,
-                                           qdr_connection_t      *conn,
-                                           qdr_connection_work_t *work)
+void qdr_connection_enqueue_work_CT(qdr_core_t            *core,
+                                    qdr_connection_t      *conn,
+                                    qdr_connection_work_t *work)
 {
     sys_mutex_lock(conn->work_lock);
     DEQ_INSERT_TAIL(conn->work_list, work);
@@ -516,12 +516,6 @@ static void qdr_link_outbound_second_attach_CT(qdr_core_t *core, qdr_link_t *lin
 }
 
 
-static void qdr_forward_first_attach_CT(qdr_core_t *core, qdr_link_t *link, qdr_address_t *addr,
-                                        qdr_terminus_t *source, qdr_terminus_t *target)
-{
-}
-
-
 static char qdr_prefix_for_dir(qd_direction_t dir)
 {
     return (dir == QD_INCOMING) ? 'C' : 'D';
@@ -847,7 +841,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
                     //
                     // This is a link-routed destination, forward the attach to the next hop
                     //
-                    qdr_forward_first_attach_CT(core, link, addr, source, target);
+                    qdr_forward_attach_CT(core, addr, link, source, target);
 
                 else {
                     //
@@ -898,7 +892,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
                 //
                 // This is a link-routed destination, forward the attach to the next hop
                 //
-                qdr_forward_first_attach_CT(core, link, addr, source, target);
+                qdr_forward_attach_CT(core, addr, link, source, target);
 
             else {
                 //

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/3c84c0be/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 4628e02..1c0f7ec 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -33,9 +33,11 @@ typedef int (*qdr_forward_message_t) (qdr_core_t      *core,
                                       bool             control,
                                       qd_bitmask_t    *link_exclusion);
 
-typedef void (*qdr_forward_attach_t) (qdr_core_t      *core,
-                                      qdr_forwarder_t *forw,
-                                      qdr_link_t      *link);
+typedef bool (*qdr_forward_attach_t) (qdr_core_t     *core,
+                                      qdr_address_t  *addr,
+                                      qdr_link_t     *link,
+                                      qdr_terminus_t *source,
+                                      qdr_terminus_t *target);
 
 struct qdr_forwarder_t {
     qdr_forward_message_t forward_message;
@@ -342,10 +344,64 @@ int qdr_forward_balanced_CT(qdr_core_t      *core,
 }
 
 
-void qdr_forward_link_balanced_CT(qdr_core_t      *core,
-                                  qdr_forwarder_t *forw,
-                                  qdr_link_t      *link)
+bool qdr_forward_link_balanced_CT(qdr_core_t     *core,
+                                  qdr_address_t  *addr,
+                                  qdr_link_t     *in_link,
+                                  qdr_terminus_t *source,
+                                  qdr_terminus_t *target)
 {
+    qdr_connection_ref_t *conn_ref = DEQ_HEAD(addr->conns);
+    qdr_connection_t     *conn;
+
+    //
+    // Check for locally connected containers that can handle this link attach.
+    //
+    if (conn_ref) {
+        conn = conn_ref->conn;
+
+        //
+        // If there are more than one local connections available for handling this link,
+        // rotate the list so the attaches are balanced across the containers.
+        //
+        if (DEQ_SIZE(addr->conns) > 1) {
+            DEQ_REMOVE_HEAD(addr->conns);
+            DEQ_INSERT_TAIL(addr->conns, conn_ref);
+        }
+    }
+
+    //
+    // TODO - Use the next-hop connection if there are no local containers.
+    //
+
+    if (conn) {
+        qdr_link_t *out_link = new_qdr_link_t();
+        ZERO(out_link);
+        out_link->core           = core;
+        out_link->identifier     = qdr_identifier(core);
+        out_link->conn           = conn;
+        out_link->link_type      = QD_LINK_ENDPOINT;
+        out_link->link_direction = qdr_link_direction(in_link) == QD_OUTGOING ? QD_INCOMING : QD_OUTGOING;
+        out_link->name           = in_link->name;
+
+        out_link->connected_link = in_link;
+        in_link->connected_link  = out_link;
+
+        DEQ_INSERT_TAIL(core->open_links, out_link);
+        qdr_add_link_ref(&conn->links, out_link, QDR_LINK_LIST_CLASS_CONNECTION);
+
+        qdr_connection_work_t *work = new_qdr_connection_work_t();
+        ZERO(work);
+        work->work_type = QDR_CONNECTION_WORK_FIRST_ATTACH;
+        work->link      = out_link;
+        work->source    = source;
+        work->target    = target;
+
+        qdr_connection_enqueue_work_CT(core, conn, work);
+
+        return true;
+    }
+
+    return false;
 }
 
 
@@ -401,7 +457,11 @@ int qdr_forward_message_CT(qdr_core_t *core, qdr_address_t *addr, qd_message_t *
 }
 
 
-void qdr_forward_attach_CT(qdr_core_t *core, qdr_forwarder_t *forwarder, qdr_link_t *in_link)
+bool qdr_forward_attach_CT(qdr_core_t *core, qdr_address_t *addr, qdr_link_t *in_link,
+                           qdr_terminus_t *source, qdr_terminus_t *target)
 {
+    if (addr->forwarder)
+        return addr->forwarder->forward_attach(core, addr, in_link, source, target);
+    return false;
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/3c84c0be/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 0988ffa..2877677 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -39,7 +39,8 @@ typedef struct qdr_connection_ref_t  qdr_connection_ref_t;
 qdr_forwarder_t *qdr_forwarder_CT(qdr_core_t *core, qd_address_treatment_t treatment);
 int qdr_forward_message_CT(qdr_core_t *core, qdr_address_t *addr, qd_message_t *msg, qdr_delivery_t *in_delivery,
                            bool exclude_inprocess, bool control, qd_bitmask_t *link_exclusion);
-void qdr_forward_attach_CT(qdr_core_t *core, qdr_forwarder_t *forwarder, qdr_link_t *in_link);
+bool qdr_forward_attach_CT(qdr_core_t *core, qdr_address_t *addr, qdr_link_t *in_link, qdr_terminus_t *source,
+                           qdr_terminus_t *target);
 
 /**
  * qdr_field_t - This type is used to pass variable-length fields (strings, etc.) into
@@ -553,6 +554,10 @@ void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr, bool was_local);
 
 void qdr_connection_activate_CT(qdr_core_t *core, qdr_connection_t *conn);
 
+void qdr_connection_enqueue_work_CT(qdr_core_t            *core,
+                                    qdr_connection_t      *conn,
+                                    qdr_connection_work_t *work);
+
 qdr_query_t *qdr_query(qdr_core_t              *core,
                        void                    *context,
                        qd_router_entity_type_t  type,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message