qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tr...@apache.org
Subject qpid-dispatch git commit: DISPATCH-179 - WIP, added central address lookup and semantics lookup
Date Wed, 09 Dec 2015 21:38:32 GMT
Repository: qpid-dispatch
Updated Branches:
  refs/heads/tross-DISPATCH-179-1 af4fef5af -> 72678b250


DISPATCH-179 - WIP, added central address lookup and semantics lookup


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/72678b25
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/72678b25
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/72678b25

Branch: refs/heads/tross-DISPATCH-179-1
Commit: 72678b25053aa1fb03a6f8b816a83d09669658a5
Parents: af4fef5
Author: Ted Ross <tross@redhat.com>
Authored: Wed Dec 9 16:37:38 2015 -0500
Committer: Ted Ross <tross@redhat.com>
Committed: Wed Dec 9 16:37:38 2015 -0500

----------------------------------------------------------------------
 include/qpid/dispatch/router_core.h   |  45 ++++-
 src/router_core/connections.c         | 259 ++++++++++++++++++++++++++++-
 src/router_core/router_core_private.h |   1 +
 src/router_core/terminus.c            |  50 +++++-
 src/router_node.c                     |   5 +
 5 files changed, 348 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/72678b25/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 7dc3fac..f83e4cc 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -94,8 +94,7 @@ typedef enum {
     QD_LINK_ENDPOINT,   ///< A link to a connected endpoint
     QD_LINK_WAYPOINT,   ///< A link to a configured waypoint
     QD_LINK_CONTROL,    ///< A link to a peer router for control messages
-    QD_LINK_ROUTER,     ///< A link to a peer router for routed messages
-    QD_LINK_AREA        ///< A link to a peer router in a different area (area boundary)
+    QD_LINK_ROUTER      ///< A link to a peer router for routed messages
 } qd_link_type_t;
 ENUM_DECLARE(qd_link_type);
 
@@ -233,6 +232,48 @@ void qdr_terminus_add_capability(qdr_terminus_t *term, const char *capability);
  */
 bool qdr_terminus_has_capability(qdr_terminus_t *term, const char *capability);
 
+/**
+ * qdr_terminus_is_anonymous
+ *
+ * Indicate whether this terminus represents an anonymous endpoint.
+ *
+ * @param term A qdr_terminus pointer returned by qdr_terminus()
+ * @return true iff the terminus is anonymous
+ */
+bool qdr_terminus_is_anonymous(qdr_terminus_t *term);
+
+/**
+ * qdr_terminus_is_dynamic
+ *
+ * Indicate whether this terminus represents a dynamic endpoint.
+ *
+ * @param term A qdr_terminus pointer returned by qdr_terminus()
+ * @return true iff the terminus is dynamic
+ */
+bool qdr_terminus_is_dynamic(qdr_terminus_t *term);
+
+/**
+ * qdr_terminus_get_address
+ *
+ * Return the address of the terminus in the form of an iterator.
+ * The iterator is borrowed, the caller must not free the iterator.
+ *
+ * @param term A qdr_terminus pointer returned by qdr_terminus()
+ * @return A pointer to an iterator or 0 if the terminus is anonymous.
+ */
+qd_field_iterator_t *qdr_terminus_get_address(qdr_terminus_t *term);
+
+/**
+ * qdr_terminus_dnp_address
+ *
+ * Return the address field in the dynamic-node-properties if it is there.
+ * This iterator is given, the caller must free it when it is no longer needed.
+ *
+ * @param term A qdr_terminus pointer returned by qdr_terminus()
+ * @return A pointer to an iterator or 0 if there is no such field.
+ */
+qd_field_iterator_t *qdr_terminus_dnp_address(qdr_terminus_t *term);
+
 
 /**
  ******************************************************************************

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/72678b25/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 3e8b888..5836bf0 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -19,6 +19,7 @@
 
 #include "router_core_private.h"
 #include <qpid/dispatch/amqp.h>
+#include <stdio.h>
 
 static void qdr_connection_opened_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
 static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
@@ -29,6 +30,15 @@ static void qdr_link_detach_CT(qdr_core_t *core, qdr_action_t *action,
bool disc
 ALLOC_DEFINE(qdr_connection_t);
 ALLOC_DEFINE(qdr_connection_work_t);
 
+static qd_address_semantics_t qdr_dynamic_semantics = QD_FANOUT_SINGLE | QD_BIAS_CLOSEST
| QD_CONGESTION_BACKPRESSURE;
+static qd_address_semantics_t qdr_default_semantics = QD_FANOUT_SINGLE | QD_BIAS_SPREAD 
| QD_CONGESTION_BACKPRESSURE;
+
+typedef enum {
+    QDR_CONDITION_NO_ROUTE_TO_DESTINATION,
+    QDR_CONDITION_ROUTED_LINK_LOST,
+    QDR_CONDITION_FORBIDDEN
+} qdr_condition_t;
+
 //==================================================================================
 // Internal Functions
 //==================================================================================
@@ -260,6 +270,171 @@ static qdr_link_t *qdr_create_link_CT(qdr_core_t       *core,
 }
 
 
+static void qdr_link_reject_CT(qdr_core_t *core, qdr_link_t *link, qdr_condition_t condition)
+{
+}
+
+
+static void qdr_link_accept_CT(qdr_core_t *core, qdr_link_t *link)
+{
+}
+
+
+static void qdr_forward_first_attach_CT(qdr_core_t *core, qdr_link_t *link, qdr_address_t
*addr)
+{
+}
+
+
+/**
+ * Generate a temporary routable address for a destination connected to this
+ * router node.
+ */
+static void qdr_generate_temp_addr(qdr_core_t *core, char *buffer, size_t length)
+{
+    static const char *table = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+_";
+    char     discriminator[16];
+    long int rnd1 = random();
+    long int rnd2 = random();
+    long int rnd3 = random();
+    int      idx;
+    int      cursor = 0;
+
+    for (idx = 0; idx < 5; idx++) {
+        discriminator[cursor++] = table[(rnd1 >> (idx * 6)) & 63];
+        discriminator[cursor++] = table[(rnd2 >> (idx * 6)) & 63];
+        discriminator[cursor++] = table[(rnd3 >> (idx * 6)) & 63];
+    }
+    discriminator[cursor] = '\0';
+
+    snprintf(buffer, length, "amqp:/_topo/%s/%s/temp.%s", core->router_area, core->router_id,
discriminator);
+}
+
+
+static char qdr_prefix_for_dir(qd_direction_t dir)
+{
+    return (dir == QD_INCOMING) ? 'C' : 'D';
+}
+
+
+static qd_address_semantics_t qdr_semantics_for_address(qdr_core_t *core, qd_field_iterator_t
*iter)
+{
+    qdr_address_t *addr = 0;
+
+    //
+    // Question: Should we use a new prefix for configuration?
+    //
+    qd_hash_retrieve_prefix(core->addr_hash, iter, (void**) addr);
+    return addr ? addr->semantics : qdr_default_semantics;
+}
+
+
+/**
+ * qdr_lookup_terminus_address_CT
+ *
+ * Lookup a terminus address in the route table and possibly create a new address
+ * if no match is found.
+ *
+ * @param core Pointer to the core object
+ * @param dir Direction of the link for the terminus
+ * @param terminus The terminus containing the addressing information to be looked up
+ * @param create_if_not_found Iff true, return a pointer to a newly created address record
+ * @param accept_dynamic Iff true, honor the dynamic flag by creating a dynamic address
+ * @param [out] link_route True iff the lookup indicates that an attach should be routed
+ * @return Pointer to an address record or 0 if none is found
+ */
+static qdr_address_t *qdr_lookup_terminus_address_CT(qdr_core_t     *core,
+                                                     qd_direction_t  dir,
+                                                     qdr_terminus_t *terminus,
+                                                     bool            create_if_not_found,
+                                                     bool            accept_dynamic,
+                                                     bool           *link_route)
+{
+    qdr_address_t *addr = 0;
+
+    //
+    // Unless expressly stated, link routing is not indicated for this terminus.
+    //
+    *link_route = false;
+
+    if (qdr_terminus_is_dynamic(terminus)) {
+        //
+        // The terminus is dynamic.  Check to see if there is an address provided
+        // in the dynamic node properties.  If so, look that address up as a link-routed
+        // destination.
+        //
+        qd_field_iterator_t *dnp_address = qdr_terminus_dnp_address(terminus);
+        if (dnp_address) {
+            qd_address_iterator_override_prefix(dnp_address, qdr_prefix_for_dir(dir));
+            qd_hash_retrieve_prefix(core->addr_hash, dnp_address, (void**) &addr);
+            qd_field_iterator_free(dnp_address);
+            *link_route = true;
+            return addr;
+        }
+
+        //
+        // The dynamic terminus has no address in the dynamic-node-propteries.  If we are
+        // permitted to generate dynamic addresses, create a new address that is local to
+        // this router and insert it into the address table with a hash index.
+        //
+        if (!accept_dynamic)
+            return 0;
+
+        char temp_addr[200];
+        bool generating = true;
+        while (generating) {
+            //
+            // The address-generation process is performed in a loop in case the generated
+            // address collides with a previously generated address (this should be _highly_
+            // unlikely).
+            //
+            qdr_generate_temp_addr(core, temp_addr, 200);
+            qd_field_iterator_t *temp_iter = qd_address_iterator_string(temp_addr, ITER_VIEW_ADDRESS_HASH);
+            qd_hash_retrieve(core->addr_hash, temp_iter, (void**) &addr);
+            if (!addr) {
+                addr = qdr_address(qdr_dynamic_semantics);
+                qd_hash_insert(core->addr_hash, temp_iter, addr, &addr->hash_handle);
+                DEQ_INSERT_TAIL(core->addrs, addr);
+                generating = false;
+            }
+            qd_field_iterator_free(temp_iter);
+        }
+        return addr;
+    }
+
+    //
+    // If the terminus is anonymous, there is no address to look up.
+    //
+    if (qdr_terminus_is_anonymous(terminus))
+        return 0;
+
+    //
+    // The terminus has a non-dynamic address that we need to look up.  First, look for
+    // a link-route destination for the address.
+    //
+    qd_field_iterator_t *iter = qdr_terminus_get_address(terminus);
+    qd_address_iterator_override_prefix(iter, qdr_prefix_for_dir(dir));
+    qd_hash_retrieve_prefix(core->addr_hash, iter, (void**) &addr);
+    if (addr) {
+        *link_route = true;
+        return addr;
+    }
+
+    //
+    // There was no match for a link-route destination, look for a message-route address.
+    //
+    qd_address_iterator_override_prefix(iter, '\0'); // Cancel previous override
+    qd_hash_retrieve(core->addr_hash, iter, (void**) &addr);
+    if (!addr && create_if_not_found) {
+        qd_address_semantics_t sem = qdr_semantics_for_address(core, iter);
+        addr = qdr_address(sem);
+        qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle);
+        DEQ_INSERT_TAIL(core->addrs, addr);
+    }
+
+    return addr;
+}
+
+
 static void qdr_connection_opened_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
 {
     if (discard)
@@ -335,11 +510,6 @@ static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action,
boo
 }
 
 
-static void qdr_link_reject_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link)
-{
-}
-
-
 static void qdr_link_first_attach_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
 {
     if (discard)
@@ -347,15 +517,86 @@ static void qdr_link_first_attach_CT(qdr_core_t *core, qdr_action_t
*action, boo
 
     qdr_connection_t  *conn   = action->args.connection.conn;
     qdr_link_t        *link   = action->args.connection.link;
-    //qd_direction_t     dir    = action->args.connection.dir;
+    qd_direction_t     dir    = action->args.connection.dir;
     //qdr_terminus_t    *source = action->args.connection.source;
-    //qdr_terminus_t    *target = action->args.connection.target;
+    qdr_terminus_t    *target = action->args.connection.target;
 
     //
-    // Check for some rejected-attach cases
+    // Reject any attaches of inter-router links that arrive on connections that are not
inter-router.
     //
     if ((link->link_type == QD_LINK_CONTROL || link->link_type == QD_LINK_ROUTER) &&
conn->role != QDR_ROLE_INTER_ROUTER)
-        qdr_link_reject_CT(core, conn, link);
+        qdr_link_reject_CT(core, link, QDR_CONDITION_FORBIDDEN);
+
+    //
+    // Reject any waypoint links.  Waypoint links are always initiated by a router, not the
remote container.
+    //
+    if (link->link_type == QD_LINK_WAYPOINT)
+        qdr_link_reject_CT(core, link, QDR_CONDITION_FORBIDDEN);
+
+    if (dir == QD_INCOMING) {
+        //
+        // Handle incoming link cases
+        //
+        switch (link->link_type) {
+        case QD_LINK_ENDPOINT:
+            if (qdr_terminus_is_anonymous(target)) {
+                link->addr = 0;
+                qdr_link_accept_CT(core, link);
+            } else {
+                //
+                // This link has a target address
+                //
+                bool           link_route = false;
+                qdr_address_t *addr = qdr_lookup_terminus_address_CT(core, dir, target, false,
false, &link_route);
+                if (!addr)
+                    //
+                    // No route to this destination, reject the link
+                    //
+                    qdr_link_reject_CT(core, link, QDR_CONDITION_NO_ROUTE_TO_DESTINATION);
+
+                else if (link_route)
+                    //
+                    // This is a link-routed destination, forward the attach to the next
hop
+                    //
+                    qdr_forward_first_attach_CT(core, link, addr);
+
+                else {
+                    //
+                    // Associate the link with the address.  With this association, it will
be unnecessary
+                    // to do an address lookup for deliveries that arrive on this link.
+                    //
+                    link->addr = addr;
+                    qdr_link_accept_CT(core, link);
+                }
+            }
+            break;
+
+        case QD_LINK_WAYPOINT:
+            // No action, waypoint links are rejected above.
+            break;
+
+        case QD_LINK_CONTROL:
+            break;
+
+        case QD_LINK_ROUTER:
+            break;
+        }
+    } else {
+        //
+        // Handle outgoing link cases
+        //
+        switch (link->link_type) {
+        case QD_LINK_ENDPOINT:
+            break;
+        case QD_LINK_WAYPOINT:
+            // No action, waypoint links are rejected above.
+            break;
+        case QD_LINK_CONTROL:
+            break;
+        case QD_LINK_ROUTER:
+            break;
+        }
+    }
 
     //
     // Cases to be handled:

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/72678b25/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index e628d73..11176a6 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -162,6 +162,7 @@ struct qdr_link_t {
     qdr_connection_t         *conn;            ///< [ref] Connection that owns this link
     qd_link_type_t            link_type;
     qd_direction_t            link_direction;
+    qdr_address_t            *addr;            ///< [ref] Associated address record
     qdr_address_t            *owning_addr;     ///< [ref] Address record that owns this
link
     //qd_waypoint_t            *waypoint;        ///< [ref] Waypoint that owns this link
     qdr_link_t               *connected_link;  ///< [ref] If this is a link-route, reference
the connected link

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/72678b25/src/router_core/terminus.c
----------------------------------------------------------------------
diff --git a/src/router_core/terminus.c b/src/router_core/terminus.c
index d78b8f9..175acf4 100644
--- a/src/router_core/terminus.c
+++ b/src/router_core/terminus.c
@@ -46,7 +46,10 @@ qdr_terminus_t *qdr_terminus(pn_terminus_t *pn)
     term->capabilities = pn_data(0);
 
     if (pn) {
-        term->address           = qdr_field(pn_terminus_get_address(pn));
+        const char *addr = pn_terminus_get_address(pn);
+        if (addr && *addr)
+            term->address = qdr_field(addr);
+
         term->durability        = pn_terminus_get_durability(pn);
         term->expiry_policy     = pn_terminus_get_expiry_policy(pn);
         term->timeout           = pn_terminus_get_timeout(pn);
@@ -119,3 +122,48 @@ bool qdr_terminus_has_capability(qdr_terminus_t *term, const char *capability)
     return false;
 }
 
+
+bool qdr_terminus_is_anonymous(qdr_terminus_t *term)
+{
+    return term == 0 || term->address == 0;
+}
+
+
+bool qdr_terminus_is_dynamic(qdr_terminus_t *term)
+{
+    return term->dynamic;
+}
+
+
+qd_field_iterator_t *qdr_terminus_get_address(qdr_terminus_t *term)
+{
+    if (qdr_terminus_is_anonymous(term))
+        return 0;
+
+    return term->address->iterator;
+}
+
+
+qd_field_iterator_t *qdr_terminus_dnp_address(qdr_terminus_t *term)
+{
+    pn_data_t *props = term->properties;
+
+    if (!props)
+        return 0;
+
+    pn_data_rewind(props);
+    if (pn_data_next(props) && pn_data_enter(props) && pn_data_next(props))
{
+        pn_bytes_t sym = pn_data_get_symbol(props);
+        if (sym.start && strcmp(QD_DYNAMIC_NODE_PROPERTY_ADDRESS, sym.start) == 0)
{
+            if (pn_data_next(props)) {
+                pn_bytes_t val = pn_data_get_string(props);
+                if (val.start && *val.start != '\0')
+                    return qd_field_iterator_string(val.start);
+            }
+        }
+    }
+
+    return 0;
+}
+
+

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/72678b25/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index 071845b..0f2c32b 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -296,6 +296,7 @@ static int qd_router_terminus_is_router(pn_terminus_t *term)
 /**
  * If the terminus has a dynamic-node-property for a node address,
  * return an iterator for the content of that property.
+ * DEPRECATE
  */
 static const char *qd_router_terminus_dnp_address(pn_terminus_t *term)
 {
@@ -323,6 +324,7 @@ static const char *qd_router_terminus_dnp_address(pn_terminus_t *term)
 /**
  * Generate a temporary routable address for a destination connected to this
  * router node.
+ * DEPRECATE
  */
 static void qd_router_generate_temp_addr(qd_router_t *router, char *buffer, size_t length)
 {
@@ -373,6 +375,9 @@ static int qd_router_find_mask_bit_LH(qd_router_t *router, qd_link_t *link)
 }
 
 
+/**
+ * DEPRECATE
+ */
 static qd_address_t *router_lookup_terminus_LH(qd_router_t *router, const char *taddr, qd_direction_t
dir)
 {
     char                 addr_prefix = (dir == QD_INCOMING) ? 'C' : 'D';


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message