qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tr...@apache.org
Subject svn commit: r1529242 - in /qpid/trunk/qpid/extras/dispatch: python/qpid/dispatch/router/path.py python/qpid/dispatch/router/router_engine.py python/qpid/dispatch/router/routing.py src/router_node.c src/router_pynode.c tests/router_engine_test.py
Date Fri, 04 Oct 2013 17:58:55 GMT
Author: tross
Date: Fri Oct  4 17:58:54 2013
New Revision: 1529242

URL: http://svn.apache.org/r1529242
Log:
QPID-4967 - work in progress on multi-router networks
 - Added computation of valid origins for destinations
 - Modified the forwarding algorithm to ensure that only one copy of a message is sent on
   a given inter-router link
 - Added test coverage for valid origins

Modified:
    qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/path.py
    qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py
    qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py
    qpid/trunk/qpid/extras/dispatch/src/router_node.c
    qpid/trunk/qpid/extras/dispatch/src/router_pynode.c
    qpid/trunk/qpid/extras/dispatch/tests/router_engine_test.py

Modified: qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/path.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/path.py?rev=1529242&r1=1529241&r2=1529242&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/path.py (original)
+++ qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/path.py Fri Oct  4 17:58:54 2013
@@ -49,7 +49,7 @@ class PathEngine(object):
     def _calculate_tree_from_root(self, root):
         ##
         ## Make a copy of the current collection of link-states that contains
-        ## an empty link-state for nodes that are known-peers but are not in the
+        ## a fake link-state for nodes that are known-peers but are not in the
         ## collection currently.  This is needed to establish routes to those nodes
         ## so we can trade link-state information with them.
         ##
@@ -58,7 +58,7 @@ class PathEngine(object):
             link_states[_id] = ls.peers
             for p in ls.peers:
                 if p not in link_states:
-                    link_states[p] = []
+                    link_states[p] = [_id]
 
         ##
         ## Setup Dijkstra's Algorithm
@@ -102,11 +102,42 @@ class PathEngine(object):
         return prev
 
 
+    def _calculate_valid_origins(self, nodeset):
+        ##
+        ## Calculate the tree from each origin, determine the set of origins-per-dest
+        ## for which the path from origin to dest passes through us.  This is the set
+        ## of valid origins for forwarding to the destination.
+        ##
+        valid_origin = {}         # Map of destination => List of Valid Origins
+        for node in nodeset:
+            if node != self.id:
+                valid_origin[node] = []
+
+        for root in valid_origin.keys():
+            prev  = self._calculate_tree_from_root(root)
+            nodes = prev.keys()
+            while len(nodes) > 0:
+                u = nodes[0]
+                path = [u]
+                nodes.remove(u)
+                v = prev[u]
+                while v != root:
+                    if v in nodes:
+                        if v != self.id:
+                            path.append(v)
+                        nodes.remove(v)
+                    if v == self.id:
+                        valid_origin[root].extend(path)
+                    u = v
+                    v = prev[u]
+        return valid_origin
+
+
     def _calculate_routes(self):
         ##
         ## Generate the shortest-path tree with the local node as root
         ##
-        prev = self._calculate_tree_from_root(self.id)
+        prev  = self._calculate_tree_from_root(self.id)
         nodes = prev.keys()
 
         ##
@@ -127,12 +158,14 @@ class PathEngine(object):
             for w in path:        # mark each node in the path as reachable via the next hop
                 next_hops[w] = u
 
+        self.container.next_hops_changed(next_hops)
+
         ##
-        ## TODO - Calculate the tree from each origin, determine the set of origins-per-dest
-        ##        for which the path from origin to dest passes through us.  This is the set
-        ##        of valid origins for forwarding to the destination.
+        ## Calculate the valid origins for remote routers
         ##
-        self.container.next_hops_changed(next_hops)
+        valid_origin = self._calculate_valid_origins(prev.keys())
+        self.container.valid_origins_changed(valid_origin)
+
 
 
 class NodeSet(object):

Modified: qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py?rev=1529242&r1=1529241&r2=1529242&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py (original)
+++ qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py Fri Oct  4 17:58:54 2013
@@ -247,6 +247,10 @@ class RouterEngine:
         self.routing_table_engine.next_hops_changed(next_hop_table)
         self.binding_engine.next_hops_changed()
 
+    def valid_origins_changed(self, valid_origins):
+        self.log(LOG_DEBUG, "Event: valid_origins_changed: %r" % valid_origins)
+        self.routing_table_engine.valid_origins_changed(valid_origins)
+
     def mobile_sequence_changed(self, mobile_seq):
         self.log(LOG_DEBUG, "Event: mobile_sequence_changed: %d" % mobile_seq)
         self.link_state_engine.set_mobile_sequence(mobile_seq)

Modified: qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py?rev=1529242&r1=1529241&r2=1529242&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py (original)
+++ qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py Fri Oct  4 17:58:54 2013
@@ -48,6 +48,15 @@ class RoutingTableEngine(object):
             self.container.router_adapter.set_next_hop(mb_id, mb_nh)
 
 
+    def valid_origins_changes(self, valid_origins):
+        for _id, vo in valid_origins.items():
+            mb_id = self.node_tracker.maskbit_for_node(_id)
+            mb_vo = []
+            for o in vo:
+                mb_vo.append(self.node_tracker.maskbit_for_node(o))
+            self.container.router_adapted.set_valid_origins(mb_id, mb_vo)
+
+
     def get_next_hops(self):
         return self.next_hops
 

Modified: qpid/trunk/qpid/extras/dispatch/src/router_node.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/router_node.c?rev=1529242&r1=1529241&r2=1529242&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/router_node.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/router_node.c Fri Oct  4 17:58:54 2013
@@ -502,12 +502,34 @@ static void router_rx_handler(void* cont
                         if (origin >= 0) {
                             dx_router_ref_t  *dest_node_ref = DEQ_HEAD(addr->rnodes);
                             dx_router_link_t *dest_link;
+                            dx_bitmask_t     *link_set = dx_bitmask(0);
+
+                            //
+                            // Loop over the target nodes for this address.  Build a set of outgoing links
+                            // for which there are valid targets.  We do this to avoid sending more than one
+                            // message down a given link.  It's possible that there are multiple destinations
+                            // for this address that are all reachable over the same link.  In this case, we
+                            // will send only one copy of the message over the link and allow a downstream
+                            // router to fan the message out.
+                            //
                             while (dest_node_ref) {
                                 if (dest_node_ref->router->next_hop)
                                     dest_link = dest_node_ref->router->next_hop->peer_link;
                                 else
                                     dest_link = dest_node_ref->router->peer_link;
-                                if (dest_link && dx_bitmask_value(dest_node_ref->router->valid_origins, origin)) {
+                                if (dest_link && dx_bitmask_value(dest_node_ref->router->valid_origins, origin))
+                                    dx_bitmask_set_bit(link_set, dest_link->mask_bit);
+                                dest_node_ref = DEQ_NEXT(dest_node_ref);
+                            }
+
+                            //
+                            // Send a copy of the message outbound on each identified link.
+                            //
+                            int link_bit;
+                            while (dx_bitmask_first_set(link_set, &link_bit)) {
+                                dx_bitmask_clear_bit(link_set, link_bit);
+                                dest_link = router->out_links_by_mask_bit[link_bit];
+                                if (link) {
                                     dx_routed_event_t *re = new_dx_routed_event_t();
                                     DEQ_ITEM_INIT(re);
                                     re->delivery    = 0;
@@ -522,8 +544,9 @@ static void router_rx_handler(void* cont
                                 
                                     dx_link_activate(dest_link->link);
                                 }
-                                dest_node_ref = DEQ_NEXT(dest_node_ref);
                             }
+
+                            dx_bitmask_free(link_set);
                         }
                     }
                 }

Modified: qpid/trunk/qpid/extras/dispatch/src/router_pynode.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/router_pynode.c?rev=1529242&r1=1529241&r2=1529242&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/router_pynode.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/router_pynode.c Fri Oct  4 17:58:54 2013
@@ -246,6 +246,61 @@ static PyObject* dx_set_next_hop(PyObjec
 }
 
 
+static PyObject* dx_set_valid_origins(PyObject *self, PyObject *args)
+{
+    RouterAdapter *adapter = (RouterAdapter*) self;
+    dx_router_t   *router  = adapter->router;
+    int            router_maskbit;
+    PyObject      *origin_list;
+    Py_ssize_t     idx;
+
+    if (!PyArg_ParseTuple(args, "iO", &router_maskbit, &origin_list))
+        return 0;
+
+    if (router_maskbit >= dx_bitmask_width() || router_maskbit < 0) {
+        PyErr_SetString(PyExc_Exception, "Router bit mask out of range");
+        return 0;
+    }
+
+    if (router->routers_by_mask_bit[router_maskbit] == 0) {
+        PyErr_SetString(PyExc_Exception, "Router Not Found");
+        return 0;
+    }
+
+    if (!PyList_Check(origin_list)) {
+        PyErr_SetString(PyExc_Exception, "Expected List as argument 2");
+        return 0;
+    }
+
+    Py_ssize_t        origin_count = PyTuple_Size(origin_list);
+    dx_router_node_t *rnode        = router->routers_by_mask_bit[router_maskbit];
+    int               maskbit;
+
+    for (idx = 0; idx < origin_count; idx++) {
+        maskbit = PyInt_AS_LONG(PyTuple_GetItem(origin_list, idx));
+
+        if (maskbit >= dx_bitmask_width() || maskbit < 0) {
+            PyErr_SetString(PyExc_Exception, "Origin bit mask out of range");
+            return 0;
+        }
+        
+        if (router->routers_by_mask_bit[maskbit] == 0) {
+            PyErr_SetString(PyExc_Exception, "Origin router Not Found");
+            return 0;
+        }
+    }
+
+    dx_bitmask_clear_all(rnode->valid_origins);
+    for (idx = 0; idx < origin_count; idx++) {
+        maskbit = PyInt_AS_LONG(PyTuple_GetItem(origin_list, idx));
+        dx_bitmask_set_bit(rnode->valid_origins, maskbit);
+    }
+
+    Py_INCREF(Py_None);
+    return Py_None;
+}
+
+
 static PyObject* dx_add_neighbor_router(PyObject *self, PyObject *args)
 {
     RouterAdapter *adapter = (RouterAdapter*) self;
@@ -326,6 +381,7 @@ static PyMethodDef RouterAdapter_methods
     {"add_remote_router",   dx_add_remote_router,   METH_VARARGS, "A new remote/reachable router has been discovered"},
     {"del_remote_router",   dx_del_remote_router,   METH_VARARGS, "We've lost reachability to a remote router"},
     {"set_next_hop",        dx_set_next_hop,        METH_VARARGS, "Set the next hop for a remote router"},
+    {"set_valid_origins",   dx_set_valid_origins,   METH_VARARGS, "Set the valid origins for a remote router"},
     {"add_neighbor_router", dx_add_neighbor_router, METH_VARARGS, "A new neighbor router has been discovered"},
     {"del_neighbor_router", dx_del_neighbor_router, METH_VARARGS, "We've lost reachability to a neighbor router"},
     {"map_destination",     dx_map_destination,     METH_VARARGS, "Add a newly discovered destination mapping"},

Modified: qpid/trunk/qpid/extras/dispatch/tests/router_engine_test.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/tests/router_engine_test.py?rev=1529242&r1=1529241&r2=1529242&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/tests/router_engine_test.py (original)
+++ qpid/trunk/qpid/extras/dispatch/tests/router_engine_test.py Fri Oct  4 17:58:54 2013
@@ -344,6 +344,7 @@ class PathTest(unittest.TestCase):
         self.id = 'R1'
         self.area = 'area'
         self.next_hops = None
+        self.valid_origins = None
         self.engine = PathEngine(self)
 
     def log(self, level, text):
@@ -352,6 +353,9 @@ class PathTest(unittest.TestCase):
     def next_hops_changed(self, nh):
         self.next_hops = nh
 
+    def valid_origins_changed(self, vo):
+        self.valid_origins = vo
+
     def test_topology1(self): 
         """
 
@@ -369,6 +373,11 @@ class PathTest(unittest.TestCase):
         self.assertEqual(self.next_hops['R2'], 'R2')
         self.assertEqual(self.next_hops['R3'], 'R2')
 
+        self.valid_origins['R2'].sort()
+        self.valid_origins['R3'].sort()
+        self.assertEqual(self.valid_origins['R2'], [])
+        self.assertEqual(self.valid_origins['R3'], [])
+
     def test_topology2(self):
         """
 
@@ -396,6 +405,17 @@ class PathTest(unittest.TestCase):
         self.assertEqual(self.next_hops['R5'], 'R2')
         self.assertEqual(self.next_hops['R6'], 'R2')
 
+        self.valid_origins['R2'].sort()
+        self.valid_origins['R3'].sort()
+        self.valid_origins['R4'].sort()
+        self.valid_origins['R5'].sort()
+        self.valid_origins['R6'].sort()
+        self.assertEqual(self.valid_origins['R2'], [])
+        self.assertEqual(self.valid_origins['R3'], [])
+        self.assertEqual(self.valid_origins['R4'], [])
+        self.assertEqual(self.valid_origins['R5'], [])
+        self.assertEqual(self.valid_origins['R6'], [])
+
     def test_topology3(self):
         """
 
@@ -404,7 +424,7 @@ class PathTest(unittest.TestCase):
         +----+      +----+      +----+
                        |           |
                     +====+      +----+      +----+
-                   | R1 |------| R5 |------| R6 |
+                    | R1 |------| R5 |------| R6 |
                     +====+      +----+      +----+
 
         """
@@ -423,6 +443,17 @@ class PathTest(unittest.TestCase):
         self.assertEqual(self.next_hops['R5'], 'R5')
         self.assertEqual(self.next_hops['R6'], 'R5')
 
+        self.valid_origins['R2'].sort()
+        self.valid_origins['R3'].sort()
+        self.valid_origins['R4'].sort()
+        self.valid_origins['R5'].sort()
+        self.valid_origins['R6'].sort()
+        self.assertEqual(self.valid_origins['R2'], ['R5', 'R6'])
+        self.assertEqual(self.valid_origins['R3'], ['R5', 'R6'])
+        self.assertEqual(self.valid_origins['R4'], [])
+        self.assertEqual(self.valid_origins['R5'], ['R2', 'R3'])
+        self.assertEqual(self.valid_origins['R6'], ['R2', 'R3'])
+
     def test_topology4(self):
         """
 
@@ -451,6 +482,19 @@ class PathTest(unittest.TestCase):
         self.assertEqual(self.next_hops['R6'], 'R5')
         self.assertEqual(self.next_hops['R7'], 'R5')
 
+        self.valid_origins['R2'].sort()
+        self.valid_origins['R3'].sort()
+        self.valid_origins['R4'].sort()
+        self.valid_origins['R5'].sort()
+        self.valid_origins['R6'].sort()
+        self.valid_origins['R7'].sort()
+        self.assertEqual(self.valid_origins['R2'], ['R5', 'R6', 'R7'])
+        self.assertEqual(self.valid_origins['R3'], ['R5', 'R6', 'R7'])
+        self.assertEqual(self.valid_origins['R4'], [])
+        self.assertEqual(self.valid_origins['R5'], ['R2', 'R3'])
+        self.assertEqual(self.valid_origins['R6'], ['R2', 'R3'])
+        self.assertEqual(self.valid_origins['R7'], ['R2', 'R3'])
+
     def test_topology5(self):
         """
 
@@ -479,6 +523,19 @@ class PathTest(unittest.TestCase):
         self.assertEqual(self.next_hops['R6'], 'R5')
         self.assertEqual(self.next_hops['R7'], 'R5')
 
+        self.valid_origins['R2'].sort()
+        self.valid_origins['R3'].sort()
+        self.valid_origins['R4'].sort()
+        self.valid_origins['R5'].sort()
+        self.valid_origins['R6'].sort()
+        self.valid_origins['R7'].sort()
+        self.assertEqual(self.valid_origins['R2'], ['R5', 'R6', 'R7'])
+        self.assertEqual(self.valid_origins['R3'], ['R5', 'R6', 'R7'])
+        self.assertEqual(self.valid_origins['R4'], [])
+        self.assertEqual(self.valid_origins['R5'], ['R2', 'R3'])
+        self.assertEqual(self.valid_origins['R6'], ['R2', 'R3'])
+        self.assertEqual(self.valid_origins['R7'], ['R2', 'R3'])
+
     def test_topology5_with_asymmetry1(self):
         """
 
@@ -507,6 +564,19 @@ class PathTest(unittest.TestCase):
         self.assertEqual(self.next_hops['R6'], 'R5')
         self.assertEqual(self.next_hops['R7'], 'R5')
 
+        self.valid_origins['R2'].sort()
+        self.valid_origins['R3'].sort()
+        self.valid_origins['R4'].sort()
+        self.valid_origins['R5'].sort()
+        self.valid_origins['R6'].sort()
+        self.valid_origins['R7'].sort()
+        self.assertEqual(self.valid_origins['R2'], ['R5', 'R6', 'R7'])
+        self.assertEqual(self.valid_origins['R3'], ['R5', 'R6', 'R7'])
+        self.assertEqual(self.valid_origins['R4'], [])
+        self.assertEqual(self.valid_origins['R5'], ['R2', 'R3'])
+        self.assertEqual(self.valid_origins['R6'], ['R2', 'R3'])
+        self.assertEqual(self.valid_origins['R7'], ['R2', 'R3'])
+
     def test_topology5_with_asymmetry2(self):
         """
 
@@ -535,6 +605,19 @@ class PathTest(unittest.TestCase):
         self.assertEqual(self.next_hops['R6'], 'R5')
         self.assertEqual(self.next_hops['R7'], 'R5')
 
+        self.valid_origins['R2'].sort()
+        self.valid_origins['R3'].sort()
+        self.valid_origins['R4'].sort()
+        self.valid_origins['R5'].sort()
+        self.valid_origins['R6'].sort()
+        self.valid_origins['R7'].sort()
+        self.assertEqual(self.valid_origins['R2'], ['R5', 'R6', 'R7'])
+        self.assertEqual(self.valid_origins['R3'], ['R5', 'R6', 'R7'])
+        self.assertEqual(self.valid_origins['R4'], [])
+        self.assertEqual(self.valid_origins['R5'], ['R2', 'R3'])
+        self.assertEqual(self.valid_origins['R6'], ['R2', 'R3'])
+        self.assertEqual(self.valid_origins['R7'], ['R2', 'R3'])
+
     def test_topology5_with_asymmetry3(self):
         """
 
@@ -561,6 +644,15 @@ class PathTest(unittest.TestCase):
         self.assertEqual(self.next_hops['R4'], 'R3')
         self.assertEqual(self.next_hops['R5'], 'R5')
 
+        self.valid_origins['R2'].sort()
+        self.valid_origins['R3'].sort()
+        self.valid_origins['R4'].sort()
+        self.valid_origins['R5'].sort()
+        self.assertEqual(self.valid_origins['R2'], ['R5'])
+        self.assertEqual(self.valid_origins['R3'], ['R5'])
+        self.assertEqual(self.valid_origins['R4'], [])
+        self.assertEqual(self.valid_origins['R5'], ['R2', 'R3'])
+
 
 if __name__ == '__main__':
     unittest.main()



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message