qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tr...@apache.org
Subject svn commit: r1526694 [2/3] - in /qpid/trunk/qpid/extras/dispatch: ./ include/qpid/ include/qpid/dispatch/ python/qpid/dispatch/router/ router/src/ src/ tests/
Date Thu, 26 Sep 2013 21:14:59 GMT
Modified: qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py?rev=1526694&r1=1526693&r2=1526694&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py (original)
+++ qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py Thu Sep 26 21:14:59 2013
@@ -36,245 +36,249 @@ from node import NodeTracker
 ## (i.e. we are in a test bench, etc.), load the stub versions.
 ##
 try:
-  from dispatch import *
+    from dispatch import *
 except ImportError:
-  from ..stubs import *
+    from ..stubs import *
 
 
 class RouterEngine:
-  """
-  """
-
-  def __init__(self, router_adapter, router_id=None, area='area', config_override={}):
-    """
-    Initialize an instance of a router for a domain.
-    """
-    ##
-    ## Record important information about this router instance
-    ##
-    self.domain         = "domain"
-    self.router_adapter = router_adapter
-    self.log_adapter    = LogAdapter("dispatch.router")
-    self.io_adapter     = IoAdapter(self, "qdxrouter")
-
-    if router_id:
-      self.id = router_id
-    else:
-      self.id = str(uuid4())
-    self.area = area
-    self.log(LOG_INFO, "Router Engine Instantiated: area=%s id=%s" % (self.area, self.id))
-
-    ##
-    ## Setup configuration
-    ##
-    self.config = Configuration(config_override)
-    self.log(LOG_INFO, "Config: %r" % self.config)
-
-    ##
-    ## Launch the sub-module engines
-    ##
-    self.neighbor_engine       = NeighborEngine(self)
-    self.link_state_engine     = LinkStateEngine(self)
-    self.path_engine           = PathEngine(self)
-    self.mobile_address_engine = MobileAddressEngine(self)
-    self.routing_table_engine  = RoutingTableEngine(self)
-    self.binding_engine        = BindingEngine(self)
-    self.adapter_engine        = AdapterEngine(self)
-    self.node_tracker          = NodeTracker(self)
-
-
-
-  ##========================================================================================
-  ## Adapter Entry Points - invoked from the adapter
-  ##========================================================================================
-  def getId(self):
-    """
-    Return the router's ID
-    """
-    return self.id
-
-
-  def addLocalAddress(self, key):
-    """
-    """
-    try:
-      if key.find('_topo') == 0 or key.find('_local') == 0:
-        return
-      self.mobile_address_engine.add_local_address(key)
-    except Exception, e:
-      self.log(LOG_ERROR, "Exception in new-address processing: exception=%r" % e)
-
-  def delLocalAddress(self, key):
-    """
-    """
-    try:
-      if key.find('_topo') == 0 or key.find('_local') == 0:
-        return
-      self.mobile_address_engine.del_local_address(key)
-    except Exception, e:
-      self.log(LOG_ERROR, "Exception in del-address processing: exception=%r" % e)
-
-
-  def handleTimerTick(self):
-    """
-    """
-    try:
-      now = time()
-      self.neighbor_engine.tick(now)
-      self.link_state_engine.tick(now)
-      self.path_engine.tick(now)
-      self.mobile_address_engine.tick(now)
-      self.routing_table_engine.tick(now)
-      self.binding_engine.tick(now)
-      self.adapter_engine.tick(now)
-      self.node_tracker.tick(now)
-    except Exception, e:
-      self.log(LOG_ERROR, "Exception in timer processing: exception=%r" % e)
-
-
-  def handleControlMessage(self, opcode, body):
     """
     """
-    try:
-      now = time()
-      if   opcode == 'HELLO':
-        msg = MessageHELLO(body)
-        self.log(LOG_TRACE, "RCVD: %r" % msg)
-        self.neighbor_engine.handle_hello(msg, now)
-
-      elif opcode == 'RA':
-        msg = MessageRA(body)
-        self.log(LOG_TRACE, "RCVD: %r" % msg)
-        self.link_state_engine.handle_ra(msg, now)
-        self.mobile_address_engine.handle_ra(msg, now)
-
-      elif opcode == 'LSU':
-        msg = MessageLSU(body)
-        self.log(LOG_TRACE, "RCVD: %r" % msg)
-        self.link_state_engine.handle_lsu(msg, now)
-
-      elif opcode == 'LSR':
-        msg = MessageLSR(body)
-        self.log(LOG_TRACE, "RCVD: %r" % msg)
-        self.link_state_engine.handle_lsr(msg, now)
-
-      elif opcode == 'MAU':
-        msg = MessageMAU(body)
-        self.log(LOG_TRACE, "RCVD: %r" % msg)
-        self.mobile_address_engine.handle_mau(msg, now)
-
-      elif opcode == 'MAR':
-        msg = MessageMAR(body)
-        self.log(LOG_TRACE, "RCVD: %r" % msg)
-        self.mobile_address_engine.handle_mar(msg, now)
-
-    except Exception, e:
-      self.log(LOG_ERROR, "Exception in message processing: opcode=%s body=%r exception=%r" % (opcode, body, e))
-
-
-  def receive(self, message_properties, body):
-    """
-    This is the IoAdapter message-receive handler
-    """
-    try:
-      self.handleControlMessage(message_properties['opcode'], body)
-    except Exception, e:
-      self.log(LOG_ERROR, "Exception in raw message processing: properties=%r body=%r exception=%r" %
-               (message_properties, body, e))
-
-  def getRouterData(self, kind):
-    """
-    """
-    if kind == 'help':
-      return { 'help'           : "Get list of supported values for kind",
-               'link-state'     : "This router's link state",
-               'link-state-set' : "The set of link states from known routers",
-               'next-hops'      : "Next hops to each known router",
-               'topo-table'     : "Topological routing table",
-               'mobile-table'   : "Mobile key routing table"
-             }
-    if kind == 'link-state'     : return self.neighbor_engine.link_state.to_dict()
-    if kind == 'next-hops'      : return self.routing_table_engine.next_hops
-    if kind == 'topo-table'     : return {'table': self.adapter_engine.key_classes['topological']}
-    if kind == 'mobile-table'   : return {'table': self.adapter_engine.key_classes['mobile-key']}
-    if kind == 'link-state-set' :
-      copy = {}
-      for _id,_ls in self.link_state_engine.collection.items():
-        copy[_id] = _ls.to_dict()
-      return copy
-
-    return {'notice':'Use kind="help" to get a list of possibilities'}
-
-
-  ##========================================================================================
-  ## Adapter Calls - outbound calls to Dispatch
-  ##========================================================================================
-  def log(self, level, text):
-    """
-    Emit a log message to the host's event log
-    """
-    self.log_adapter.log(level, text)
-
-
-  def send(self, dest, msg):
-    """
-    Send a control message to another router.
-    """
-    app_props = {'opcode' : msg.get_opcode() }
-    self.io_adapter.send(dest, app_props, msg.to_dict())
-    self.log(LOG_TRACE, "SENT: %r dest=%s" % (msg, dest))
-
-
-  def node_updated(self, addr, reachable, neighbor):
-    """
-    """
-    self.router_adapter(addr, reachable, neighbor)
-
-
-  ##========================================================================================
-  ## Interconnect between the Sub-Modules
-  ##========================================================================================
-  def local_link_state_changed(self, link_state):
-    self.log(LOG_DEBUG, "Event: local_link_state_changed: %r" % link_state)
-    self.link_state_engine.new_local_link_state(link_state)
-
-  def ls_collection_changed(self, collection):
-    self.log(LOG_DEBUG, "Event: ls_collection_changed: %r" % collection)
-    self.path_engine.ls_collection_changed(collection)
-
-  def next_hops_changed(self, next_hop_table):
-    self.log(LOG_DEBUG, "Event: next_hops_changed: %r" % next_hop_table)
-    self.routing_table_engine.next_hops_changed(next_hop_table)
-    self.binding_engine.next_hops_changed()
-
-  def mobile_sequence_changed(self, mobile_seq):
-    self.log(LOG_DEBUG, "Event: mobile_sequence_changed: %d" % mobile_seq)
-    self.link_state_engine.set_mobile_sequence(mobile_seq)
-
-  def mobile_keys_changed(self, keys):
-    self.log(LOG_DEBUG, "Event: mobile_keys_changed: %r" % keys)
-    self.binding_engine.mobile_keys_changed(keys)
-
-  def get_next_hops(self):
-    return self.routing_table_engine.get_next_hops()
-
-  def remote_routes_changed(self, key_class, routes):
-    self.log(LOG_DEBUG, "Event: remote_routes_changed: class=%s routes=%r" % (key_class, routes))
-    self.adapter_engine.remote_routes_changed(key_class, routes)
-
-  def new_neighbor(self, rid):
-    self.log(LOG_DEBUG, "Event: new_neighbor: id=%s" % rid)
-    self.node_tracker.new_neighbor(rid)
-
-  def lost_neighbor(self, rid):
-    self.log(LOG_DEBUG, "Event: lost_neighbor: id=%s" % rid)
-    self.node_tracker.lost_neighbor(rid)
-
-  def new_node(self, rid):
-    self.log(LOG_DEBUG, "Event: new_node: id=%s" % rid)
-    self.node_tracker.new_node(rid)
-
-  def lost_node(self, rid):
-    self.log(LOG_DEBUG, "Event: lost_node: id=%s" % rid)
-    self.node_tracker.lost_node(rid)
 
+    def __init__(self, router_adapter, router_id, area, max_routers, config_override={}):
+        """
+        Initialize an instance of a router for a domain.
+        """
+        ##
+        ## Record important information about this router instance
+        ##
+        self.domain         = "domain"
+        self.router_adapter = router_adapter
+        self.log_adapter    = LogAdapter("dispatch.router")
+        self.io_adapter     = IoAdapter(self, "qdxrouter")
+        self.max_routers    = max_routers
+        self.id             = router_id
+        self.area           = area
+        self.log(LOG_INFO, "Router Engine Instantiated: area=%s id=%s max_routers=%d" %
+                 (self.area, self.id, self.max_routers))
+
+        ##
+        ## Setup configuration
+        ##
+        self.config = Configuration(config_override)
+        self.log(LOG_INFO, "Config: %r" % self.config)
+
+        ##
+        ## Launch the sub-module engines
+        ##
+        self.neighbor_engine       = NeighborEngine(self)
+        self.link_state_engine     = LinkStateEngine(self)
+        self.path_engine           = PathEngine(self)
+        self.mobile_address_engine = MobileAddressEngine(self)
+        self.routing_table_engine  = RoutingTableEngine(self)
+        self.binding_engine        = BindingEngine(self)
+        self.adapter_engine        = AdapterEngine(self)
+        self.node_tracker          = NodeTracker(self, self.max_routers)
+
+
+
+    ##========================================================================================
+    ## Adapter Entry Points - invoked from the adapter
+    ##========================================================================================
+    def getId(self):
+        """
+        Return the router's ID
+        """
+        return self.id
+
+
+    def addLocalAddress(self, key):
+        """
+        """
+        try:
+            if key.find('_topo') == 0 or key.find('_local') == 0:
+                return
+            self.mobile_address_engine.add_local_address(key)
+        except Exception, e:
+            self.log(LOG_ERROR, "Exception in new-address processing: exception=%r" % e)
+
+
+    def delLocalAddress(self, key):
+        """
+        """
+        try:
+            if key.find('_topo') == 0 or key.find('_local') == 0:
+                return
+            self.mobile_address_engine.del_local_address(key)
+        except Exception, e:
+            self.log(LOG_ERROR, "Exception in del-address processing: exception=%r" % e)
+
+
+    def handleTimerTick(self):
+        """
+        """
+        try:
+            now = time()
+            self.neighbor_engine.tick(now)
+            self.link_state_engine.tick(now)
+            self.path_engine.tick(now)
+            self.mobile_address_engine.tick(now)
+            self.routing_table_engine.tick(now)
+            self.binding_engine.tick(now)
+            self.adapter_engine.tick(now)
+            self.node_tracker.tick(now)
+        except Exception, e:
+            self.log(LOG_ERROR, "Exception in timer processing: exception=%r" % e)
+
+
+    def handleControlMessage(self, opcode, body, link_id):
+        """
+        """
+        try:
+            now = time()
+            if   opcode == 'HELLO':
+                msg = MessageHELLO(body)
+                self.log(LOG_TRACE, "RCVD: %r" % msg)
+                self.neighbor_engine.handle_hello(msg, now, link_id)
+
+            elif opcode == 'RA':
+                msg = MessageRA(body)
+                self.log(LOG_TRACE, "RCVD: %r" % msg)
+                self.link_state_engine.handle_ra(msg, now)
+                self.mobile_address_engine.handle_ra(msg, now)
+
+            elif opcode == 'LSU':
+                msg = MessageLSU(body)
+                self.log(LOG_TRACE, "RCVD: %r" % msg)
+                self.link_state_engine.handle_lsu(msg, now)
+
+            elif opcode == 'LSR':
+                msg = MessageLSR(body)
+                self.log(LOG_TRACE, "RCVD: %r" % msg)
+                self.link_state_engine.handle_lsr(msg, now)
+
+            elif opcode == 'MAU':
+                msg = MessageMAU(body)
+                self.log(LOG_TRACE, "RCVD: %r" % msg)
+                self.mobile_address_engine.handle_mau(msg, now)
+
+            elif opcode == 'MAR':
+                msg = MessageMAR(body)
+                self.log(LOG_TRACE, "RCVD: %r" % msg)
+                self.mobile_address_engine.handle_mar(msg, now)
+
+        except Exception, e:
+            self.log(LOG_ERROR, "Exception in message processing: opcode=%s body=%r exception=%r" % (opcode, body, e))
+
+
+    def receive(self, message_properties, body, link_id):
+        """
+        This is the IoAdapter message-receive handler
+        """
+        try:
+            self.handleControlMessage(message_properties['opcode'], body, link_id)
+        except Exception, e:
+            self.log(LOG_ERROR, "Exception in raw message processing: properties=%r body=%r exception=%r" %
+                     (message_properties, body, e))
+
+
+    def getRouterData(self, kind):
+        """
+        """
+        if kind == 'help':
+            return { 'help'           : "Get list of supported values for kind",
+                     'link-state'     : "This router's link state",
+                     'link-state-set' : "The set of link states from known routers",
+                     'next-hops'      : "Next hops to each known router",
+                     'topo-table'     : "Topological routing table",
+                     'mobile-table'   : "Mobile key routing table"
+                     }
+        if kind == 'link-state'     : return self.neighbor_engine.link_state.to_dict()
+        if kind == 'next-hops'      : return self.routing_table_engine.next_hops
+        if kind == 'topo-table'     : return {'table': self.adapter_engine.key_classes['topological']}
+        if kind == 'mobile-table'   : return {'table': self.adapter_engine.key_classes['mobile-key']}
+        if kind == 'link-state-set' :
+            copy = {}
+            for _id,_ls in self.link_state_engine.collection.items():
+                copy[_id] = _ls.to_dict()
+            return copy
+
+        return {'notice':'Use kind="help" to get a list of possibilities'}
+
+
+    ##========================================================================================
+    ## Adapter Calls - outbound calls to Dispatch
+    ##========================================================================================
+    def log(self, level, text):
+        """
+        Emit a log message to the host's event log
+        """
+        self.log_adapter.log(level, text)
+
+
+    def send(self, dest, msg):
+        """
+        Send a control message to another router.
+        """
+        app_props = {'opcode' : msg.get_opcode() }
+        self.io_adapter.send(dest, app_props, msg.to_dict())
+        self.log(LOG_TRACE, "SENT: %r dest=%s" % (msg, dest))
+
+
+    def node_updated(self, addr, reachable, neighbor):
+        """
+        """
+        self.router_adapter(addr, reachable, neighbor)
+
+
+    ##========================================================================================
+    ## Interconnect between the Sub-Modules
+    ##========================================================================================
+    def local_link_state_changed(self, link_state):
+        self.log(LOG_DEBUG, "Event: local_link_state_changed: %r" % link_state)
+        self.link_state_engine.new_local_link_state(link_state)
+
+    def ls_collection_changed(self, collection):
+        self.log(LOG_DEBUG, "Event: ls_collection_changed: %r" % collection)
+        self.path_engine.ls_collection_changed(collection)
+
+    def next_hops_changed(self, next_hop_table):
+        self.log(LOG_DEBUG, "Event: next_hops_changed: %r" % next_hop_table)
+        self.routing_table_engine.next_hops_changed(next_hop_table)
+        self.binding_engine.next_hops_changed()
+
+    def mobile_sequence_changed(self, mobile_seq):
+        self.log(LOG_DEBUG, "Event: mobile_sequence_changed: %d" % mobile_seq)
+        self.link_state_engine.set_mobile_sequence(mobile_seq)
+
+    def mobile_keys_changed(self, keys):
+        self.log(LOG_DEBUG, "Event: mobile_keys_changed: %r" % keys)
+        self.binding_engine.mobile_keys_changed(keys)
+
+    def get_next_hops(self):
+        return self.routing_table_engine.get_next_hops()
+
+    def remote_routes_changed(self, key_class, routes):
+        self.log(LOG_DEBUG, "Event: remote_routes_changed: class=%s routes=%r" % (key_class, routes))
+        self.adapter_engine.remote_routes_changed(key_class, routes)
+
+    def new_neighbor(self, rid, link_id):
+        self.log(LOG_DEBUG, "Event: new_neighbor: id=%s link_id=%d" % (rid, link_id))
+        self.node_tracker.new_neighbor(rid, link_id)
+
+    def lost_neighbor(self, rid):
+        self.log(LOG_DEBUG, "Event: lost_neighbor: id=%s" % rid)
+        self.node_tracker.lost_neighbor(rid)
+
+    def new_node(self, rid):
+        self.log(LOG_DEBUG, "Event: new_node: id=%s" % rid)
+        self.node_tracker.new_node(rid)
+
+    def lost_node(self, rid):
+        self.log(LOG_DEBUG, "Event: lost_node: id=%s" % rid)
+        self.node_tracker.lost_node(rid)
+
+    def node_updated(self, address, reachable, neighbor, link_bit, router_bit):
+        self.log(LOG_DEBUG, "Event: node_updated: address=%s, reachable=%r, neighbor=%r, link_bit=%d, router_bit=%d" % \
+                     (address, reachable, neighbor, link_bit, router_bit))
+        self.router_adapter.node_updataed(address, reachable, neighbor, link_bit, router_bit)

Modified: qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py?rev=1526694&r1=1526693&r2=1526694&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py (original)
+++ qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py Thu Sep 26 21:14:59 2013
@@ -18,39 +18,39 @@
 #
 
 try:
-  from dispatch import *
+    from dispatch import *
 except ImportError:
-  from ..stubs import *
+    from ..stubs import *
 
 class RoutingTableEngine(object):
-  """
-  This module is responsible for converting the set of next hops to remote routers to a routing
-  table in the "topological" address class.
-  """
-  def __init__(self, container):
-    self.container = container
-    self.id = self.container.id
-    self.area = self.container.area
-    self.next_hops = {}
-
-
-  def tick(self, now):
-    pass
-
-
-  def next_hops_changed(self, next_hops):
-    # Convert next_hops into routing table
-    self.next_hops = next_hops
-    new_table = []
-    for _id, next_hop in next_hops.items():
-      new_table.append(('_topo.%s.%s.#'  % (self.area, _id), next_hop))
-      pair = ('_topo.%s.all' % (self.area), next_hop)
-      if new_table.count(pair) == 0:
-        new_table.append(pair)
+    """
+    This module is responsible for converting the set of next hops to remote routers to a routing
+    table in the "topological" address class.
+    """
+    def __init__(self, container):
+        self.container = container
+        self.id = self.container.id
+        self.area = self.container.area
+        self.next_hops = {}
+
+
+    def tick(self, now):
+        pass
+
+
+    def next_hops_changed(self, next_hops):
+        # Convert next_hops into routing table
+        self.next_hops = next_hops
+        new_table = []
+        for _id, next_hop in next_hops.items():
+            new_table.append(('_topo.%s.%s.#'  % (self.area, _id), next_hop))
+            pair = ('_topo.%s.all' % (self.area), next_hop)
+            if new_table.count(pair) == 0:
+                new_table.append(pair)
 
-    self.container.remote_routes_changed('topological', new_table)
+        self.container.remote_routes_changed('topological', new_table)
 
 
-  def get_next_hops(self):
-    return self.next_hops
+    def get_next_hops(self):
+        return self.next_hops
 

Modified: qpid/trunk/qpid/extras/dispatch/router/src/main.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/router/src/main.c?rev=1526694&r1=1526693&r2=1526694&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/router/src/main.c (original)
+++ qpid/trunk/qpid/extras/dispatch/router/src/main.c Thu Sep 26 21:14:59 2013
@@ -117,7 +117,7 @@ int main(int argc, char **argv)
         }
     }
 
-    dx_log_set_mask(0xFFFFFFFF);
+    dx_log_set_mask(0xFFFFFFFE);
 
     dispatch = dx_dispatch(config_path);
 

Modified: qpid/trunk/qpid/extras/dispatch/src/agent.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/agent.c?rev=1526694&r1=1526693&r2=1526694&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/agent.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/agent.c Thu Sep 26 21:14:59 2013
@@ -240,7 +240,7 @@ static void dx_agent_deferred_handler(vo
 }
 
 
-static void dx_agent_rx_handler(void *context, dx_message_t *msg)
+static void dx_agent_rx_handler(void *context, dx_message_t *msg, int unused_link_id)
 {
     dx_agent_t   *agent = (dx_agent_t*) context;
     dx_message_t *copy  = dx_message_copy(msg);

Added: qpid/trunk/qpid/extras/dispatch/src/amqp.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/amqp.c?rev=1526694&view=auto
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/amqp.c (added)
+++ qpid/trunk/qpid/extras/dispatch/src/amqp.c Thu Sep 26 21:14:59 2013
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/amqp.h>
+
+const char * const DX_DA_INGRESS = "qdx.ingress";
+const char * const DX_DA_TRACE   = "qdx.trace";
+const char * const DX_DA_TO      = "qdx.to";
+
+const char * const DX_CAPABILITY_ROUTER = "qdx.router";
+
+const char * const DX_INTERNODE_LINK_NAME_1 = "qdx.internode.1";
+const char * const DX_INTERNODE_LINK_NAME_2 = "qdx.internode.2";
+

Propchange: qpid/trunk/qpid/extras/dispatch/src/amqp.c
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/trunk/qpid/extras/dispatch/src/bitmask.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/bitmask.c?rev=1526694&view=auto
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/bitmask.c (added)
+++ qpid/trunk/qpid/extras/dispatch/src/bitmask.c Thu Sep 26 21:14:59 2013
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/bitmask.h>
+#include <qpid/dispatch/alloc.h>
+#include <assert.h>
+
+#define DX_BITMASK_LONGS 16
+#define DX_BITMASK_BITS  (DX_BITMASK_LONGS * 64)
+
+struct dx_bitmask_t {
+    uint64_t array[DX_BITMASK_LONGS];
+    int      first_set;
+};
+
+ALLOC_DECLARE(dx_bitmask_t);
+ALLOC_DEFINE(dx_bitmask_t);
+
+#define MASK_INDEX(num)  (num / 64)
+#define MASK_ONEHOT(num) (1 << (num % 64))
+#define FIRST_NONE    -1
+#define FIRST_UNKNOWN -2
+
+
+int dx_bitmask_width()
+{
+    return DX_BITMASK_BITS;
+}
+
+
+dx_bitmask_t *dx_bitmask(int initial)
+{
+    dx_bitmask_t *b = new_dx_bitmask_t();
+    if (initial)
+        dx_bitmask_set_all(b);
+    else
+        dx_bitmask_clear_all(b);
+    return b;
+}
+
+
+void dx_bitmask_free(dx_bitmask_t *b)
+{
+    free_dx_bitmask_t(b);
+}
+
+
+void dx_bitmask_set_all(dx_bitmask_t *b)
+{
+    for (int i = 0; i < DX_BITMASK_LONGS; i++)
+        b->array[i] = 0xFFFFFFFFFFFFFFFF;
+    b->first_set = 0;
+}
+
+
+void dx_bitmask_clear_all(dx_bitmask_t *b)
+{
+    for (int i = 0; i < DX_BITMASK_LONGS; i++)
+        b->array[i] = 0;
+    b->first_set = FIRST_NONE;
+}
+
+
+void dx_bitmask_set_bit(dx_bitmask_t *b, int bitnum)
+{
+    assert(bitnum < DX_BITMASK_BITS);
+    b->array[MASK_INDEX(bitnum)] |= MASK_ONEHOT(bitnum);
+    if (b->first_set > bitnum)
+        b->first_set = bitnum;
+}
+
+
+void dx_bitmask_clear_bit(dx_bitmask_t *b, int bitnum)
+{
+    assert(bitnum < DX_BITMASK_BITS);
+    b->array[MASK_INDEX(bitnum)] &= ~(MASK_ONEHOT(bitnum));
+    if (b->first_set == bitnum)
+        b->first_set = FIRST_UNKNOWN;
+}
+
+
+int dx_bitmask_value(dx_bitmask_t *b, int bitnum)
+{
+    return (b->array[MASK_INDEX(bitnum)] & MASK_ONEHOT(bitnum)) ? 1 : 0;
+}
+
+
+int dx_bitmask_first_set(dx_bitmask_t *b, int *bitnum)
+{
+    if (b->first_set == FIRST_UNKNOWN) {
+        b->first_set = FIRST_NONE;
+        for (int i = 0; i < DX_BITMASK_LONGS; i++)
+            if (b->array[i]) {
+                for (int j = 0; j < 64; j++)
+                    if ((1 << j) & b->array[i]) {
+                        b->first_set = i * 64 + j;
+                        break;
+                    }
+                break;
+            }
+    }
+
+    if (b->first_set == FIRST_NONE)
+        return 0;
+    *bitnum = b->first_set;
+    return 1;
+}
+

Propchange: qpid/trunk/qpid/extras/dispatch/src/bitmask.c
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/trunk/qpid/extras/dispatch/src/message_private.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/message_private.h?rev=1526694&r1=1526693&r2=1526694&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/message_private.h (original)
+++ qpid/trunk/qpid/extras/dispatch/src/message_private.h Thu Sep 26 21:14:59 2013
@@ -67,7 +67,7 @@ typedef struct {
     sys_mutex_t         *lock;
     uint32_t             ref_count;                       // The number of messages referencing this
     dx_buffer_list_t     buffers;                         // The buffer chain containing the message
-    dx_buffer_list_t     new_delivery_annotations;        // The buffer chain containing the new delivery annotations
+    dx_buffer_list_t     new_delivery_annotations;        // The buffer chain containing the new delivery annotations (MOVE TO MSG_PVT)
     dx_field_location_t  section_message_header;          // The message header list
     dx_field_location_t  section_delivery_annotation;     // The delivery annotation map
     dx_field_location_t  section_message_annotation;      // The message annotation map

Modified: qpid/trunk/qpid/extras/dispatch/src/python_embedded.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/python_embedded.c?rev=1526694&r1=1526693&r2=1526694&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/python_embedded.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/python_embedded.c Thu Sep 26 21:14:59 2013
@@ -402,7 +402,7 @@ typedef struct {
 } IoAdapter;
 
 
-static void dx_io_rx_handler(void *context, dx_message_t *msg)
+static void dx_io_rx_handler(void *context, dx_message_t *msg, int link_id)
 {
     IoAdapter *self = (IoAdapter*) context;
 
@@ -454,9 +454,10 @@ static void dx_io_rx_handler(void *conte
     PyObject *pAP   = dx_field_to_py(ap_map);
     PyObject *pBody = dx_field_to_py(body_map);
 
-    PyObject *pArgs = PyTuple_New(2);
+    PyObject *pArgs = PyTuple_New(3);
     PyTuple_SetItem(pArgs, 0, pAP);
     PyTuple_SetItem(pArgs, 1, pBody);
+    PyTuple_SetItem(pArgs, 2, PyInt_FromLong((long) link_id));
 
     PyObject *pValue = PyObject_CallObject(self->handler_rx_call, pArgs);
     Py_DECREF(pArgs);
@@ -507,10 +508,10 @@ static PyObject* dx_python_send(PyObject
     field = dx_compose(DX_PERFORMATIVE_DELIVERY_ANNOTATIONS, field);
     dx_compose_start_map(field);
 
-    dx_compose_insert_string(field, "qdx.ingress");
+    dx_compose_insert_string(field, DX_DA_INGRESS);
     dx_compose_insert_string(field, dx_router_id(ioa->dx));
 
-    dx_compose_insert_string(field, "qdx.trace");
+    dx_compose_insert_string(field, DX_DA_TRACE);
     dx_compose_start_list(field);
     dx_compose_insert_string(field, dx_router_id(ioa->dx));
     dx_compose_end_list(field);

Modified: qpid/trunk/qpid/extras/dispatch/src/router_node.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/router_node.c?rev=1526694&r1=1526693&r2=1526694&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/router_node.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/router_node.c Thu Sep 26 21:14:59 2013
@@ -21,17 +21,18 @@
 #include <stdio.h>
 #include <string.h>
 #include <stdbool.h>
+#include <stdlib.h>
 #include <qpid/dispatch.h>
 #include "dispatch_private.h"
+#include "router_private.h"
 
 static char *module = "ROUTER";
 
 static void dx_router_python_setup(dx_router_t *router);
 static void dx_pyrouter_tick(dx_router_t *router);
 
-static char *router_address = "_local/qdxrouter";
-static char *local_prefix   = "_local/";
-//static char *topo_prefix    = "_topo/";
+static char *local_prefix = "_local/";
+static char *topo_prefix  = "_topo/";
 
 /**
  * Address Types and Processing:
@@ -48,86 +49,82 @@ static char *local_prefix   = "_local/";
  *   <mobile>                             M<mobile>      forward handler
  */
 
+ALLOC_DEFINE(dx_routed_event_t);
+ALLOC_DEFINE(dx_router_link_t);
+ALLOC_DEFINE(dx_router_node_t);
+ALLOC_DEFINE(dx_router_ref_t);
+ALLOC_DEFINE(dx_router_link_ref_t);
+ALLOC_DEFINE(dx_address_t);
 
-typedef struct dx_router_link_t dx_router_link_t;
-typedef struct dx_router_node_t dx_router_node_t;
 
+static void dx_router_add_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link)
+{
+    dx_router_link_ref_t *ref = new_dx_router_link_ref_t();
+    DEQ_ITEM_INIT(ref);
+    ref->link = link;
+    link->ref = ref;
+    DEQ_INSERT_TAIL(*ref_list, ref);
+}
 
-typedef enum {
-    DX_LINK_ENDPOINT,   // A link to a connected endpoint
-    DX_LINK_ROUTER,     // A link to a peer router in the same area
-    DX_LINK_AREA        // A link to a peer router in a different area (area boundary)
-} dx_link_type_t;
-
-
-typedef struct dx_routed_event_t {
-    DEQ_LINKS(struct dx_routed_event_t);
-    dx_delivery_t *delivery;
-    dx_message_t  *message;
-    bool           settle;
-    uint64_t       disposition;
-} dx_routed_event_t;
 
-ALLOC_DECLARE(dx_routed_event_t);
-ALLOC_DEFINE(dx_routed_event_t);
-DEQ_DECLARE(dx_routed_event_t, dx_routed_event_list_t);
+static void dx_router_del_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link)
+{
+    if (link->ref) {
+        DEQ_REMOVE(*ref_list, link->ref);
+        free_dx_router_link_ref_t(link->ref);
+        link->ref = 0;
+    }
+}
 
 
-struct dx_router_link_t {
-    DEQ_LINKS(dx_router_link_t);
-    dx_direction_t          link_direction;
-    dx_link_type_t          link_type;
-    dx_address_t           *owning_addr;     // [ref] Address record that owns this link
-    dx_link_t              *link;            // [own] Link pointer
-    dx_router_link_t       *connected_link;  // [ref] If this is a link-route, reference the connected link
-    dx_router_link_t       *peer_link;       // [ref] If this is a bidirectional link-route, reference the peer link
-    dx_routed_event_list_t  event_fifo;      // FIFO of outgoing delivery/link events (no messages)
-    dx_routed_event_list_t  msg_fifo;        // FIFO of outgoing message deliveries
-};
+/**
+ * Check an address to see if it no longer has any associated destinations.
+ * Depending on its policy, the address may be eligible for being closed out
+ * (i.e. Logging its terminal statistics and freeing its resources).
+ */
+static void dx_router_check_addr_LH(dx_address_t *addr)
+{
+    // TODO
+}
 
-ALLOC_DECLARE(dx_router_link_t);
-ALLOC_DEFINE(dx_router_link_t);
-DEQ_DECLARE(dx_router_link_t, dx_router_link_list_t);
 
-struct dx_router_node_t {
-    DEQ_LINKS(dx_router_node_t);
-    const char       *id;
-    dx_router_node_t *next_hop;   // Next hop node _if_ this is not a neighbor node
-    dx_router_link_t *peer_link;  // Outgoing link _if_ this is a neighbor node
-    // list of valid origins (pointers to router_node) - (bit masks?)
-};
+/**
+ * Determine whether a terminus has router capability
+ */
+static int dx_router_terminus_is_router(pn_terminus_t *term)
+{
+    pn_data_t *cap = pn_terminus_capabilities(term);
 
-ALLOC_DECLARE(dx_router_node_t);
-ALLOC_DEFINE(dx_router_node_t);
-DEQ_DECLARE(dx_router_node_t, dx_router_node_list_t);
+    if (cap && pn_data_type(cap) == PN_SYMBOL) {
+        pn_bytes_t sym = pn_data_get_symbol(cap);
+        if (sym.size == strlen(DX_CAPABILITY_ROUTER) &&
+            strcmp(sym.start, DX_CAPABILITY_ROUTER) == 0)
+            return 1;
+    }
 
+    return 0;
+}
 
-struct dx_address_t {
-    dx_router_message_cb   handler;          // In-Process Consumer
-    void                  *handler_context;
-    dx_router_link_list_t  rlinks;           // Locally-Connected Consumers
-    dx_router_node_list_t  rnodes;           // Remotely-Connected Consumers
-};
 
-ALLOC_DECLARE(dx_address_t);
-ALLOC_DEFINE(dx_address_t);
+static void dx_router_generate_temp_addr(dx_router_t *router, char *buffer, size_t length)
+{
+    static const char *table = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+_";
+    char     discriminator[11];
+    long int rnd = random();
+    int      idx;
 
+    for (idx = 0; idx < 10; idx++)
+        discriminator[idx] = table[(rnd >> (idx * 6)) & 63];
+    discriminator[idx] = '\0';
+
+    snprintf(buffer, length, "amqp:/%s%s/%s/temp.%s", topo_prefix, router->router_area, router->router_id, discriminator);
+}
 
-struct dx_router_t {
-    dx_dispatch_t         *dx;
-    const char            *router_area;
-    const char            *router_id;
-    dx_node_t             *node;
-    dx_router_link_list_t  in_links;
-    dx_router_node_list_t  routers;
-    dx_message_list_t      in_fifo;
-    sys_mutex_t           *lock;
-    dx_timer_t            *timer;
-    hash_t                *out_hash;
-    uint64_t               dtag;
-    PyObject              *pyRouter;
-    PyObject              *pyTick;
-};
+
+static int dx_router_find_mask_bit(dx_link_t *link)
+{
+    return 0;  // TODO
+}
 
 
 /**
@@ -191,7 +188,7 @@ static int router_writable_link_handler(
         DEQ_REMOVE_HEAD(to_send);
 
         //
-        // Get a delivery for the send.  This will be the current deliver on the link.
+        // Get a delivery for the send.  This will be the current delivery on the link.
         //
         tag++;
         delivery = dx_delivery(link, pn_dtag((char*) &tag, 8));
@@ -259,8 +256,8 @@ static void router_annotate_message(dx_r
     dx_parsed_field_t *ingress = 0;
 
     if (in_da) {
-        trace   = dx_parse_value_by_key(in_da, "qdx.trace");
-        ingress = dx_parse_value_by_key(in_da, "qdx.ingress");
+        trace   = dx_parse_value_by_key(in_da, DX_DA_TRACE);
+        ingress = dx_parse_value_by_key(in_da, DX_DA_INGRESS);
     }
 
     dx_compose_start_map(out_da);
@@ -269,7 +266,7 @@ static void router_annotate_message(dx_r
     // If there is a trace field, append this router's ID to the trace.
     //
     if (trace && dx_parse_is_list(trace)) {
-        dx_compose_insert_string(out_da, "qdx.trace");
+        dx_compose_insert_string(out_da, DX_DA_TRACE);
         dx_compose_start_list(out_da);
 
         uint32_t idx = 0;
@@ -289,7 +286,7 @@ static void router_annotate_message(dx_r
     // If there is no ingress field, annotate the ingress as this router else
     // keep the original field.
     //
-    dx_compose_insert_string(out_da, "qdx.ingress");
+    dx_compose_insert_string(out_da, DX_DA_INGRESS);
     if (ingress && dx_parse_is_scalar(ingress)) {
         dx_field_iterator_t *iter = dx_parse_raw(ingress);
         dx_compose_insert_string_iterator(out_da, iter);
@@ -380,7 +377,7 @@ static void router_rx_handler(void* cont
 
         if (iter) {
             dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
-            hash_retrieve(router->out_hash, iter, (void*) &addr);
+            hash_retrieve(router->addr_hash, iter, (void*) &addr);
             dx_field_iterator_reset_view(iter, ITER_VIEW_NO_HOST);
             int is_local = dx_field_iterator_prefix(iter, local_prefix);
             dx_field_iterator_free(iter);
@@ -415,33 +412,34 @@ static void router_rx_handler(void* cont
                     //
                     // Forward to all of the local links receiving this address.
                     //
-                    dx_router_link_t *dest_link = DEQ_HEAD(addr->rlinks);
-                    while (dest_link) {
+                    dx_router_link_ref_t *dest_link_ref = DEQ_HEAD(addr->rlinks);
+                    while (dest_link_ref) {
                         dx_routed_event_t *re = new_dx_routed_event_t();
                         DEQ_ITEM_INIT(re);
                         re->delivery    = 0;
                         re->message     = dx_message_copy(msg);
                         re->settle      = 0;
                         re->disposition = 0;
-                        DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
+                        DEQ_INSERT_TAIL(dest_link_ref->link->msg_fifo, re);
 
                         fanout++;
                         if (fanout == 1 && !dx_delivery_settled(delivery))
                             re->delivery = delivery;
 
-                        dx_link_activate(dest_link->link);
-                        dest_link = DEQ_NEXT(dest_link);
+                        dx_link_activate(dest_link_ref->link->link);
+                        dest_link_ref = DEQ_NEXT(dest_link_ref);
                     }
 
                     //
                     // Forward to the next-hops for remote destinations.
                     //
-                    dx_router_node_t *dest_node = DEQ_HEAD(addr->rnodes);
-                    while (dest_node) {
-                        if (dest_node->next_hop)
-                            dest_link = dest_node->next_hop->peer_link;
+                    dx_router_ref_t  *dest_node_ref = DEQ_HEAD(addr->rnodes);
+                    dx_router_link_t *dest_link;
+                    while (dest_node_ref) {
+                        if (dest_node_ref->router->next_hop)
+                            dest_link = dest_node_ref->router->next_hop->peer_link;
                         else
-                            dest_link = dest_node->peer_link;
+                            dest_link = dest_node_ref->router->peer_link;
                         if (dest_link) {
                             dx_routed_event_t *re = new_dx_routed_event_t();
                             DEQ_ITEM_INIT(re);
@@ -457,7 +455,7 @@ static void router_rx_handler(void* cont
 
                             dx_link_activate(dest_link->link);
                         }
-                        dest_node = DEQ_NEXT(dest_node);
+                        dest_node_ref = DEQ_NEXT(dest_node_ref);
                     }
                 }
             }
@@ -487,7 +485,7 @@ static void router_rx_handler(void* cont
     // Invoke the in-process handler now that the lock is released.
     //
     if (handler)
-        handler(handler_context, in_process_copy);
+        handler(handler_context, in_process_copy, rlink->mask_bit);
 }
 
 
@@ -541,25 +539,28 @@ static int router_incoming_link_handler(
     dx_router_t      *router  = (dx_router_t*) context;
     dx_router_link_t *rlink   = new_dx_router_link_t();
     pn_link_t        *pn_link = dx_link_pn(link);
+    int is_router             = dx_router_terminus_is_router(dx_link_remote_source(link));
 
     DEQ_ITEM_INIT(rlink);
+    rlink->mask_bit       = is_router ? dx_router_find_mask_bit(link) : 0;
+    rlink->link_type      = is_router ? DX_LINK_ROUTER : DX_LINK_ENDPOINT;
     rlink->link_direction = DX_INCOMING;
-    rlink->link_type      = DX_LINK_ENDPOINT;
     rlink->owning_addr    = 0;
     rlink->link           = link;
     rlink->connected_link = 0;
     rlink->peer_link      = 0;
+    rlink->ref            = 0;
     DEQ_INIT(rlink->event_fifo);
     DEQ_INIT(rlink->msg_fifo);
 
     dx_link_set_context(link, rlink);
 
     sys_mutex_lock(router->lock);
-    DEQ_INSERT_TAIL(router->in_links, rlink);
+    DEQ_INSERT_TAIL(router->links, rlink);
     sys_mutex_unlock(router->lock);
 
-    pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link));
-    pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link));
+    pn_terminus_copy(dx_link_source(link), dx_link_remote_source(link));
+    pn_terminus_copy(dx_link_target(link), dx_link_remote_target(link));
     pn_link_flow(pn_link, 1000);
     pn_link_open(pn_link);
 
@@ -579,52 +580,90 @@ static int router_outgoing_link_handler(
 {
     dx_router_t *router  = (dx_router_t*) context;
     pn_link_t   *pn_link = dx_link_pn(link);
-    const char  *r_tgt   = pn_terminus_get_address(pn_link_remote_target(pn_link));
+    const char  *r_src   = pn_terminus_get_address(dx_link_remote_source(link));
+    int is_dynamic       = pn_terminus_is_dynamic(dx_link_remote_source(link));
+    int is_router        = dx_router_terminus_is_router(dx_link_remote_target(link));
 
-    if (!r_tgt) {
+    //
+    // If this link is not a router link and it has no source address, we can't
+    // accept it.
+    //
+    if (r_src == 0 && !is_router && !is_dynamic) {
         pn_link_close(pn_link);
         return 0;
     }
 
-    dx_field_iterator_t *iter  = dx_field_iterator_string(r_tgt, ITER_VIEW_NO_HOST);
-    dx_router_link_t    *rlink = new_dx_router_link_t();
-
-    int is_router = dx_field_iterator_equal(iter, (unsigned char*) router_address);
-
+    //
+    // Create a router_link record for this link.  Some of the fields will be
+    // modified in the different cases below.
+    //
+    dx_router_link_t *rlink = new_dx_router_link_t();
     DEQ_ITEM_INIT(rlink);
-    rlink->link_direction = DX_OUTGOING;
+    rlink->mask_bit       = is_router ? dx_router_find_mask_bit(link) : 0;
     rlink->link_type      = is_router ? DX_LINK_ROUTER : DX_LINK_ENDPOINT;
+    rlink->link_direction = DX_OUTGOING;
+    rlink->owning_addr    = 0;
     rlink->link           = link;
     rlink->connected_link = 0;
     rlink->peer_link      = 0;
+    rlink->ref            = 0;
     DEQ_INIT(rlink->event_fifo);
     DEQ_INIT(rlink->msg_fifo);
 
     dx_link_set_context(link, rlink);
-
-    dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
-    dx_address_t *addr;
+    pn_terminus_copy(dx_link_source(link), dx_link_remote_source(link));
+    pn_terminus_copy(dx_link_target(link), dx_link_remote_target(link));
 
     sys_mutex_lock(router->lock);
-    hash_retrieve(router->out_hash, iter, (void**) &addr);
-    if (!addr) {
-        addr = new_dx_address_t();
-        addr->handler         = 0;
-        addr->handler_context = 0;
-        DEQ_INIT(addr->rlinks);
-        DEQ_INIT(addr->rnodes);
-        hash_insert(router->out_hash, iter, addr);
+
+    if (is_router) {
+        //
+        // If this is a router link, put it in the router_address link-list.
+        //
+        dx_router_add_link_ref_LH(&router->router_addr->rlinks, rlink);
+        rlink->owning_addr = router->router_addr;
+
+    } else {
+        //
+        // If this is an endpoint link, check the source.  If it is dynamic, we will
+        // assign it an ephemeral and routable address.  If it has a non-dymanic
+        // address, that address needs to be set up in the address list.
+        //
+        dx_field_iterator_t *iter;
+        char                 temp_addr[1000];
+        dx_address_t        *addr;
+
+        if (is_dynamic) {
+            dx_router_generate_temp_addr(router, temp_addr, 1000);
+            iter = dx_field_iterator_string(temp_addr, ITER_VIEW_ADDRESS_HASH);
+            pn_terminus_set_address(dx_link_source(link), temp_addr);
+            dx_log(module, LOG_INFO, "Assigned temporary routable address: %s", temp_addr);
+        } else {
+            iter = dx_field_iterator_string(r_src, ITER_VIEW_ADDRESS_HASH);
+            dx_log(module, LOG_INFO, "Registered local address: %s", r_src);
+        }
+
+        hash_retrieve(router->addr_hash, iter, (void**) &addr);
+        if (!addr) {
+            addr = new_dx_address_t();
+            DEQ_ITEM_INIT(addr);
+            addr->handler         = 0;
+            addr->handler_context = 0;
+            DEQ_INIT(addr->rlinks);
+            DEQ_INIT(addr->rnodes);
+            hash_insert(router->addr_hash, iter, addr);
+            DEQ_INSERT_TAIL(router->addrs, addr);
+        }
+        dx_field_iterator_free(iter);
+
+        rlink->owning_addr = addr;
+        dx_router_add_link_ref_LH(&addr->rlinks, rlink);
     }
-    dx_field_iterator_free(iter);
 
-    rlink->owning_addr = addr;
-    DEQ_INSERT_TAIL(addr->rlinks, rlink);
+    DEQ_INSERT_TAIL(router->links, rlink);
+    sys_mutex_unlock(router->lock);
 
-    pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link));
-    pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link));
     pn_link_open(pn_link);
-    sys_mutex_unlock(router->lock);
-    dx_log(module, LOG_TRACE, "Registered new local address: %s", r_tgt);
     return 0;
 }
 
@@ -634,40 +673,37 @@ static int router_outgoing_link_handler(
  */
 static int router_link_detach_handler(void* context, dx_link_t *link, int closed)
 {
-    dx_router_t      *router  = (dx_router_t*) context;
-    pn_link_t        *pn_link = dx_link_pn(link);
-    dx_router_link_t *rlink   = (dx_router_link_t*) dx_link_get_context(link);
-    const char       *r_tgt   = pn_terminus_get_address(pn_link_remote_target(pn_link));
+    dx_router_t      *router = (dx_router_t*) context;
+    dx_router_link_t *rlink  = (dx_router_link_t*) dx_link_get_context(link);
 
     if (!rlink)
         return 0;
 
     sys_mutex_lock(router->lock);
-    if (pn_link_is_sender(pn_link)) {
-        DEQ_REMOVE(rlink->owning_addr->rlinks, rlink);
 
-        if ((rlink->owning_addr->handler == 0) &&
-            (DEQ_SIZE(rlink->owning_addr->rlinks) == 0) &&
-            (DEQ_SIZE(rlink->owning_addr->rnodes) == 0)) {
-            dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_ADDRESS_HASH);
-            dx_address_t        *addr;
-            if (iter) {
-                hash_retrieve(router->out_hash, iter, (void**) &addr);
-                if (addr == rlink->owning_addr) {
-                    hash_remove(router->out_hash, iter);
-                    free_dx_router_link_t(rlink);
-                    free_dx_address_t(addr);
-                    dx_log(module, LOG_TRACE, "Removed local address: %s", r_tgt);
-                }
-                dx_field_iterator_free(iter);
-            }
-        }
-    } else {
-        DEQ_REMOVE(router->in_links, rlink);
-        free_dx_router_link_t(rlink);
+    //
+    // If the link is outgoing, we must disassociate it from its address.
+    //
+    if (rlink->link_direction == DX_OUTGOING && rlink->owning_addr) {
+        dx_router_del_link_ref_LH(&rlink->owning_addr->rlinks, rlink);
+        dx_router_check_addr_LH(rlink->owning_addr);
     }
 
+    //
+    // If this is an incoming inter-router link, we must free the mask_bit.
+    //
+    if (rlink->link_type == DX_LINK_ROUTER && rlink->link_direction == DX_INCOMING)
+        dx_bitmask_set_bit(router->neighbor_free_mask, rlink->mask_bit);
+
+    //
+    // Remove the link from the master list-of-links.
+    //
+    DEQ_REMOVE(router->links, rlink);
     sys_mutex_unlock(router->lock);
+
+    // TODO - wrap the free to handle the recursive items
+    free_dx_router_link_t(rlink);
+
     return 0;
 }
 
@@ -683,24 +719,37 @@ static void router_outbound_open_handler
     //        Ignore otherwise
 
     dx_router_t         *router = (dx_router_t*) type_context;
-    dx_field_iterator_t *aiter  = dx_field_iterator_string(router_address, ITER_VIEW_ADDRESS_HASH);
     dx_link_t           *sender;
     dx_link_t           *receiver;
     dx_router_link_t    *rlink;
+    int                  mask_bit = 0;
+    size_t               clen     = strlen(DX_CAPABILITY_ROUTER);
 
     //
-    // Create an incoming link and put it in the in-links collection.  The address
-    // of the remote source of this link is '_local/qdxrouter'.
+    // Allocate a mask bit to designate the pair of links connected to the neighbor router
     //
-    receiver = dx_link(router->node, conn, DX_INCOMING, "inter-router-rx");
-    pn_terminus_set_address(dx_link_remote_source(receiver), router_address);
-    pn_terminus_set_address(dx_link_target(receiver), router_address);
+    sys_mutex_lock(router->lock);
+    if (dx_bitmask_first_set(router->neighbor_free_mask, &mask_bit)) {
+        dx_bitmask_clear_bit(router->neighbor_free_mask, mask_bit);
+    } else {
+        sys_mutex_unlock(router->lock);
+        dx_log(module, LOG_CRITICAL, "Exceeded maximum inter-router link count");
+        return;
+    }
 
-    rlink = new_dx_router_link_t();
+    //
+    // Create an incoming link with router source capability
+    //
+    receiver = dx_link(router->node, conn, DX_INCOMING, DX_INTERNODE_LINK_NAME_1);
+    // TODO - We don't want to have to cast away the constness of the literal string here!
+    //        See PROTON-429
+    pn_data_put_symbol(pn_terminus_capabilities(dx_link_target(receiver)), pn_bytes(clen, (char*) DX_CAPABILITY_ROUTER));
 
+    rlink = new_dx_router_link_t();
     DEQ_ITEM_INIT(rlink);
-    rlink->link_direction = DX_INCOMING;
+    rlink->mask_bit       = mask_bit;
     rlink->link_type      = DX_LINK_ROUTER;
+    rlink->link_direction = DX_INCOMING;
     rlink->owning_addr    = 0;
     rlink->link           = receiver;
     rlink->connected_link = 0;
@@ -709,53 +758,40 @@ static void router_outbound_open_handler
     DEQ_INIT(rlink->msg_fifo);
 
     dx_link_set_context(receiver, rlink);
-
-    sys_mutex_lock(router->lock);
-    DEQ_INSERT_TAIL(router->in_links, rlink);
-    sys_mutex_unlock(router->lock);
+    DEQ_INSERT_TAIL(router->links, rlink);
 
     //
-    // Create an outgoing link with a local source of '_local/qdxrouter' and place
-    // it in the routing table.
+    // Create an outgoing link with router target capability
     //
-    sender = dx_link(router->node, conn, DX_OUTGOING, "inter-router-tx");
-    pn_terminus_set_address(dx_link_remote_target(sender), router_address);
-    pn_terminus_set_address(dx_link_source(sender), router_address);
+    sender = dx_link(router->node, conn, DX_OUTGOING, DX_INTERNODE_LINK_NAME_2);
+    // TODO - We don't want to have to cast away the constness of the literal string here!
+    //        See PROTON-429
+    pn_data_put_symbol(pn_terminus_capabilities(dx_link_target(receiver)), pn_bytes(clen, (char *) DX_CAPABILITY_ROUTER));
 
     rlink = new_dx_router_link_t();
-
     DEQ_ITEM_INIT(rlink);
-    rlink->link_direction = DX_OUTGOING;
+    rlink->mask_bit       = mask_bit;
     rlink->link_type      = DX_LINK_ROUTER;
+    rlink->link_direction = DX_OUTGOING;
+    rlink->owning_addr    = router->router_addr;
     rlink->link           = sender;
     rlink->connected_link = 0;
     rlink->peer_link      = 0;
     DEQ_INIT(rlink->event_fifo);
     DEQ_INIT(rlink->msg_fifo);
 
-    dx_link_set_context(sender, rlink);
-
-    dx_address_t *addr;
-
-    sys_mutex_lock(router->lock);
-    hash_retrieve(router->out_hash, aiter, (void**) &addr);
-    if (!addr) {
-        addr = new_dx_address_t();
-        addr->handler         = 0;
-        addr->handler_context = 0;
-        DEQ_INIT(addr->rlinks);
-        DEQ_INIT(addr->rnodes);
-        hash_insert(router->out_hash, aiter, addr);
-    }
+    //
+    // Add the new outgoing link to the router_address's list of links.
+    //
+    dx_router_add_link_ref_LH(&router->router_addr->rlinks, rlink);
 
-    rlink->owning_addr = addr;
-    DEQ_INSERT_TAIL(addr->rlinks, rlink);
+    dx_link_set_context(sender, rlink);
+    DEQ_INSERT_TAIL(router->links, rlink);
     sys_mutex_unlock(router->lock);
 
     pn_link_open(dx_link_pn(receiver));
     pn_link_open(dx_link_pn(sender));
     pn_link_flow(dx_link_pn(receiver), 1000);
-    dx_field_iterator_free(aiter);
 }
 
 
@@ -767,7 +803,6 @@ static void dx_router_timer_handler(void
     // Periodic processing.
     //
     dx_pyrouter_tick(router);
-
     dx_timer_schedule(router->timer, 1000);
 }
 
@@ -797,20 +832,29 @@ dx_router_t *dx_router(dx_dispatch_t *dx
 
     router_node.type_context = router;
 
+    dx->router = router;
     router->dx           = dx;
     router->router_area  = area;
     router->router_id    = id;
     router->node         = dx_container_set_default_node_type(dx, &router_node, (void*) router, DX_DIST_BOTH);
-    DEQ_INIT(router->in_links);
+    DEQ_INIT(router->addrs);
+    router->addr_hash    = hash(10, 32, 0);
+
+    DEQ_INIT(router->links);
     DEQ_INIT(router->routers);
-    DEQ_INIT(router->in_fifo);
-    router->lock         = sys_mutex();
-    router->timer        = dx_timer(dx, dx_router_timer_handler, (void*) router);
-    router->out_hash     = hash(10, 32, 0);
-    router->dtag         = 1;
-    router->pyRouter     = 0;
-    router->pyTick       = 0;
 
+    router->neighbor_free_mask = dx_bitmask(1);
+    router->lock               = sys_mutex();
+    router->timer              = dx_timer(dx, dx_router_timer_handler, (void*) router);
+    router->dtag               = 1;
+    router->pyRouter           = 0;
+    router->pyTick             = 0;
+
+    //
+    // Create an address for all of the routers in the topology.  It will be registered
+    // locally later in the initialization sequence.
+    //
+    router->router_addr = dx_router_register_address(dx, "qdxrouter", 0, 0);
 
     //
     // Inform the field iterator module of this router's id and area.  The field iterator
@@ -824,7 +868,6 @@ dx_router_t *dx_router(dx_dispatch_t *dx
     dx_python_start();
 
     dx_log(module, LOG_INFO, "Router started, area=%s id=%s", area, id);
-
     return router;
 }
 
@@ -869,14 +912,17 @@ dx_address_t *dx_router_register_address
     iter = dx_field_iterator_string(addr_string, ITER_VIEW_NO_HOST);
 
     sys_mutex_lock(router->lock);
-    hash_retrieve(router->out_hash, iter, (void**) &addr);
+    hash_retrieve(router->addr_hash, iter, (void**) &addr);
     if (!addr) {
         addr = new_dx_address_t();
+        DEQ_ITEM_INIT(addr);
         addr->handler         = 0;
         addr->handler_context = 0;
         DEQ_INIT(addr->rlinks);
         DEQ_INIT(addr->rnodes);
-        hash_insert(router->out_hash, iter, addr);
+        hash_insert(router->addr_hash, iter, addr);
+        DEQ_ITEM_INIT(addr);
+        DEQ_INSERT_TAIL(router->addrs, addr);
     }
     dx_field_iterator_free(iter);
 
@@ -885,7 +931,8 @@ dx_address_t *dx_router_register_address
 
     sys_mutex_unlock(router->lock);
 
-    dx_log(module, LOG_TRACE, "In-Process Address Registered: %s", address);
+    if (handler)
+        dx_log(module, LOG_INFO, "In-Process Address Registered: %s", address);
     return addr;
 }
 
@@ -905,34 +952,35 @@ void dx_router_send(dx_dispatch_t       
 
     dx_field_iterator_reset_view(address, ITER_VIEW_ADDRESS_HASH);
     sys_mutex_lock(router->lock);
-    hash_retrieve(router->out_hash, address, (void*) &addr);
+    hash_retrieve(router->addr_hash, address, (void*) &addr);
     if (addr) {
         //
         // Forward to all of the local links receiving this address.
         //
-        dx_router_link_t *dest_link = DEQ_HEAD(addr->rlinks);
-        while (dest_link) {
+        dx_router_link_ref_t *dest_link_ref = DEQ_HEAD(addr->rlinks);
+        while (dest_link_ref) {
             dx_routed_event_t *re = new_dx_routed_event_t();
             DEQ_ITEM_INIT(re);
             re->delivery    = 0;
             re->message     = dx_message_copy(msg);
             re->settle      = 0;
             re->disposition = 0;
-            DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
+            DEQ_INSERT_TAIL(dest_link_ref->link->msg_fifo, re);
 
-            dx_link_activate(dest_link->link);
-            dest_link = DEQ_NEXT(dest_link);
+            dx_link_activate(dest_link_ref->link->link);
+            dest_link_ref = DEQ_NEXT(dest_link_ref);
         }
 
         //
         // Forward to the next-hops for remote destinations.
         //
-        dx_router_node_t *dest_node = DEQ_HEAD(addr->rnodes);
-        while (dest_node) {
-            if (dest_node->next_hop)
-                dest_link = dest_node->next_hop->peer_link;
+        dx_router_ref_t  *dest_node_ref = DEQ_HEAD(addr->rnodes);
+        dx_router_link_t *dest_link;
+        while (dest_node_ref) {
+            if (dest_node_ref->router->next_hop)
+                dest_link = dest_node_ref->router->next_hop->peer_link;
             else
-                dest_link = dest_node->peer_link;
+                dest_link = dest_node_ref->router->peer_link;
             if (dest_link) {
                 dx_routed_event_t *re = new_dx_routed_event_t();
                 DEQ_ITEM_INIT(re);
@@ -943,7 +991,7 @@ void dx_router_send(dx_dispatch_t       
                 DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
                 dx_link_activate(dest_link->link);
             }
-            dest_node = DEQ_NEXT(dest_node);
+            dest_node_ref = DEQ_NEXT(dest_node_ref);
         }
     }
     sys_mutex_unlock(router->lock); // TOINVESTIGATE Move this higher?
@@ -977,8 +1025,11 @@ static PyObject* dx_router_node_updated(
     const char    *address;
     int            is_reachable;
     int            is_neighbor;
+    int            link_maskbit;
+    int            router_maskbit;
 
-    if (!PyArg_ParseTuple(args, "sii", &address, &is_reachable, &is_neighbor))
+    if (!PyArg_ParseTuple(args, "siiii", &address, &is_reachable, &is_neighbor,
+                          &link_maskbit, &router_maskbit))
         return 0;
 
     // TODO
@@ -1099,6 +1150,7 @@ static void dx_router_python_setup(dx_ro
     PyObject* pName;
     PyObject* pId;
     PyObject* pArea;
+    PyObject* pMaxRouters;
     PyObject* pModule;
     PyObject* pClass;
     PyObject* pArgs;
@@ -1126,7 +1178,7 @@ static void dx_router_python_setup(dx_ro
     //
     // Constructor Arguments for RouterEngine
     //
-    pArgs = PyTuple_New(3);
+    pArgs = PyTuple_New(4);
 
     // arg 0: adapter instance
     PyTuple_SetItem(pArgs, 0, adapterInstance);
@@ -1135,10 +1187,14 @@ static void dx_router_python_setup(dx_ro
     pId = PyString_FromString(router->router_id);
     PyTuple_SetItem(pArgs, 1, pId);
 
-    // arg 2: area id
+    // arg 2: area_id
     pArea = PyString_FromString(router->router_area);
     PyTuple_SetItem(pArgs, 2, pArea);
 
+    // arg 3: max_routers
+    pMaxRouters = PyInt_FromLong((long) dx_bitmask_width());
+    PyTuple_SetItem(pArgs, 3, pMaxRouters);
+
     //
     // Instantiate the router
     //

Added: qpid/trunk/qpid/extras/dispatch/src/router_private.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/router_private.h?rev=1526694&view=auto
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/router_private.h (added)
+++ qpid/trunk/qpid/extras/dispatch/src/router_private.h Thu Sep 26 21:14:59 2013
@@ -0,0 +1,128 @@
+#ifndef __router_private_h__
+#define __router_private_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+typedef struct dx_router_link_t     dx_router_link_t;
+typedef struct dx_router_node_t     dx_router_node_t;
+typedef struct dx_router_ref_t      dx_router_ref_t;
+typedef struct dx_router_link_ref_t dx_router_link_ref_t;
+
+
+typedef enum {
+    DX_LINK_ENDPOINT,   // A link to a connected endpoint
+    DX_LINK_ROUTER,     // A link to a peer router in the same area
+    DX_LINK_AREA        // A link to a peer router in a different area (area boundary)
+} dx_link_type_t;
+
+
+typedef struct dx_routed_event_t {
+    DEQ_LINKS(struct dx_routed_event_t);
+    dx_delivery_t *delivery;
+    dx_message_t  *message;
+    bool           settle;
+    uint64_t       disposition;
+} dx_routed_event_t;
+
+ALLOC_DECLARE(dx_routed_event_t);
+DEQ_DECLARE(dx_routed_event_t, dx_routed_event_list_t);
+
+
+struct dx_router_link_t {
+    DEQ_LINKS(dx_router_link_t);
+    int                     mask_bit;        // Unique mask bit if this is an inter-router link
+    dx_link_type_t          link_type;
+    dx_direction_t          link_direction;
+    dx_address_t           *owning_addr;     // [ref] Address record that owns this link
+    dx_link_t              *link;            // [own] Link pointer
+    dx_router_link_t       *connected_link;  // [ref] If this is a link-route, reference the connected link
+    dx_router_link_t       *peer_link;       // [ref] If this is a bidirectional link-route, reference the peer link
+    dx_router_link_ref_t   *ref;             // Pointer to a containing reference object
+    dx_routed_event_list_t  event_fifo;      // FIFO of outgoing delivery/link events (no messages)
+    dx_routed_event_list_t  msg_fifo;        // FIFO of outgoing message deliveries
+};
+
+ALLOC_DECLARE(dx_router_link_t);
+DEQ_DECLARE(dx_router_link_t, dx_router_link_list_t);
+
+struct dx_router_node_t {
+    DEQ_LINKS(dx_router_node_t);
+    const char       *id;
+    int               mask_bit;
+    dx_router_node_t *next_hop;   // Next hop node _if_ this is not a neighbor node
+    dx_router_link_t *peer_link;  // Outgoing link _if_ this is a neighbor node
+    dx_bitmask_t     *valid_origins;
+};
+
+ALLOC_DECLARE(dx_router_node_t);
+DEQ_DECLARE(dx_router_node_t, dx_router_node_list_t);
+
+struct dx_router_ref_t {
+    DEQ_LINKS(dx_router_ref_t);
+    dx_router_node_t *router;
+};
+
+ALLOC_DECLARE(dx_router_ref_t);
+DEQ_DECLARE(dx_router_ref_t, dx_router_ref_list_t);
+
+
+struct dx_router_link_ref_t {
+    DEQ_LINKS(dx_router_link_ref_t);
+    dx_router_link_t *link;
+};
+
+ALLOC_DECLARE(dx_router_link_ref_t);
+DEQ_DECLARE(dx_router_link_ref_t, dx_router_link_ref_list_t);
+
+
+struct dx_address_t {
+    DEQ_LINKS(dx_address_t);
+    dx_router_message_cb       handler;          // In-Process Consumer
+    void                      *handler_context;  // In-Process Consumer context
+    dx_router_link_ref_list_t  rlinks;           // Locally-Connected Consumers
+    dx_router_ref_list_t       rnodes;           // Remotely-Connected Consumers
+};
+
+ALLOC_DECLARE(dx_address_t);
+DEQ_DECLARE(dx_address_t, dx_address_list_t);
+
+
+struct dx_router_t {
+    dx_dispatch_t         *dx;
+    const char            *router_area;
+    const char            *router_id;
+    dx_node_t             *node;
+
+    dx_address_list_t      addrs;
+    hash_t                *addr_hash;
+    dx_address_t          *router_addr;
+
+    dx_router_link_list_t  links;
+    dx_router_node_list_t  routers;
+
+    dx_bitmask_t          *neighbor_free_mask;
+    sys_mutex_t           *lock;
+    dx_timer_t            *timer;
+    uint64_t               dtag;
+
+    PyObject              *pyRouter;
+    PyObject              *pyTick;
+};
+
+#endif

Propchange: qpid/trunk/qpid/extras/dispatch/src/router_private.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/extras/dispatch/src/router_private.h
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Rev URL



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message