qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kp...@apache.org
Subject svn commit: r1534394 [5/22] - in /qpid/branches/linearstore/qpid: ./ cpp/ cpp/bindings/qmf2/examples/python/ cpp/bindings/qmf2/python/ cpp/bindings/qpid/dotnet/ cpp/etc/ cpp/examples/ cpp/examples/messaging/ cpp/examples/qmf-agent/ cpp/include/qpid/ cp...
Date Mon, 21 Oct 2013 22:05:07 GMT
Modified: qpid/branches/linearstore/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py Mon Oct 21 22:04:51 2013
@@ -20,169 +20,170 @@
 from data import MessageRA, MessageMAR, MessageMAU
 
 try:
-  from dispatch import *
+    from dispatch import *
 except ImportError:
-  from ..stubs import *
+    from ..stubs import *
 
 class MobileAddressEngine(object):
-  """
-  This module is responsible for maintaining an up-to-date list of mobile addresses in the domain.
-  It runs the Mobile-Address protocol and generates an un-optimized routing table for mobile addresses.
-  Note that this routing table maps from the mobile address to the remote router where that address
-  is directly bound.
-  """
-  def __init__(self, container):
-    self.container = container
-    self.id = self.container.id
-    self.area = self.container.area
-    self.mobile_addr_max_age = self.container.config.mobile_addr_max_age
-    self.mobile_seq = 0
-    self.local_keys = []
-    self.added_keys = []
-    self.deleted_keys = []
-    self.remote_lists = {}      # map router_id => (sequence, list of keys)
-    self.remote_last_seen = {}  # map router_id => time of last seen advertizement/update
-    self.remote_changed = False
-    self.needed_mars = {}
-
-
-  def tick(self, now):
-    self._expire_remotes(now)
-    self._send_mars()
-
-    ##
-    ## If local keys have changed, collect the changes and send a MAU with the diffs
-    ## Note: it is important that the differential-MAU be sent before a RA is sent
-    ##
-    if len(self.added_keys) > 0 or len(self.deleted_keys) > 0:
-      self.mobile_seq += 1
-      self.container.send('_topo.%s.all' % self.area,
-                          MessageMAU(None, self.id, self.area, self.mobile_seq, self.added_keys, self.deleted_keys))
-      self.local_keys.extend(self.added_keys)
-      for key in self.deleted_keys:
-        self.local_keys.remove(key)
-      self.added_keys = []
-      self.deleted_keys = []
-      self.container.mobile_sequence_changed(self.mobile_seq)
-
-    ##
-    ## If remotes have changed, start the process of updating local bindings
-    ##
-    if self.remote_changed:
-      self.remote_changed = False
-      self._update_remote_keys()
-
-
-  def add_local_address(self, key):
     """
+    This module is responsible for maintaining an up-to-date list of mobile addresses in the domain.
+    It runs the Mobile-Address protocol and generates an un-optimized routing table for mobile addresses.
+    Note that this routing table maps from the mobile address to the remote router where that address
+    is directly bound.
     """
-    if self.local_keys.count(key) == 0:
-      if self.added_keys.count(key) == 0:
-        self.added_keys.append(key)
-    else:
-      if self.deleted_keys.count(key) > 0:
-        self.deleted_keys.remove(key)
+    def __init__(self, container, node_tracker):
+        self.container = container
+        self.node_tracker = node_tracker
+        self.id = self.container.id
+        self.area = self.container.area
+        self.mobile_addr_max_age = self.container.config.mobile_addr_max_age
+        self.mobile_seq = 0
+        self.local_addrs = []
+        self.added_addrs = []
+        self.deleted_addrs = []
+        self.remote_lists = {}      # map router_id => (sequence, list of addrs)
+        self.remote_last_seen = {}  # map router_id => time of last seen advertisement/update
+        self.needed_mars = {}
+
+
+    def tick(self, now):
+        self._expire_remotes(now)
+        self._send_mars()
+
+        ##
+        ## If local addrs have changed, collect the changes and send a MAU with the diffs
+        ## Note: it is important that the differential-MAU be sent before a RA is sent
+        ##
+        if len(self.added_addrs) > 0 or len(self.deleted_addrs) > 0:
+            self.mobile_seq += 1
+            self.container.send('amqp:/_topo/%s/all/qdxrouter' % self.area,
+                                MessageMAU(None, self.id, self.area, self.mobile_seq, self.added_addrs, self.deleted_addrs))
+            self.local_addrs.extend(self.added_addrs)
+            for addr in self.deleted_addrs:
+                self.local_addrs.remove(addr)
+            self.added_addrs = []
+            self.deleted_addrs = []
+            self.container.mobile_sequence_changed(self.mobile_seq)
+
+
+    def add_local_address(self, addr):
+        """
+        """
+        if self.local_addrs.count(addr) == 0:
+            if self.added_addrs.count(addr) == 0:
+                self.added_addrs.append(addr)
+        else:
+            if self.deleted_addrs.count(addr) > 0:
+                self.deleted_addrs.remove(addr)
 
 
-  def del_local_address(self, key):
-    """
-    """
-    if self.local_keys.count(key) > 0:
-      if self.deleted_keys.count(key) == 0:
-        self.deleted_keys.append(key)
-    else:
-      if self.added_keys.count(key) > 0:
-        self.added_keys.remove(key)
-
-
-  def handle_ra(self, msg, now):
-    if msg.id == self.id:
-      return
-
-    if msg.mobile_seq == 0:
-      return
-
-    if msg.id in self.remote_lists:
-      _seq, _list = self.remote_lists[msg.id]
-      self.remote_last_seen[msg.id] = now
-      if _seq < msg.mobile_seq:
-        self.needed_mars[(msg.id, msg.area, _seq)] = None
-    else:
-      self.needed_mars[(msg.id, msg.area, 0)] = None
-
-
-  def handle_mau(self, msg, now):
-    ##
-    ## If the MAU is differential, we can only use it if its sequence is exactly one greater
-    ## than our stored sequence.  If not, we will ignore the content and schedule a MAR.
-    ##
-    ## If the MAU is absolute, we can use it in all cases.
-    ##
-    if msg.id == self.id:
-      return
-
-    if msg.exist_list:
-      ##
-      ## Absolute MAU
-      ##
-      if msg.id in self.remote_lists:
-        _seq, _list = self.remote_lists[msg.id]
-        if _seq >= msg.mobile_seq:  # ignore duplicates
-          return
-      self.remote_lists[msg.id] = (msg.mobile_seq, msg.exist_list)
-      self.remote_last_seen[msg.id] = now
-      self.remote_changed = True
-    else:
-      ##
-      ## Differential MAU
-      ##
-      if msg.id in self.remote_lists:
-        _seq, _list = self.remote_lists[msg.id]
-        if _seq == msg.mobile_seq:  # ignore duplicates
-          return
-        self.remote_last_seen[msg.id] = now
-        if _seq + 1 == msg.mobile_seq:
-          ##
-          ## This is one greater than our stored value, incorporate the deltas
-          ##
-          if msg.add_list and msg.add_list.__class__ == list:
-            _list.extend(msg.add_list)
-          if msg.del_list and msg.del_list.__class__ == list:
-            for key in msg.del_list:
-              _list.remove(key)
-          self.remote_lists[msg.id] = (msg.mobile_seq, _list)
-          self.remote_changed = True
+    def del_local_address(self, addr):
+        """
+        """
+        if self.local_addrs.count(addr) > 0:
+            if self.deleted_addrs.count(addr) == 0:
+                self.deleted_addrs.append(addr)
+        else:
+            if self.added_addrs.count(addr) > 0:
+                self.added_addrs.remove(addr)
+
+
+    def handle_ra(self, msg, now):
+        if msg.id == self.id:
+            return
+
+        if msg.mobile_seq == 0:
+            return
+
+        if msg.id in self.remote_lists:
+            _seq, _list = self.remote_lists[msg.id]
+            self.remote_last_seen[msg.id] = now
+            if _seq < msg.mobile_seq:
+                self.needed_mars[(msg.id, msg.area, _seq)] = None
+        else:
+            self.needed_mars[(msg.id, msg.area, 0)] = None
+
+
+    def handle_mau(self, msg, now):
+        ##
+        ## If the MAU is differential, we can only use it if its sequence is exactly one greater
+        ## than our stored sequence.  If not, we will ignore the content and schedule a MAR.
+        ##
+        ## If the MAU is absolute, we can use it in all cases.
+        ##
+        if msg.id == self.id:
+            return
+
+        if msg.exist_list:
+            ##
+            ## Absolute MAU
+            ##
+            if msg.id in self.remote_lists:
+                _seq, _list = self.remote_lists[msg.id]
+                if _seq >= msg.mobile_seq:  # ignore duplicates
+                    return
+            self.remote_lists[msg.id] = (msg.mobile_seq, msg.exist_list)
+            self.remote_last_seen[msg.id] = now
+            (add_list, del_list) = self.node_tracker.overwrite_addresses(msg.id, msg.exist_list)
+            self._activate_remotes(msg.id, add_list, del_list)
         else:
-          self.needed_mars[(msg.id, msg.area, _seq)] = None
-      else:
-        self.needed_mars[(msg.id, msg.area, 0)] = None
-
-
-  def handle_mar(self, msg, now):
-    if msg.id == self.id:
-      return
-    if msg.have_seq < self.mobile_seq:
-      self.container.send('_topo.%s.%s' % (msg.area, msg.id),
-                          MessageMAU(None, self.id, self.area, self.mobile_seq, None, None, self.local_keys))
-
-
-  def _update_remote_keys(self):
-    keys = {}
-    for _id,(seq,key_list) in self.remote_lists.items():
-      keys[_id] = key_list
-    self.container.mobile_keys_changed(keys)
-
-
-  def _expire_remotes(self, now):
-    for _id, t in self.remote_last_seen.items():
-      if now - t > self.mobile_addr_max_age:
-        self.remote_lists.pop(_id)
-        self.remote_last_seen.pop(_id)
-        self.remote_changed = True
-
-
-  def _send_mars(self):
-    for _id, _area, _seq in self.needed_mars.keys():
-      self.container.send('_topo.%s.%s' % (_area, _id), MessageMAR(None, self.id, self.area, _seq))
-    self.needed_mars = {}
+            ##
+            ## Differential MAU
+            ##
+            if msg.id in self.remote_lists:
+                _seq, _list = self.remote_lists[msg.id]
+                if _seq == msg.mobile_seq:  # ignore duplicates
+                    return
+                self.remote_last_seen[msg.id] = now
+                if _seq + 1 == msg.mobile_seq:
+                    ##
+                    ## This is one greater than our stored value, incorporate the deltas
+                    ##
+                    if msg.add_list and msg.add_list.__class__ == list:
+                        _list.extend(msg.add_list)
+                    if msg.del_list and msg.del_list.__class__ == list:
+                        for addr in msg.del_list:
+                            _list.remove(addr)
+                    self.remote_lists[msg.id] = (msg.mobile_seq, _list)
+                    if msg.add_list:
+                        self.node_tracker.add_addresses(msg.id, msg.add_list)
+                    if msg.del_list:
+                        self.node_tracker.del_addresses(msg.id, msg.del_list)
+                    self._activate_remotes(msg.id, msg.add_list, msg.del_list)
+                else:
+                    self.needed_mars[(msg.id, msg.area, _seq)] = None
+            else:
+                self.needed_mars[(msg.id, msg.area, 0)] = None
+
+
+    def handle_mar(self, msg, now):
+        if msg.id == self.id:
+            return
+        if msg.have_seq < self.mobile_seq:
+            self.container.send('amqp:/_topo/%s/%s/qdxrouter' % (msg.area, msg.id),
+                                MessageMAU(None, self.id, self.area, self.mobile_seq, None, None, self.local_addrs))
+
+
+    def _expire_remotes(self, now):
+        for _id, t in self.remote_last_seen.items():
+            if now - t > self.mobile_addr_max_age:
+                self.remote_lists.pop(_id)
+                self.remote_last_seen.pop(_id)
+                self.remote_changed = True
+
+
+    def _send_mars(self):
+        for _id, _area, _seq in self.needed_mars.keys():
+            self.container.send('amqp:/_topo/%s/%s/qdxrouter' % (_area, _id), MessageMAR(None, self.id, self.area, _seq))
+        self.needed_mars = {}
+
+
+    def _activate_remotes(self, _id, added, deleted):
+        bit = self.node_tracker.maskbit_for_node(_id)
+        if added:
+            for a in added:
+                self.container.router_adapter.map_destination(a, bit)
+        if deleted:
+            for d in deleted:
+                self.container.router_adapter.unmap_destination(d, bit)
 

Modified: qpid/branches/linearstore/qpid/extras/dispatch/python/qpid/dispatch/router/neighbor.py
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/python/qpid/dispatch/router/neighbor.py?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/python/qpid/dispatch/router/neighbor.py (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/python/qpid/dispatch/router/neighbor.py Mon Oct 21 22:04:51 2013
@@ -21,63 +21,63 @@ from data import LinkState, MessageHELLO
 from time import time
 
 try:
-  from dispatch import *
+    from dispatch import *
 except ImportError:
-  from ..stubs import *
+    from ..stubs import *
 
 
 class NeighborEngine(object):
-  """
-  This module is responsible for maintaining this router's link-state.  It runs the HELLO protocol
-  with the router's neighbors and notifies outbound when the list of neighbors-in-good-standing (the
-  link-state) changes.
-  """
-  def __init__(self, container):
-    self.container = container
-    self.id = self.container.id
-    self.area = self.container.area
-    self.last_hello_time = 0.0
-    self.hello_interval = container.config.hello_interval
-    self.hello_max_age = container.config.hello_max_age
-    self.hellos = {}
-    self.link_state_changed = False
-    self.link_state = LinkState(None, self.id, self.area, 0, [])
-
-
-  def tick(self, now):
-    self._expire_hellos(now)
-
-    if now - self.last_hello_time >= self.hello_interval:
-      self.last_hello_time = now
-      self.container.send('_local/qdxrouter', MessageHELLO(None, self.id, self.area, self.hellos.keys()))
-
-    if self.link_state_changed:
-      self.link_state_changed = False
-      self.link_state.bump_sequence()
-      self.container.local_link_state_changed(self.link_state)
-
-
-  def handle_hello(self, msg, now):
-    if msg.id == self.id:
-      return
-    self.hellos[msg.id] = now
-    if msg.is_seen(self.id):
-      if self.link_state.add_peer(msg.id):
-        self.link_state_changed = True
-        self.container.new_neighbor(msg.id)
-        self.container.log(LOG_INFO, "New neighbor established: %s" % msg.id)
-    ##
-    ## TODO - Use this function to detect area boundaries
-    ##
-
-  def _expire_hellos(self, now):
-    to_delete = []
-    for key, last_seen in self.hellos.items():
-      if now - last_seen > self.hello_max_age:
-        to_delete.append(key)
-    for key in to_delete:
-      self.hellos.pop(key)
-      if self.link_state.del_peer(key):
-        self.link_state_changed = True
-        self.container.lost_neighbor(key)
-        self.container.log(LOG_INFO, "Neighbor lost: %s" % key)
+    """
+    This module is responsible for maintaining this router's link-state.  It runs the HELLO protocol
+    with the router's neighbors and notifies outbound when the list of neighbors-in-good-standing (the
+    link-state) changes.
+    """
+    def __init__(self, container):
+        self.container = container
+        self.id = self.container.id
+        self.area = self.container.area
+        self.last_hello_time = 0.0
+        self.hello_interval = container.config.hello_interval
+        self.hello_max_age = container.config.hello_max_age
+        self.hellos = {}
+        self.link_state_changed = False
+        self.link_state = LinkState(None, self.id, self.area, 0, [])
+
+
+    def tick(self, now):
+        self._expire_hellos(now)
+
+        if now - self.last_hello_time >= self.hello_interval:
+            self.last_hello_time = now
+            self.container.send('amqp:/_local/qdxhello', MessageHELLO(None, self.id, self.area, self.hellos.keys()))
+
+        if self.link_state_changed:
+            self.link_state_changed = False
+            self.link_state.bump_sequence()
+            self.container.local_link_state_changed(self.link_state)
+
+
+    def handle_hello(self, msg, now, link_id):
+        if msg.id == self.id:
+            return
+        self.hellos[msg.id] = now
+        if msg.is_seen(self.id):
+            if self.link_state.add_peer(msg.id):
+                self.link_state_changed = True
+                self.container.new_neighbor(msg.id, link_id)
+                self.container.log(LOG_INFO, "New neighbor established: %s on link: %d" % (msg.id, link_id))
+        ##
+        ## TODO - Use this function to detect area boundaries
+        ##
+
+    def _expire_hellos(self, now):
+        to_delete = []
+        for key, last_seen in self.hellos.items():
+            if now - last_seen > self.hello_max_age:
+                to_delete.append(key)
+        for key in to_delete:
+            self.hellos.pop(key)
+            if self.link_state.del_peer(key):
+                self.link_state_changed = True
+                self.container.lost_neighbor(key)
+                self.container.log(LOG_INFO, "Neighbor lost: %s" % key)

Modified: qpid/branches/linearstore/qpid/extras/dispatch/python/qpid/dispatch/router/node.py
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/python/qpid/dispatch/router/node.py?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/python/qpid/dispatch/router/node.py (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/python/qpid/dispatch/router/node.py Mon Oct 21 22:04:51 2013
@@ -18,86 +18,157 @@
 #
 
 try:
-  from dispatch import *
+    from dispatch import *
 except ImportError:
-  from ..stubs import *
+    from ..stubs import *
 
 
 class NodeTracker(object):
-  """
-  This module is responsible for tracking the set of router nodes that are known to this
-  router.  It tracks whether they are neighbor or remote and whether they are reachable.
-  """
-  def __init__(self, container):
-    self.container = container
-    self.id    = self.container.id
-    self.area  = self.container.area
-    self.nodes = {}  # id => RemoteNode
-
-
-  def tick(self, now):
-    pass
-
-
-  def new_neighbor(self, node_id):
-    if node_id not in self.nodes:
-      self.nodes[node_id] = RemoteNode(node_id)
-    self.nodes[node_id].set_neighbor()
-    self._notify(self.nodes[node_id])
-
-
-  def lost_neighbor(self, node_id):
-    node = self.nodes[node_id]
-    node.clear_neighbor()
-    self._notify(node)
-    if node.to_delete():
-      self.nodes.pop(node_id)
-
-
-  def new_node(self, node_id):
-    if node_id not in self.nodes:
-      self.nodes[node_id] = RemoteNode(node_id)
-    self.nodes[node_id].set_remote()
-    self._notify(self.nodes[node_id])
-
-
-  def lost_node(self, node_id):
-    node = self.nodes[node_id]
-    node.clear_remote()
-    self._notify(node)
-    if node.to_delete():
-      self.nodes.pop(node_id)
-
-
-  def _notify(self, node):
-    if node.to_delete():
-      self.container.adapter.node_updated("R%s" % node.id, 0, 0)
-    else:
-      is_neighbor = 0
-      if node.neighbor:
-        is_neighbor = 1
-      self.container.adapter.node_updated("R%s" % node.id, 1, is_neighbor)
+    """
+    This module is responsible for tracking the set of router nodes that are known to this
+    router.  It tracks whether they are neighbor or remote and whether they are reachable.
+
+    This module is also responsible for assigning a unique mask bit value to each router.
+    The mask bit is used in the main router to represent sets of valid destinations for addresses.
+    """
+    def __init__(self, container, max_routers):
+        self.container    = container
+        self.max_routers  = max_routers
+        self.nodes        = {}  # id => RemoteNode
+        self.maskbits     = []
+        self.next_maskbit = 1   # Reserve bit '0' to represent this router
+        for i in range(max_routers):
+            self.maskbits.append(None)
+        self.maskbits[0] = True
+
+
+    def tick(self, now):
+        pass
+
+
+    def new_neighbor(self, node_id, link_maskbit):
+        """
+        A node, designated by node_id, has been discovered as a neighbor over a link with
+        a maskbit of link_maskbit.
+        """
+        if node_id in self.nodes:
+            node = self.nodes[node_id]
+            if node.neighbor:
+                return
+            self.container.del_remote_router(node.maskbit)
+            node.neighbor = True
+        else:
+            node = RemoteNode(node_id, self._allocate_maskbit(), True)
+            self.nodes[node_id] = node
+        self.container.add_neighbor_router(self._address(node_id), node.maskbit, link_maskbit)
+
+
+    def lost_neighbor(self, node_id):
+        """
+        We have lost contact with a neighboring node node_id.
+        """
+        node = self.nodes[node_id]
+        node.neighbor = False
+        self.container.del_neighbor_router(node.maskbit)
+        if node.remote:
+            self.container.add_remote_router(self._address(node.id), node.maskbit)
+        else:
+            self._free_maskbit(node.maskbit)
+            self.nodes.pop(node_id)
+
+
+    def new_node(self, node_id):
+        """
+        A node, designated by node_id, has been discovered through an advertisement from a
+        remote peer.
+        """
+        if node_id not in self.nodes:
+            node = RemoteNode(node_id, self._allocate_maskbit(), False)
+            self.nodes[node_id] = node
+            self.container.add_remote_router(self._address(node.id), node.maskbit)
+        else:
+            node = self.nodes[node_id]
+            node.remote = True
+
+
+    def lost_node(self, node_id):
+        """
+        A remote node, node_id, has not been heard from for too long and is being deemed lost.
+        """
+        node = self.nodes[node_id]
+        if node.remote:
+            node.remote = False
+            if not node.neighbor:
+                self.container.del_remote_router(node.maskbit)
+                self._free_maskbit(node.maskbit)
+                self.nodes.pop(node_id)
+
+
+    def maskbit_for_node(self, node_id):
+        """
+        """
+        node = self.nodes[node_id]
+        if node:
+            return node.maskbit
+        return None
+
+
+    def add_addresses(self, node_id, addrs):
+        node = self.nodes[node_id]
+        for a in addrs:
+            node.addrs[a] = 1
+
+
+    def del_addresses(self, node_id, addrs):
+        node = self.nodes[node_id]
+        for a in addrs:
+            node.addrs.pop(a)
+
+
+    def overwrite_addresses(self, node_id, addrs):
+        node    = self.nodes[node_id]
+        added   = []
+        deleted = []
+        for a in addrs:
+            if a not in node.addrs.keys():
+                added.append(a)
+        for a in node.addrs.keys():
+            if a not in addrs:
+                deleted.append(a)
+        for a in addrs:
+            node.addrs[a] = 1
+        return (added, deleted)
+
+
+    def _allocate_maskbit(self):
+        if self.next_maskbit == None:
+            raise Exception("Exceeded Maximum Router Count")
+        result = self.next_maskbit
+        self.next_maskbit = None
+        self.maskbits[result] = True
+        for n in range(result + 1, self.max_routers):
+            if self.maskbits[n] == None:
+                self.next_maskbit = n
+                break
+        return result
+
+
+    def _free_maskbit(self, i):
+        self.maskbits[i] = None
+        if self.next_maskbit == None or i < self.next_maskbit:
+            self.next_maskbit = i
 
 
-class RemoteNode(object):
-
-  def __init__(self, node_id):
-    self.id       = node_id
-    self.neighbor = None
-    self.remote   = None
-
-  def set_neighbor(self):
-    self.neighbor = True
+    def _address(self, node_id):
+        return "amqp:/_topo/%s/%s" % (self.container.area, node_id)
 
-  def set_remote(self):
-    self.remote = True
 
-  def clear_neighbor(self):
-    self.neighbor = None
-
-  def clear_remote(self):
-    self.remote = None
+class RemoteNode(object):
 
-  def to_delete(self):
-    return self.neighbor or self.remote
+    def __init__(self, node_id, maskbit, neighbor):
+        self.id       = node_id
+        self.maskbit  = maskbit
+        self.neighbor = neighbor
+        self.remote   = not neighbor
+        self.addrs    = {}  # Address => Count at Node (1 only for the present)
 

Modified: qpid/branches/linearstore/qpid/extras/dispatch/python/qpid/dispatch/router/path.py
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/python/qpid/dispatch/router/path.py?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/python/qpid/dispatch/router/path.py (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/python/qpid/dispatch/router/path.py Mon Oct 21 22:04:51 2013
@@ -18,185 +18,218 @@
 #
 
 try:
-  from dispatch import *
+    from dispatch import *
 except ImportError:
-  from ..stubs import *
+    from ..stubs import *
 
 class PathEngine(object):
-  """
-  This module is responsible for computing the next-hop for every router/area in the domain
-  based on the collection of link states that have been gathered.
-  """
-  def __init__(self, container):
-    self.container = container
-    self.id = self.container.id
-    self.area = self.container.area
-    self.recalculate = False
-    self.collection = None
-
-
-  def tick(self, now_unused):
-    if self.recalculate:
-      self.recalculate = False
-      self._calculate_routes()
-
-
-  def ls_collection_changed(self, collection):
-    self.recalculate = True
-    self.collection = collection
-
-
-  def _calculate_tree_from_root(self, root):
-    ##
-    ## Make a copy of the current collection of link-states that contains
-    ## an empty link-state for nodes that are known-peers but are not in the
-    ## collection currently.  This is needed to establish routes to those nodes
-    ## so we can trade link-state information with them.
-    ##
-    link_states = {}
-    for _id, ls in self.collection.items():
-      link_states[_id] = ls.peers
-      for p in ls.peers:
-        if p not in link_states:
-          link_states[p] = []
-
-    ##
-    ## Setup Dijkstra's Algorithm
-    ##
-    cost = {}
-    prev = {}
-    for _id in link_states:
-      cost[_id] = None  # infinite
-      prev[_id] = None  # undefined
-    cost[root] = 0   # no cost to the root node
-    unresolved = NodeSet(cost)
-
-    ##
-    ## Process unresolved nodes until lowest cost paths to all reachable nodes have been found.
-    ##
-    while not unresolved.empty():
-      u = unresolved.lowest_cost()
-      if cost[u] == None:
-        # There are no more reachable nodes in unresolved
-        break
-      for v in link_states[u]:
-        if unresolved.contains(v):
-          alt = cost[u] + 1   # TODO - Use link cost instead of 1
-          if cost[v] == None or alt < cost[v]:
-            cost[v] = alt
-            prev[v] = u
-            unresolved.set_cost(v, alt)
-
-    ##
-    ## Remove unreachable nodes from the map.  Note that this will also remove the
-    ## root node (has no previous node) from the map.
-    ##
-    for u, val in prev.items():
-      if not val:
-        prev.pop(u)
-
-    ##
-    ## Return previous-node map.  This is a map of all reachable, remote nodes to
-    ## their predecessor node.
-    ##
-    return prev
-
-
-  def _calculate_routes(self):
-    ##
-    ## Generate the shortest-path tree with the local node as root
-    ##
-    prev = self._calculate_tree_from_root(self.id)
-    nodes = prev.keys()
-
-    ##
-    ## Distill the path tree into a map of next hops for each node
-    ##
-    next_hops = {}
-    while len(nodes) > 0:
-      u = nodes[0]          # pick any destination
-      path = [u]
-      nodes.remove(u)
-      v = prev[u]
-      while v != self.id:   # build a list of nodes in the path back to the root
-        if v in nodes:
-          path.append(v)
-          nodes.remove(v)
-        u = v
-        v = prev[u]
-      for w in path:        # mark each node in the path as reachable via the next hop
-        next_hops[w] = u
-
-    ##
-    ## TODO - Calculate the tree from each origin, determine the set of origins-per-dest
-    ##        for which the path from origin to dest passes through us.  This is the set
-    ##        of valid origins for forwarding to the destination.
-    ##
+    """
+    This module is responsible for computing the next-hop for every router/area in the domain
+    based on the collection of link states that have been gathered.
+    """
+    def __init__(self, container):
+        self.container = container
+        self.id = self.container.id
+        self.area = self.container.area
+        self.recalculate = False
+        self.collection = None
 
-    self.container.next_hops_changed(next_hops)
 
+    def tick(self, now_unused):
+        if self.recalculate:
+            self.recalculate = False
+            self._calculate_routes()
+
+
+    def ls_collection_changed(self, collection):
+        self.recalculate = True
+        self.collection = collection
+
+
+    def _calculate_tree_from_root(self, root):
+        ##
+        ## Make a copy of the current collection of link-states that contains
+        ## a fake link-state for nodes that are known-peers but are not in the
+        ## collection currently.  This is needed to establish routes to those nodes
+        ## so we can trade link-state information with them.
+        ##
+        link_states = {}
+        for _id, ls in self.collection.items():
+            link_states[_id] = ls.peers
+            for p in ls.peers:
+                if p not in link_states:
+                    link_states[p] = [_id]
+
+        ##
+        ## Setup Dijkstra's Algorithm
+        ##
+        cost = {}
+        prev = {}
+        for _id in link_states:
+            cost[_id] = None  # infinite
+            prev[_id] = None  # undefined
+        cost[root] = 0   # no cost to the root node
+        unresolved = NodeSet(cost)
+
+        ##
+        ## Process unresolved nodes until lowest cost paths to all reachable nodes have been found.
+        ##
+        while not unresolved.empty():
+            u = unresolved.lowest_cost()
+            if cost[u] == None:
+                # There are no more reachable nodes in unresolved
+                break
+            for v in link_states[u]:
+                if unresolved.contains(v):
+                    alt = cost[u] + 1   # TODO - Use link cost instead of 1
+                    if cost[v] == None or alt < cost[v]:
+                        cost[v] = alt
+                        prev[v] = u
+                        unresolved.set_cost(v, alt)
 
-class NodeSet(object):
-  """
-  This data structure is an ordered list of node IDs, sorted in increasing order by their cost.
-  Equal cost nodes are secondarily sorted by their ID in order to provide deterministic and
-  repeatable ordering.
-  """
-  def __init__(self, cost_map):
-    self.nodes = []
-    for _id, cost in cost_map.items():
-      ##
-      ## Assume that nodes are either unreachable (cost = None) or local (cost = 0)
-      ## during this initialization.
-      ##
-      if cost == 0:
-        self.nodes.insert(0, (_id, cost))
-      else:
         ##
-        ## There is no need to sort unreachable nodes by ID
+        ## Remove unreachable nodes from the map.  Note that this will also remove the
+        ## root node (has no previous node) from the map.
         ##
-        self.nodes.append((_id, cost))
+        for u, val in prev.items():
+            if not val:
+                prev.pop(u)
 
+        ##
+        ## Return previous-node map.  This is a map of all reachable, remote nodes to
+        ## their predecessor node.
+        ##
+        return prev
 
-  def __repr__(self):
-    return self.nodes.__repr__()
 
+    def _calculate_valid_origins(self, nodeset):
+        ##
+        ## Calculate the tree from each origin, determine the set of origins-per-dest
+        ## for which the path from origin to dest passes through us.  This is the set
+        ## of valid origins for forwarding to the destination.
+        ##
+        valid_origin = {}         # Map of destination => List of Valid Origins
+        for node in nodeset:
+            if node != self.id:
+                valid_origin[node] = []
+
+        for root in valid_origin.keys():
+            prev  = self._calculate_tree_from_root(root)
+            nodes = prev.keys()
+            while len(nodes) > 0:
+                u = nodes[0]
+                path = [u]
+                nodes.remove(u)
+                v = prev[u]
+                while v != root:
+                    if v in nodes:
+                        if v != self.id:
+                            path.append(v)
+                        nodes.remove(v)
+                    if v == self.id:
+                        valid_origin[root].extend(path)
+                    u = v
+                    v = prev[u]
+        return valid_origin
 
-  def empty(self):
-    return len(self.nodes) == 0
 
+    def _calculate_routes(self):
+        ##
+        ## Generate the shortest-path tree with the local node as root
+        ##
+        prev  = self._calculate_tree_from_root(self.id)
+        nodes = prev.keys()
 
-  def contains(self, _id):
-    for a, b in self.nodes:
-      if a == _id:
-        return True
-    return False
+        ##
+        ## Distill the path tree into a map of next hops for each node
+        ##
+        next_hops = {}
+        while len(nodes) > 0:
+            u = nodes[0]          # pick any destination
+            path = [u]
+            nodes.remove(u)
+            v = prev[u]
+            while v != self.id:   # build a list of nodes in the path back to the root
+                if v in nodes:
+                    path.append(v)
+                    nodes.remove(v)
+                u = v
+                v = prev[u]
+            for w in path:        # mark each node in the path as reachable via the next hop
+                next_hops[w] = u
 
+        self.container.next_hops_changed(next_hops)
 
-  def lowest_cost(self):
-    """
-     Remove and return the lowest cost node ID.
-    """
-    _id, cost = self.nodes.pop(0)
-    return _id
+        ##
+        ## Calculate the valid origins for remote routers
+        ##
+        valid_origin = self._calculate_valid_origins(prev.keys())
+        self.container.valid_origins_changed(valid_origin)
 
 
-  def set_cost(self, _id, new_cost):
+
+class NodeSet(object):
     """
-    Set the cost for an ID in the NodeSet and re-insert the ID so that the list
-    remains sorted in increasing cost order.
+    This data structure is an ordered list of node IDs, sorted in increasing order by their cost.
+    Equal cost nodes are secondarily sorted by their ID in order to provide deterministic and
+    repeatable ordering.
     """
-    index = 0
-    for i, c in self.nodes:
-      if i == _id:
-        break
-      index += 1
-    self.nodes.pop(index)
-
-    index = 0
-    for i, c in self.nodes:
-      if c == None or new_cost < c or (new_cost == c and _id < i):
-        break
-      index += 1
+    def __init__(self, cost_map):
+        self.nodes = []
+        for _id, cost in cost_map.items():
+            ##
+            ## Assume that nodes are either unreachable (cost = None) or local (cost = 0)
+            ## during this initialization.
+            ##
+            if cost == 0:
+                self.nodes.insert(0, (_id, cost))
+            else:
+                ##
+                ## There is no need to sort unreachable nodes by ID
+                ##
+                self.nodes.append((_id, cost))
+
+
+    def __repr__(self):
+        return self.nodes.__repr__()
+
+
+    def empty(self):
+        return len(self.nodes) == 0
+
+
+    def contains(self, _id):
+        for a, b in self.nodes:
+            if a == _id:
+                return True
+        return False
+
+
+    def lowest_cost(self):
+        """
+        Remove and return the lowest cost node ID.
+        """
+        _id, cost = self.nodes.pop(0)
+        return _id
+
+
+    def set_cost(self, _id, new_cost):
+        """
+        Set the cost for an ID in the NodeSet and re-insert the ID so that the list
+        remains sorted in increasing cost order.
+        """
+        index = 0
+        for i, c in self.nodes:
+            if i == _id:
+                break
+            index += 1
+        self.nodes.pop(index)
+
+        index = 0
+        for i, c in self.nodes:
+            if c == None or new_cost < c or (new_cost == c and _id < i):
+                break
+            index += 1
+
+        self.nodes.insert(index, (_id, new_cost))
 
-    self.nodes.insert(index, (_id, new_cost))

Modified: qpid/branches/linearstore/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py Mon Oct 21 22:04:51 2013
@@ -27,254 +27,267 @@ from link import LinkStateEngine
 from path import PathEngine
 from mobile import MobileAddressEngine
 from routing import RoutingTableEngine
-from binding import BindingEngine
-from adapter import AdapterEngine
 from node import NodeTracker
 
+import sys
+import traceback
+
 ##
 ## Import the Dispatch adapters from the environment.  If they are not found
 ## (i.e. we are in a test bench, etc.), load the stub versions.
 ##
 try:
-  from dispatch import *
+    from dispatch import *
 except ImportError:
-  from ..stubs import *
+    from ..stubs import *
 
 
 class RouterEngine:
-  """
-  """
-
-  def __init__(self, router_adapter, router_id=None, area='area', config_override={}):
-    """
-    Initialize an instance of a router for a domain.
-    """
-    ##
-    ## Record important information about this router instance
-    ##
-    self.domain         = "domain"
-    self.router_adapter = router_adapter
-    self.log_adapter    = LogAdapter("dispatch.router")
-    self.io_adapter     = IoAdapter(self, "qdxrouter")
-
-    if router_id:
-      self.id = router_id
-    else:
-      self.id = str(uuid4())
-    self.area = area
-    self.log(LOG_INFO, "Router Engine Instantiated: area=%s id=%s" % (self.area, self.id))
-
-    ##
-    ## Setup configuration
-    ##
-    self.config = Configuration(config_override)
-    self.log(LOG_INFO, "Config: %r" % self.config)
-
-    ##
-    ## Launch the sub-module engines
-    ##
-    self.neighbor_engine       = NeighborEngine(self)
-    self.link_state_engine     = LinkStateEngine(self)
-    self.path_engine           = PathEngine(self)
-    self.mobile_address_engine = MobileAddressEngine(self)
-    self.routing_table_engine  = RoutingTableEngine(self)
-    self.binding_engine        = BindingEngine(self)
-    self.adapter_engine        = AdapterEngine(self)
-    self.node_tracker          = NodeTracker(self)
-
-
-
-  ##========================================================================================
-  ## Adapter Entry Points - invoked from the adapter
-  ##========================================================================================
-  def getId(self):
-    """
-    Return the router's ID
-    """
-    return self.id
-
-
-  def addLocalAddress(self, key):
     """
     """
-    try:
-      if key.find('_topo') == 0 or key.find('_local') == 0:
-        return
-      self.mobile_address_engine.add_local_address(key)
-    except Exception, e:
-      self.log(LOG_ERROR, "Exception in new-address processing: exception=%r" % e)
-
-  def delLocalAddress(self, key):
-    """
-    """
-    try:
-      if key.find('_topo') == 0 or key.find('_local') == 0:
-        return
-      self.mobile_address_engine.del_local_address(key)
-    except Exception, e:
-      self.log(LOG_ERROR, "Exception in del-address processing: exception=%r" % e)
-
-
-  def handleTimerTick(self):
-    """
-    """
-    try:
-      now = time()
-      self.neighbor_engine.tick(now)
-      self.link_state_engine.tick(now)
-      self.path_engine.tick(now)
-      self.mobile_address_engine.tick(now)
-      self.routing_table_engine.tick(now)
-      self.binding_engine.tick(now)
-      self.adapter_engine.tick(now)
-      self.node_tracker.tick(now)
-    except Exception, e:
-      self.log(LOG_ERROR, "Exception in timer processing: exception=%r" % e)
-
-
-  def handleControlMessage(self, opcode, body):
-    """
-    """
-    try:
-      now = time()
-      if   opcode == 'HELLO':
-        msg = MessageHELLO(body)
-        self.log(LOG_TRACE, "RCVD: %r" % msg)
-        self.neighbor_engine.handle_hello(msg, now)
-
-      elif opcode == 'RA':
-        msg = MessageRA(body)
-        self.log(LOG_TRACE, "RCVD: %r" % msg)
-        self.link_state_engine.handle_ra(msg, now)
-        self.mobile_address_engine.handle_ra(msg, now)
-
-      elif opcode == 'LSU':
-        msg = MessageLSU(body)
-        self.log(LOG_TRACE, "RCVD: %r" % msg)
-        self.link_state_engine.handle_lsu(msg, now)
-
-      elif opcode == 'LSR':
-        msg = MessageLSR(body)
-        self.log(LOG_TRACE, "RCVD: %r" % msg)
-        self.link_state_engine.handle_lsr(msg, now)
-
-      elif opcode == 'MAU':
-        msg = MessageMAU(body)
-        self.log(LOG_TRACE, "RCVD: %r" % msg)
-        self.mobile_address_engine.handle_mau(msg, now)
-
-      elif opcode == 'MAR':
-        msg = MessageMAR(body)
-        self.log(LOG_TRACE, "RCVD: %r" % msg)
-        self.mobile_address_engine.handle_mar(msg, now)
-
-    except Exception, e:
-      self.log(LOG_ERROR, "Exception in message processing: opcode=%s body=%r exception=%r" % (opcode, body, e))
-
-
-  def receive(self, message_properties, body):
-    """
-    This is the IoAdapter message-receive handler
-    """
-    try:
-      self.handleControlMessage(message_properties['opcode'], body)
-    except Exception, e:
-      self.log(LOG_ERROR, "Exception in raw message processing: properties=%r body=%r exception=%r" %
-               (message_properties, body, e))
-
-  def getRouterData(self, kind):
-    """
-    """
-    if kind == 'help':
-      return { 'help'           : "Get list of supported values for kind",
-               'link-state'     : "This router's link state",
-               'link-state-set' : "The set of link states from known routers",
-               'next-hops'      : "Next hops to each known router",
-               'topo-table'     : "Topological routing table",
-               'mobile-table'   : "Mobile key routing table"
-             }
-    if kind == 'link-state'     : return self.neighbor_engine.link_state.to_dict()
-    if kind == 'next-hops'      : return self.routing_table_engine.next_hops
-    if kind == 'topo-table'     : return {'table': self.adapter_engine.key_classes['topological']}
-    if kind == 'mobile-table'   : return {'table': self.adapter_engine.key_classes['mobile-key']}
-    if kind == 'link-state-set' :
-      copy = {}
-      for _id,_ls in self.link_state_engine.collection.items():
-        copy[_id] = _ls.to_dict()
-      return copy
-
-    return {'notice':'Use kind="help" to get a list of possibilities'}
-
-
-  ##========================================================================================
-  ## Adapter Calls - outbound calls to Dispatch
-  ##========================================================================================
-  def log(self, level, text):
-    """
-    Emit a log message to the host's event log
-    """
-    self.log_adapter.log(level, text)
-
-
-  def send(self, dest, msg):
-    """
-    Send a control message to another router.
-    """
-    app_props = {'opcode' : msg.get_opcode() }
-    self.io_adapter.send(dest, app_props, msg.to_dict())
-    self.log(LOG_TRACE, "SENT: %r dest=%s" % (msg, dest))
-
-
-  def node_updated(self, addr, reachable, neighbor):
-    """
-    """
-    self.router_adapter(addr, reachable, neighbor)
-
 
-  ##========================================================================================
-  ## Interconnect between the Sub-Modules
-  ##========================================================================================
-  def local_link_state_changed(self, link_state):
-    self.log(LOG_DEBUG, "Event: local_link_state_changed: %r" % link_state)
-    self.link_state_engine.new_local_link_state(link_state)
-
-  def ls_collection_changed(self, collection):
-    self.log(LOG_DEBUG, "Event: ls_collection_changed: %r" % collection)
-    self.path_engine.ls_collection_changed(collection)
-
-  def next_hops_changed(self, next_hop_table):
-    self.log(LOG_DEBUG, "Event: next_hops_changed: %r" % next_hop_table)
-    self.routing_table_engine.next_hops_changed(next_hop_table)
-    self.binding_engine.next_hops_changed()
-
-  def mobile_sequence_changed(self, mobile_seq):
-    self.log(LOG_DEBUG, "Event: mobile_sequence_changed: %d" % mobile_seq)
-    self.link_state_engine.set_mobile_sequence(mobile_seq)
-
-  def mobile_keys_changed(self, keys):
-    self.log(LOG_DEBUG, "Event: mobile_keys_changed: %r" % keys)
-    self.binding_engine.mobile_keys_changed(keys)
-
-  def get_next_hops(self):
-    return self.routing_table_engine.get_next_hops()
-
-  def remote_routes_changed(self, key_class, routes):
-    self.log(LOG_DEBUG, "Event: remote_routes_changed: class=%s routes=%r" % (key_class, routes))
-    self.adapter_engine.remote_routes_changed(key_class, routes)
-
-  def new_neighbor(self, rid):
-    self.log(LOG_DEBUG, "Event: new_neighbor: id=%s" % rid)
-    self.node_tracker.new_neighbor(rid)
-
-  def lost_neighbor(self, rid):
-    self.log(LOG_DEBUG, "Event: lost_neighbor: id=%s" % rid)
-    self.node_tracker.lost_neighbor(rid)
-
-  def new_node(self, rid):
-    self.log(LOG_DEBUG, "Event: new_node: id=%s" % rid)
-    self.node_tracker.new_node(rid)
-
-  def lost_node(self, rid):
-    self.log(LOG_DEBUG, "Event: lost_node: id=%s" % rid)
-    self.node_tracker.lost_node(rid)
+    def __init__(self, router_adapter, router_id, area, max_routers, config_override={}):
+        """
+        Initialize an instance of a router for a domain.
+        """
+        ##
+        ## Record important information about this router instance
+        ##
+        self.domain         = "domain"
+        self.router_adapter = router_adapter
+        self.log_adapter    = LogAdapter("dispatch.router")
+        self.io_adapter     = IoAdapter(self, ("qdxrouter", "qdxhello"))
+        self.max_routers    = max_routers
+        self.id             = router_id
+        self.area           = area
+        self.log(LOG_INFO, "Router Engine Instantiated: area=%s id=%s max_routers=%d" %
+                 (self.area, self.id, self.max_routers))
+
+        ##
+        ## Setup configuration
+        ##
+        self.config = Configuration(config_override)
+        self.log(LOG_INFO, "Config: %r" % self.config)
+
+        ##
+        ## Launch the sub-module engines
+        ##
+        self.node_tracker          = NodeTracker(self, self.max_routers)
+        self.neighbor_engine       = NeighborEngine(self)
+        self.link_state_engine     = LinkStateEngine(self)
+        self.path_engine           = PathEngine(self)
+        self.mobile_address_engine = MobileAddressEngine(self, self.node_tracker)
+        self.routing_table_engine  = RoutingTableEngine(self, self.node_tracker)
+
+
+
+    ##========================================================================================
+    ## Adapter Entry Points - invoked from the adapter
+    ##========================================================================================
+    def getId(self):
+        """
+        Return the router's ID
+        """
+        return self.id
+
+
+    def addressAdded(self, addr):
+        """
+        """
+        try:
+            if addr.find('Mtemp.') == 0:
+                return
+            if addr.find('M') == 0:
+                self.mobile_address_engine.add_local_address(addr[1:])
+        except Exception, e:
+            self.log(LOG_ERROR, "Exception in new-address processing: exception=%r" % e)
+
+
+    def addressRemoved(self, addr):
+        """
+        """
+        try:
+            if addr.find('Mtemp.') == 0:
+                return
+            if addr.find('M') == 0:
+                self.mobile_address_engine.del_local_address(addr[1:])
+        except Exception, e:
+            self.log(LOG_ERROR, "Exception in del-address processing: exception=%r" % e)
+
+
+    def handleTimerTick(self):
+        """
+        """
+        try:
+            now = time()
+            self.neighbor_engine.tick(now)
+            self.link_state_engine.tick(now)
+            self.path_engine.tick(now)
+            self.mobile_address_engine.tick(now)
+            self.routing_table_engine.tick(now)
+            self.node_tracker.tick(now)
+        except Exception, e:
+            self.log(LOG_ERROR, "Exception in timer processing: exception=%r" % e)
+
+
+    def handleControlMessage(self, opcode, body, link_id):
+        """
+        """
+        try:
+            now = time()
+            if   opcode == 'HELLO':
+                msg = MessageHELLO(body)
+                self.log(LOG_TRACE, "RCVD: %r" % msg)
+                self.neighbor_engine.handle_hello(msg, now, link_id)
+
+            elif opcode == 'RA':
+                msg = MessageRA(body)
+                self.log(LOG_DEBUG, "RCVD: %r" % msg)
+                self.link_state_engine.handle_ra(msg, now)
+                self.mobile_address_engine.handle_ra(msg, now)
+
+            elif opcode == 'LSU':
+                msg = MessageLSU(body)
+                self.log(LOG_DEBUG, "RCVD: %r" % msg)
+                self.link_state_engine.handle_lsu(msg, now)
+
+            elif opcode == 'LSR':
+                msg = MessageLSR(body)
+                self.log(LOG_DEBUG, "RCVD: %r" % msg)
+                self.link_state_engine.handle_lsr(msg, now)
+
+            elif opcode == 'MAU':
+                msg = MessageMAU(body)
+                self.log(LOG_DEBUG, "RCVD: %r" % msg)
+                self.mobile_address_engine.handle_mau(msg, now)
+
+            elif opcode == 'MAR':
+                msg = MessageMAR(body)
+                self.log(LOG_DEBUG, "RCVD: %r" % msg)
+                self.mobile_address_engine.handle_mar(msg, now)
+
+        except Exception, e:
+            self.log(LOG_ERROR, "Exception in message processing: opcode=%s body=%r exception=%r" % (opcode, body, e))
+            exc_type, exc_value, exc_traceback = sys.exc_info()
+            traceback.print_tb(exc_traceback)
+
+
+    def receive(self, message_properties, body, link_id):
+        """
+        This is the IoAdapter message-receive handler
+        """
+        try:
+            #self.log(LOG_DEBUG, "Raw Receive: mp=%r body=%r link_id=%r" % (message_properties, body, link_id))
+            self.handleControlMessage(message_properties['opcode'], body, link_id)
+        except Exception, e:
+            self.log(LOG_ERROR, "Exception in raw message processing: properties=%r body=%r exception=%r" %
+                     (message_properties, body, e))
+
+
+    def getRouterData(self, kind):
+        """
+        """
+        if kind == 'help':
+            return { 'help'           : "Get list of supported values for kind",
+                     'link-state'     : "This router's link state",
+                     'link-state-set' : "The set of link states from known routers",
+                     'next-hops'      : "Next hops to each known router"
+                     }
+        if kind == 'link-state'     : return self.neighbor_engine.link_state.to_dict()
+        if kind == 'next-hops'      : return self.routing_table_engine.next_hops
+        if kind == 'link-state-set' :
+            copy = {}
+            for _id,_ls in self.link_state_engine.collection.items():
+                copy[_id] = _ls.to_dict()
+            return copy
+
+        return {'notice':'Use kind="help" to get a list of possibilities'}
+
+
+    ##========================================================================================
+    ## Adapter Calls - outbound calls to Dispatch
+    ##========================================================================================
+    def log(self, level, text):
+        """
+        Emit a log message to the host's event log
+        """
+        self.log_adapter.log(level, text)
+
+
+    def send(self, dest, msg):
+        """
+        Send a control message to another router.
+        """
+        app_props = {'opcode' : msg.get_opcode() }
+        self.io_adapter.send(dest, app_props, msg.to_dict())
+        if "qdxhello" in dest:
+            self.log(LOG_TRACE, "SENT: %r dest=%s" % (msg, dest))
+        else:
+            self.log(LOG_DEBUG, "SENT: %r dest=%s" % (msg, dest))
+
+
+    def node_updated(self, addr, reachable, neighbor):
+        """
+        """
+        self.router_adapter(addr, reachable, neighbor)
+
+
+    ##========================================================================================
+    ## Interconnect between the Sub-Modules
+    ##========================================================================================
+    def local_link_state_changed(self, link_state):
+        self.log(LOG_DEBUG, "Event: local_link_state_changed: %r" % link_state)
+        self.link_state_engine.new_local_link_state(link_state)
+
+    def ls_collection_changed(self, collection):
+        self.log(LOG_DEBUG, "Event: ls_collection_changed: %r" % collection)
+        self.path_engine.ls_collection_changed(collection)
+
+    def next_hops_changed(self, next_hop_table):
+        self.log(LOG_DEBUG, "Event: next_hops_changed: %r" % next_hop_table)
+        self.routing_table_engine.next_hops_changed(next_hop_table)
+
+    def valid_origins_changed(self, valid_origins):
+        self.log(LOG_DEBUG, "Event: valid_origins_changed: %r" % valid_origins)
+        self.routing_table_engine.valid_origins_changed(valid_origins)
+
+    def mobile_sequence_changed(self, mobile_seq):
+        self.log(LOG_DEBUG, "Event: mobile_sequence_changed: %d" % mobile_seq)
+        self.link_state_engine.set_mobile_sequence(mobile_seq)
+
+    def get_next_hops(self):
+        return self.routing_table_engine.get_next_hops()
+
+    def new_neighbor(self, rid, link_id):
+        self.log(LOG_DEBUG, "Event: new_neighbor: id=%s link_id=%d" % (rid, link_id))
+        self.node_tracker.new_neighbor(rid, link_id)
+
+    def lost_neighbor(self, rid):
+        self.log(LOG_DEBUG, "Event: lost_neighbor: id=%s" % rid)
+        self.node_tracker.lost_neighbor(rid)
+
+    def new_node(self, rid):
+        self.log(LOG_DEBUG, "Event: new_node: id=%s" % rid)
+        self.node_tracker.new_node(rid)
+
+    def lost_node(self, rid):
+        self.log(LOG_DEBUG, "Event: lost_node: id=%s" % rid)
+        self.node_tracker.lost_node(rid)
+
+    def add_neighbor_router(self, address, router_bit, link_bit):
+        self.log(LOG_DEBUG, "Event: add_neighbor_router: address=%s, router_bit=%d, link_bit=%d" % \
+                     (address, router_bit, link_bit))
+        self.router_adapter.add_neighbor_router(address, router_bit, link_bit)
+
+    def del_neighbor_router(self, router_bit):
+        self.log(LOG_DEBUG, "Event: del_neighbor_router: router_bit=%d" % router_bit)
+        self.router_adapter.del_neighbor_router(router_bit)
+
+    def add_remote_router(self, address, router_bit):
+        self.log(LOG_DEBUG, "Event: add_remote_router: address=%s, router_bit=%d" % (address, router_bit))
+        self.router_adapter.add_remote_router(address, router_bit)
+
+    def del_remote_router(self, router_bit):
+        self.log(LOG_DEBUG, "Event: del_remote_router: router_bit=%d" % router_bit)
+        self.router_adapter.del_remote_router(router_bit)
 

Modified: qpid/branches/linearstore/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py Mon Oct 21 22:04:51 2013
@@ -18,39 +18,45 @@
 #
 
 try:
-  from dispatch import *
+    from dispatch import *
 except ImportError:
-  from ..stubs import *
+    from ..stubs import *
 
 class RoutingTableEngine(object):
-  """
-  This module is responsible for converting the set of next hops to remote routers to a routing
-  table in the "topological" address class.
-  """
-  def __init__(self, container):
-    self.container = container
-    self.id = self.container.id
-    self.area = self.container.area
-    self.next_hops = {}
-
-
-  def tick(self, now):
-    pass
-
-
-  def next_hops_changed(self, next_hops):
-    # Convert next_hops into routing table
-    self.next_hops = next_hops
-    new_table = []
-    for _id, next_hop in next_hops.items():
-      new_table.append(('_topo.%s.%s.#'  % (self.area, _id), next_hop))
-      pair = ('_topo.%s.all' % (self.area), next_hop)
-      if new_table.count(pair) == 0:
-        new_table.append(pair)
+    """
+    This module is responsible for converting the set of next hops to remote routers to a routing
+    table in the "topological" address class.
+    """
+    def __init__(self, container, node_tracker):
+        self.container = container
+        self.node_tracker = node_tracker
+        self.id = self.container.id
+        self.area = self.container.area
+        self.next_hops = {}
+
+
+    def tick(self, now):
+        pass
+
+
+    def next_hops_changed(self, next_hops):
+        # Convert next_hops into routing table
+        self.next_hops = next_hops
+        for _id, next_hop in next_hops.items():
+            mb_id = self.node_tracker.maskbit_for_node(_id)
+            mb_nh = self.node_tracker.maskbit_for_node(next_hop)
+            self.container.router_adapter.set_next_hop(mb_id, mb_nh)
+
+
+    def valid_origins_changed(self, valid_origins):
+        for _id, vo in valid_origins.items():
+            mb_id = self.node_tracker.maskbit_for_node(_id)
+            mb_vo = []
+            for o in vo:
+                mb_vo.append(self.node_tracker.maskbit_for_node(o))
+            self.container.router_adapter.set_valid_origins(mb_id, mb_vo)
 
-    self.container.remote_routes_changed('topological', new_table)
 
-
-  def get_next_hops(self):
-    return self.next_hops
+    def get_next_hops(self):
+        return self.next_hops
 

Modified: qpid/branches/linearstore/qpid/extras/dispatch/router/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/router/CMakeLists.txt?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/router/CMakeLists.txt (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/router/CMakeLists.txt Mon Oct 21 22:04:51 2013
@@ -18,7 +18,7 @@
 ##
 
 
-set(DEFAULT_CONFIG_PATH "/etc/qpid-dispatch.conf" CACHE string "Default Config File Path")
+set(DEFAULT_CONFIG_PATH "/etc/qpid/qpid-dispatch.conf" CACHE string "Default Config File Path")
 
 configure_file(${CMAKE_CURRENT_SOURCE_DIR}/src/config.h.in ${CMAKE_CURRENT_BINARY_DIR}/config.h)
 

Modified: qpid/branches/linearstore/qpid/extras/dispatch/router/src/main.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/router/src/main.c?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/router/src/main.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/router/src/main.c Mon Oct 21 22:04:51 2013
@@ -117,7 +117,7 @@ int main(int argc, char **argv)
         }
     }
 
-    dx_log_set_mask(0xFFFFFFFF);
+    dx_log_set_mask(0xFFFFFFFE);
 
     dispatch = dx_dispatch(config_path);
 

Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/agent.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/agent.c?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/agent.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/agent.c Mon Oct 21 22:04:51 2013
@@ -37,7 +37,7 @@
 
 struct dx_agent_t {
     dx_dispatch_t     *dx;
-    hash_t            *class_hash;
+    dx_hash_t         *class_hash;
     dx_message_list_t  in_fifo;
     dx_message_list_t  out_fifo;
     sys_mutex_t       *lock;
@@ -71,7 +71,7 @@ static void dx_agent_process_get(dx_agen
 
     dx_field_iterator_t    *cls_string = dx_parse_raw(cls);
     const dx_agent_class_t *cls_record;
-    hash_retrieve_const(agent->class_hash, cls_string, (const void**) &cls_record);
+    dx_hash_retrieve_const(agent->class_hash, cls_string, (const void**) &cls_record);
     if (cls_record == 0)
         return;
 
@@ -138,11 +138,11 @@ static void dx_agent_process_get(dx_agen
     //
     // Create a message and send it.
     //
-    dx_message_t *msg = dx_allocate_message();
+    dx_message_t *msg = dx_message();
     dx_message_compose_2(msg, field);
     dx_router_send(agent->dx, reply_to, msg);
 
-    dx_free_message(msg);
+    dx_message_free(msg);
     dx_compose_free(field);
 }
 
@@ -234,13 +234,13 @@ static void dx_agent_deferred_handler(vo
 
         if (msg) {
             dx_agent_process_request(agent, msg);
-            dx_free_message(msg);
+            dx_message_free(msg);
         }
     } while (msg);
 }
 
 
-static void dx_agent_rx_handler(void *context, dx_message_t *msg)
+static void dx_agent_rx_handler(void *context, dx_message_t *msg, int unused_link_id)
 {
     dx_agent_t   *agent = (dx_agent_t*) context;
     dx_message_t *copy  = dx_message_copy(msg);
@@ -257,12 +257,12 @@ dx_agent_t *dx_agent(dx_dispatch_t *dx)
 {
     dx_agent_t *agent = NEW(dx_agent_t);
     agent->dx         = dx;
-    agent->class_hash = hash(6, 10, 1);
+    agent->class_hash = dx_hash(6, 10, 1);
     DEQ_INIT(agent->in_fifo);
     DEQ_INIT(agent->out_fifo);
     agent->lock    = sys_mutex();
     agent->timer   = dx_timer(dx, dx_agent_deferred_handler, agent);
-    agent->address = dx_router_register_address(dx, "agent", dx_agent_rx_handler, agent);
+    agent->address = dx_router_register_address(dx, "$management", dx_agent_rx_handler, agent);
 
     return agent;
 }
@@ -273,7 +273,7 @@ void dx_agent_free(dx_agent_t *agent)
     dx_router_unregister_address(agent->address);
     sys_mutex_free(agent->lock);
     dx_timer_free(agent->timer);
-    hash_free(agent->class_hash);
+    dx_hash_free(agent->class_hash);
     free(agent);
 }
 
@@ -295,7 +295,7 @@ dx_agent_class_t *dx_agent_register_clas
     cls->query_handler  = query_handler;
 
     dx_field_iterator_t *iter = dx_field_iterator_string(fqname, ITER_VIEW_ALL);
-    int result = hash_insert_const(agent->class_hash, iter, cls);
+    int result = dx_hash_insert_const(agent->class_hash, iter, cls, 0);
     dx_field_iterator_free(iter);
     if (result < 0)
         assert(false);

Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/buffer.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/buffer.c?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/buffer.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/buffer.c Mon Oct 21 22:04:51 2013
@@ -34,7 +34,7 @@ void dx_buffer_set_size(size_t size)
 }
 
 
-dx_buffer_t *dx_allocate_buffer(void)
+dx_buffer_t *dx_buffer(void)
 {
     size_locked = 1;
     dx_buffer_t *buf = new_dx_buffer_t();
@@ -45,7 +45,7 @@ dx_buffer_t *dx_allocate_buffer(void)
 }
 
 
-void dx_free_buffer(dx_buffer_t *buf)
+void dx_buffer_free(dx_buffer_t *buf)
 {
     free_dx_buffer_t(buf);
 }

Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/compose.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/compose.c?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/compose.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/compose.c Mon Oct 21 22:04:51 2013
@@ -43,7 +43,7 @@ static void dx_insert(dx_composed_field_
 
     while (len > 0) {
         if (buf == 0 || dx_buffer_capacity(buf) == 0) {
-            buf = dx_allocate_buffer();
+            buf = dx_buffer();
             if (buf == 0)
                 return;
             DEQ_INSERT_TAIL(field->buffers, buf);
@@ -115,8 +115,8 @@ static void dx_overwrite_32(dx_field_loc
     size_t       cursor = field->offset;
 
     dx_overwrite(&buf, &cursor, (uint8_t) ((value & 0xFF000000) >> 24));
-    dx_overwrite(&buf, &cursor, (uint8_t) ((value & 0x00FF0000) >> 24));
-    dx_overwrite(&buf, &cursor, (uint8_t) ((value & 0x0000FF00) >> 24));
+    dx_overwrite(&buf, &cursor, (uint8_t) ((value & 0x00FF0000) >> 16));
+    dx_overwrite(&buf, &cursor, (uint8_t) ((value & 0x0000FF00) >> 8));
     dx_overwrite(&buf, &cursor, (uint8_t)  (value & 0x000000FF));
 }
 
@@ -212,7 +212,7 @@ void dx_compose_free(dx_composed_field_t
     dx_buffer_t *buf = DEQ_HEAD(field->buffers);
     while (buf) {
         DEQ_REMOVE_HEAD(field->buffers);
-        dx_free_buffer(buf);
+        dx_buffer_free(buf);
         buf = DEQ_HEAD(field->buffers);
     }
 

Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/container.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/container.c?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/container.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/container.c Mon Oct 21 22:04:51 2013
@@ -88,8 +88,8 @@ typedef struct container_class_t {
 struct dx_container_t {
     dx_dispatch_t        *dx;
     dx_server_t          *server;
-    hash_t               *node_type_map;
-    hash_t               *node_map;
+    dx_hash_t            *node_type_map;
+    dx_hash_t            *node_map;
     sys_mutex_t          *lock;
     dx_node_t            *default_node;
     dxc_node_type_list_t  node_type_list;
@@ -108,7 +108,7 @@ static void setup_outgoing_link(dx_conta
 
     if (source) {
         iter   = dx_field_iterator_string(source, ITER_VIEW_NODE_ID);
-        hash_retrieve(container->node_map, iter, (void*) &node);
+        dx_hash_retrieve(container->node_map, iter, (void*) &node);
         dx_field_iterator_free(iter);
     }
     sys_mutex_unlock(container->lock);
@@ -149,7 +149,7 @@ static void setup_incoming_link(dx_conta
 
     if (target) {
         iter   = dx_field_iterator_string(target, ITER_VIEW_NODE_ID);
-        hash_retrieve(container->node_map, iter, (void*) &node);
+        dx_hash_retrieve(container->node_map, iter, (void*) &node);
         dx_field_iterator_free(iter);
     }
     sys_mutex_unlock(container->lock);
@@ -429,8 +429,8 @@ static void container_query_handler(void
     container_class_t *cls = (container_class_t*) context;
 
     if (cls->class_id == DX_CONTAINER_CLASS_CONTAINER) {
-        dx_agent_value_uint(correlator, "node_type_count", hash_size(cls->container->node_type_map));
-        dx_agent_value_uint(correlator, "node_count",      hash_size(cls->container->node_map));
+        dx_agent_value_uint(correlator, "node_type_count", dx_hash_size(cls->container->node_type_map));
+        dx_agent_value_uint(correlator, "node_count",      dx_hash_size(cls->container->node_map));
         if (cls->container->default_node)
             dx_agent_value_string(correlator, "default_node_type", cls->container->default_node->ntype->type_name);
         else
@@ -463,8 +463,8 @@ dx_container_t *dx_container(dx_dispatch
 
     container->dx            = dx;
     container->server        = dx->server;
-    container->node_type_map = hash(6,  4, 1);  // 64 buckets, item batches of 4
-    container->node_map      = hash(10, 32, 0); // 1K buckets, item batches of 32
+    container->node_type_map = dx_hash(6,  4, 1);  // 64 buckets, item batches of 4
+    container->node_map      = dx_hash(10, 32, 0); // 1K buckets, item batches of 32
     container->lock          = sys_mutex();
     container->default_node  = 0;
     DEQ_INIT(container->node_type_list);
@@ -507,7 +507,7 @@ int dx_container_register_node_type(dx_d
     nt_item->ntype = nt;
 
     sys_mutex_lock(container->lock);
-    result = hash_insert_const(container->node_type_map, iter, nt);
+    result = dx_hash_insert_const(container->node_type_map, iter, nt, 0);
     DEQ_INSERT_TAIL(container->node_type_list, nt_item);
     sys_mutex_unlock(container->lock);
 
@@ -565,7 +565,7 @@ dx_node_t *dx_container_create_node(dx_d
     if (name) {
         dx_field_iterator_t *iter = dx_field_iterator_string(name, ITER_VIEW_ALL);
         sys_mutex_lock(container->lock);
-        result = hash_insert(container->node_map, iter, node);
+        result = dx_hash_insert(container->node_map, iter, node, 0);
         sys_mutex_unlock(container->lock);
         dx_field_iterator_free(iter);
         if (result < 0) {
@@ -591,7 +591,7 @@ void dx_container_destroy_node(dx_node_t
     if (node->name) {
         dx_field_iterator_t *iter = dx_field_iterator_string(node->name, ITER_VIEW_ALL);
         sys_mutex_lock(container->lock);
-        hash_remove(container->node_map, iter);
+        dx_hash_remove(container->node_map, iter);
         sys_mutex_unlock(container->lock);
         dx_field_iterator_free(iter);
         free(node->name);
@@ -651,12 +651,63 @@ void *dx_link_get_context(dx_link_t *lin
 }
 
 
+void dx_link_set_conn_context(dx_link_t *link, void *context)
+{
+    pn_session_t *pn_sess = pn_link_session(link->pn_link);
+    if (!pn_sess)
+        return;
+    pn_connection_t *pn_conn = pn_session_connection(pn_sess);
+    if (!pn_conn)
+        return;
+    dx_connection_t *conn = (dx_connection_t*) pn_connection_get_context(pn_conn);
+    if (!conn)
+        return;
+    dx_connection_set_link_context(conn, context);
+}
+
+
+void *dx_link_get_conn_context(dx_link_t *link)
+{
+    pn_session_t *pn_sess = pn_link_session(link->pn_link);
+    if (!pn_sess)
+        return 0;
+    pn_connection_t *pn_conn = pn_session_connection(pn_sess);
+    if (!pn_conn)
+        return 0;
+    dx_connection_t *conn = (dx_connection_t*) pn_connection_get_context(pn_conn);
+    if (!conn)
+        return 0;
+    return dx_connection_get_link_context(conn);
+}
+
+
 pn_link_t *dx_link_pn(dx_link_t *link)
 {
     return link->pn_link;
 }
 
 
+dx_connection_t *dx_link_connection(dx_link_t *link)
+{
+    if (!link || !link->pn_link)
+        return 0;
+
+    pn_session_t *sess = pn_link_session(link->pn_link);
+    if (!sess)
+        return 0;
+
+    pn_connection_t *conn = pn_session_connection(sess);
+    if (!conn)
+        return 0;
+
+    dx_connection_t *ctx = pn_connection_get_context(conn);
+    if (!ctx)
+        return 0;
+
+    return ctx;
+}
+
+
 pn_terminus_t *dx_link_source(dx_link_t *link)
 {
     return pn_link_source(link->pn_link);

Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/dispatch.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/dispatch.c?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/dispatch.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/dispatch.c Mon Oct 21 22:04:51 2013
@@ -24,6 +24,7 @@
 #include "dispatch_private.h"
 #include "alloc_private.h"
 #include "log_private.h"
+#include "router_private.h"
 
 /**
  * Private Function Prototypes
@@ -34,8 +35,8 @@ void            dx_server_free(dx_server
 dx_container_t *dx_container(dx_dispatch_t *dx);
 void            dx_container_setup_agent(dx_dispatch_t *dx);
 void            dx_container_free(dx_container_t *container);
-dx_router_t    *dx_router(dx_dispatch_t *dx, const char *area, const char *id);
-void            dx_router_setup_agent(dx_dispatch_t *dx);
+dx_router_t    *dx_router(dx_dispatch_t *dx, dx_router_mode_t mode, const char *area, const char *id);
+void            dx_router_setup_late(dx_dispatch_t *dx);
 void            dx_router_free(dx_router_t *router);
 dx_agent_t     *dx_agent(dx_dispatch_t *dx);
 void            dx_agent_free(dx_agent_t *agent);
@@ -53,10 +54,13 @@ dx_dispatch_t *dx_dispatch(const char *c
 {
     dx_dispatch_t *dx = NEW(dx_dispatch_t);
 
-    int         thread_count   = 0;
-    const char *container_name = 0;
-    const char *router_area    = 0;
-    const char *router_id      = 0;
+    int         thread_count    = 0;
+    const char *container_name  = 0;
+    const char *router_mode_str = 0;
+    const char *router_area     = 0;
+    const char *router_id       = 0;
+
+    dx_router_mode_t  router_mode = DX_ROUTER_MODE_STANDALONE;
 
     DEQ_INIT(dx->config_listeners);
     DEQ_INIT(dx->config_connectors);
@@ -78,8 +82,9 @@ dx_dispatch_t *dx_dispatch(const char *c
 
         count = dx_config_item_count(dx->config, CONF_ROUTER);
         if (count == 1) {
-            router_area = dx_config_item_value_string(dx->config, CONF_ROUTER, 0, "area");
-            router_id   = dx_config_item_value_string(dx->config, CONF_ROUTER, 0, "router-id");
+            router_mode_str = dx_config_item_value_string(dx->config, CONF_ROUTER, 0, "mode");
+            router_area     = dx_config_item_value_string(dx->config, CONF_ROUTER, 0, "area");
+            router_id       = dx_config_item_value_string(dx->config, CONF_ROUTER, 0, "router-id");
         }
     }
 
@@ -89,6 +94,12 @@ dx_dispatch_t *dx_dispatch(const char *c
     if (!container_name)
         container_name = "00000000-0000-0000-0000-000000000000";  // TODO - gen a real uuid
 
+    if (router_mode_str && strcmp(router_mode_str, "interior") == 0)
+        router_mode = DX_ROUTER_MODE_INTERIOR;
+
+    if (router_mode_str && strcmp(router_mode_str, "edge") == 0)
+        router_mode = DX_ROUTER_MODE_EDGE;
+
     if (!router_area)
         router_area = "area";
 
@@ -97,13 +108,13 @@ dx_dispatch_t *dx_dispatch(const char *c
 
     dx->server    = dx_server(thread_count, container_name);
     dx->container = dx_container(dx);
-    dx->router    = dx_router(dx, router_area, router_id);
+    dx->router    = dx_router(dx, router_mode, router_area, router_id);
     dx->agent     = dx_agent(dx);
 
     dx_alloc_setup_agent(dx);
     dx_server_setup_agent(dx);
     dx_container_setup_agent(dx);
-    dx_router_setup_agent(dx);
+    dx_router_setup_late(dx);
 
     return dx;
 }
@@ -126,6 +137,7 @@ static void load_server_config(dx_dispat
 {
     config->host = dx_config_item_value_string(dx->config, section, i, "addr");
     config->port = dx_config_item_value_string(dx->config, section, i, "port");
+    config->role = dx_config_item_value_string(dx->config, section, i, "role");
     config->sasl_mechanisms =
         dx_config_item_value_string(dx->config, section, i, "sasl-mechanisms");
     config->ssl_enabled =

Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/hash.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/hash.c?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/hash.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/hash.c Mon Oct 21 22:04:51 2013
@@ -23,18 +23,18 @@
 #include <stdio.h>
 #include <string.h>
 
-typedef struct hash_item_t {
-    DEQ_LINKS(struct hash_item_t);
+typedef struct dx_hash_item_t {
+    DEQ_LINKS(struct dx_hash_item_t);
     unsigned char *key;
     union {
         void       *val;
         const void *val_const;
     } v;
-} hash_item_t;
+} dx_hash_item_t;
 
-ALLOC_DECLARE(hash_item_t);
-ALLOC_DEFINE(hash_item_t);
-DEQ_DECLARE(hash_item_t, items_t);
+ALLOC_DECLARE(dx_hash_item_t);
+ALLOC_DEFINE(dx_hash_item_t);
+DEQ_DECLARE(dx_hash_item_t, items_t);
 
 
 typedef struct bucket_t {
@@ -42,7 +42,7 @@ typedef struct bucket_t {
 } bucket_t;
 
 
-struct hash_t {
+struct dx_hash_t {
     bucket_t     *buckets;
     unsigned int  bucket_count;
     unsigned int  bucket_mask;
@@ -52,8 +52,17 @@ struct hash_t {
 };
 
 
+struct dx_hash_handle_t {
+    bucket_t    *bucket;
+    dx_hash_item_t *item;
+};
+
+ALLOC_DECLARE(dx_hash_handle_t);
+ALLOC_DEFINE(dx_hash_handle_t);
+
+
 // djb2 hash algorithm
-static unsigned long hash_function(dx_field_iterator_t *iter)
+static unsigned long dx_hash_function(dx_field_iterator_t *iter)
 {
     unsigned long hash = 5381;
     int c;
@@ -68,10 +77,10 @@ static unsigned long hash_function(dx_fi
 }
 
 
-hash_t *hash(int bucket_exponent, int batch_size, int value_is_const)
+dx_hash_t *dx_hash(int bucket_exponent, int batch_size, int value_is_const)
 {
     int i;
-    hash_t *h = NEW(hash_t);
+    dx_hash_t *h = NEW(dx_hash_t);
 
     if (!h)
         return 0;
@@ -90,22 +99,22 @@ hash_t *hash(int bucket_exponent, int ba
 }
 
 
-void hash_free(hash_t *h)
+void dx_hash_free(dx_hash_t *h)
 {
     // TODO - Implement this
 }
 
 
-size_t hash_size(hash_t *h)
+size_t dx_hash_size(dx_hash_t *h)
 {
     return h ? h->size : 0;
 }
 
 
-static hash_item_t *hash_internal_insert(hash_t *h, dx_field_iterator_t *key, int *exists)
+static dx_hash_item_t *dx_hash_internal_insert(dx_hash_t *h, dx_field_iterator_t *key, int *exists, dx_hash_handle_t **handle)
 {
-    unsigned long  idx  = hash_function(key) & h->bucket_mask;
-    hash_item_t   *item = DEQ_HEAD(h->buckets[idx].items);
+    unsigned long   idx  = dx_hash_function(key) & h->bucket_mask;
+    dx_hash_item_t *item = DEQ_HEAD(h->buckets[idx].items);
 
     while (item) {
         if (dx_field_iterator_equal(key, item->key))
@@ -115,10 +124,12 @@ static hash_item_t *hash_internal_insert
 
     if (item) {
         *exists = 1;
+        if (handle)
+            *handle = 0;
         return item;
     }
 
-    item = new_hash_item_t();
+    item = new_dx_hash_item_t();
     if (!item)
         return 0;
 
@@ -128,14 +139,24 @@ static hash_item_t *hash_internal_insert
     DEQ_INSERT_TAIL(h->buckets[idx].items, item);
     h->size++;
     *exists = 0;
+
+    //
+    // If a pointer to a handle-pointer was supplied, create a handle for this item.
+    //
+    if (handle) {
+        *handle = new_dx_hash_handle_t();
+        (*handle)->bucket = &h->buckets[idx];
+        (*handle)->item   = item;
+    }
+
     return item;
 }
 
 
-dx_error_t hash_insert(hash_t *h, dx_field_iterator_t *key, void *val)
+dx_error_t dx_hash_insert(dx_hash_t *h, dx_field_iterator_t *key, void *val, dx_hash_handle_t **handle)
 {
-    int          exists = 0;
-    hash_item_t *item   = hash_internal_insert(h, key, &exists);
+    int             exists = 0;
+    dx_hash_item_t *item   = dx_hash_internal_insert(h, key, &exists, handle);
 
     if (!item)
         return DX_ERROR_ALLOC;
@@ -149,12 +170,12 @@ dx_error_t hash_insert(hash_t *h, dx_fie
 }
 
 
-dx_error_t hash_insert_const(hash_t *h, dx_field_iterator_t *key, const void *val)
+dx_error_t dx_hash_insert_const(dx_hash_t *h, dx_field_iterator_t *key, const void *val, dx_hash_handle_t **handle)
 {
     assert(h->is_const);
 
-    int          error = 0;
-    hash_item_t *item  = hash_internal_insert(h, key, &error);
+    int             error = 0;
+    dx_hash_item_t *item  = dx_hash_internal_insert(h, key, &error, handle);
 
     if (item)
         item->v.val_const = val;
@@ -162,10 +183,10 @@ dx_error_t hash_insert_const(hash_t *h, 
 }
 
 
-static hash_item_t *hash_internal_retrieve(hash_t *h, dx_field_iterator_t *key)
+static dx_hash_item_t *dx_hash_internal_retrieve(dx_hash_t *h, dx_field_iterator_t *key)
 {
-    unsigned long  idx  = hash_function(key) & h->bucket_mask;
-    hash_item_t   *item = DEQ_HEAD(h->buckets[idx].items);
+    unsigned long   idx  = dx_hash_function(key) & h->bucket_mask;
+    dx_hash_item_t *item = DEQ_HEAD(h->buckets[idx].items);
 
     while (item) {
         if (dx_field_iterator_equal(key, item->key))
@@ -177,9 +198,9 @@ static hash_item_t *hash_internal_retrie
 }
 
 
-dx_error_t hash_retrieve(hash_t *h, dx_field_iterator_t *key, void **val)
+dx_error_t dx_hash_retrieve(dx_hash_t *h, dx_field_iterator_t *key, void **val)
 {
-    hash_item_t *item = hash_internal_retrieve(h, key);
+    dx_hash_item_t *item = dx_hash_internal_retrieve(h, key);
     if (item)
         *val = item->v.val;
     else
@@ -189,11 +210,11 @@ dx_error_t hash_retrieve(hash_t *h, dx_f
 }
 
 
-dx_error_t hash_retrieve_const(hash_t *h, dx_field_iterator_t *key, const void **val)
+dx_error_t dx_hash_retrieve_const(dx_hash_t *h, dx_field_iterator_t *key, const void **val)
 {
     assert(h->is_const);
 
-    hash_item_t *item = hash_internal_retrieve(h, key);
+    dx_hash_item_t *item = dx_hash_internal_retrieve(h, key);
     if (item)
         *val = item->v.val_const;
     else
@@ -203,10 +224,10 @@ dx_error_t hash_retrieve_const(hash_t *h
 }
 
 
-dx_error_t hash_remove(hash_t *h, dx_field_iterator_t *key)
+dx_error_t dx_hash_remove(dx_hash_t *h, dx_field_iterator_t *key)
 {
-    unsigned long  idx  = hash_function(key) & h->bucket_mask;
-    hash_item_t   *item = DEQ_HEAD(h->buckets[idx].items);
+    unsigned long   idx  = dx_hash_function(key) & h->bucket_mask;
+    dx_hash_item_t *item = DEQ_HEAD(h->buckets[idx].items);
 
     while (item) {
         if (dx_field_iterator_equal(key, item->key))
@@ -217,7 +238,7 @@ dx_error_t hash_remove(hash_t *h, dx_fie
     if (item) {
         free(item->key);
         DEQ_REMOVE(h->buckets[idx].items, item);
-        free_hash_item_t(item);
+        free_dx_hash_item_t(item);
         h->size--;
         return DX_ERROR_NONE;
     }
@@ -225,3 +246,40 @@ dx_error_t hash_remove(hash_t *h, dx_fie
     return DX_ERROR_NOT_FOUND;
 }
 
+
+void dx_hash_handle_free(dx_hash_handle_t *handle)
+{
+    if (handle)
+        free_dx_hash_handle_t(handle);
+}
+
+
+const unsigned char *dx_hash_key_by_handle(const dx_hash_handle_t *handle)
+{
+    if (handle)
+        return handle->item->key;
+    return 0;
+}
+
+
+dx_error_t dx_hash_remove_by_handle(dx_hash_t *h, dx_hash_handle_t *handle)
+{
+    unsigned char *key   = 0;
+    dx_error_t     error = dx_hash_remove_by_handle2(h, handle, &key);
+    if (key)
+        free(key);
+    return error;
+}
+
+
+dx_error_t dx_hash_remove_by_handle2(dx_hash_t *h, dx_hash_handle_t *handle, unsigned char **key)
+{
+    if (!handle)
+        return DX_ERROR_NOT_FOUND;
+    *key = handle->item->key;
+    DEQ_REMOVE(handle->bucket->items, handle->item);
+    free_dx_hash_item_t(handle->item);
+    h->size--;
+    return DX_ERROR_NONE;
+}
+

Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/message.c
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/message.c?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/message.c (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/message.c Mon Oct 21 22:04:51 2013
@@ -376,7 +376,7 @@ static dx_field_location_t *dx_message_f
 }
 
 
-dx_message_t *dx_allocate_message()
+dx_message_t *dx_message()
 {
     dx_message_pvt_t *msg = (dx_message_pvt_t*) new_dx_message_t();
     if (!msg)
@@ -400,7 +400,7 @@ dx_message_t *dx_allocate_message()
 }
 
 
-void dx_free_message(dx_message_t *in_msg)
+void dx_message_free(dx_message_t *in_msg)
 {
     uint32_t rc;
     dx_message_pvt_t     *msg     = (dx_message_pvt_t*) in_msg;
@@ -417,14 +417,14 @@ void dx_free_message(dx_message_t *in_ms
         dx_buffer_t *buf = DEQ_HEAD(content->buffers);
         while (buf) {
             DEQ_REMOVE_HEAD(content->buffers);
-            dx_free_buffer(buf);
+            dx_buffer_free(buf);
             buf = DEQ_HEAD(content->buffers);
         }
 
         buf = DEQ_HEAD(content->new_delivery_annotations);
         while (buf) {
             DEQ_REMOVE_HEAD(content->new_delivery_annotations);
-            dx_free_buffer(buf);
+            dx_buffer_free(buf);
             buf = DEQ_HEAD(content->new_delivery_annotations);
         }
 
@@ -474,9 +474,11 @@ dx_parsed_field_t *dx_message_delivery_a
         !dx_parse_is_map(content->parsed_delivery_annotations)) {
         dx_field_iterator_free(da);
         dx_parse_free(content->parsed_delivery_annotations);
+        content->parsed_delivery_annotations = 0;
         return 0;
     }
 
+    dx_field_iterator_free(da);
     return content->parsed_delivery_annotations;
 }
 
@@ -506,7 +508,7 @@ dx_message_t *dx_message_receive(dx_deli
     // link it and the delivery together.
     //
     if (!msg) {
-        msg = (dx_message_pvt_t*) dx_allocate_message();
+        msg = (dx_message_pvt_t*) dx_message();
         dx_delivery_set_context(delivery, (void*) msg);
     }
 
@@ -517,7 +519,7 @@ dx_message_t *dx_message_receive(dx_deli
     //
     buf = DEQ_TAIL(msg->content->buffers);
     if (!buf) {
-        buf = dx_allocate_buffer();
+        buf = dx_buffer();
         DEQ_INSERT_TAIL(msg->content->buffers, buf);
     }
 
@@ -538,7 +540,7 @@ dx_message_t *dx_message_receive(dx_deli
             //
             if (dx_buffer_size(buf) == 0) {
                 DEQ_REMOVE_TAIL(msg->content->buffers);
-                dx_free_buffer(buf);
+                dx_buffer_free(buf);
             }
             dx_delivery_set_context(delivery, 0);
             return (dx_message_t*) msg;
@@ -556,7 +558,7 @@ dx_message_t *dx_message_receive(dx_deli
             // tail of the message's list.
             //
             if (dx_buffer_capacity(buf) == 0) {
-                buf = dx_allocate_buffer();
+                buf = dx_buffer();
                 DEQ_INSERT_TAIL(msg->content->buffers, buf);
             }
         } else

Modified: qpid/branches/linearstore/qpid/extras/dispatch/src/message_private.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/extras/dispatch/src/message_private.h?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/extras/dispatch/src/message_private.h (original)
+++ qpid/branches/linearstore/qpid/extras/dispatch/src/message_private.h Mon Oct 21 22:04:51 2013
@@ -67,7 +67,7 @@ typedef struct {
     sys_mutex_t         *lock;
     uint32_t             ref_count;                       // The number of messages referencing this
     dx_buffer_list_t     buffers;                         // The buffer chain containing the message
-    dx_buffer_list_t     new_delivery_annotations;        // The buffer chain containing the new delivery annotations
+    dx_buffer_list_t     new_delivery_annotations;        // The buffer chain containing the new delivery annotations (MOVE TO MSG_PVT)
     dx_field_location_t  section_message_header;          // The message header list
     dx_field_location_t  section_delivery_annotation;     // The delivery annotation map
     dx_field_location_t  section_message_annotation;      // The message annotation map



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message