Return-Path: X-Original-To: apmail-qpid-commits-archive@www.apache.org Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4E9CE1036A for ; Thu, 26 Sep 2013 21:15:35 +0000 (UTC) Received: (qmail 68442 invoked by uid 500); 26 Sep 2013 21:15:35 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 68419 invoked by uid 500); 26 Sep 2013 21:15:34 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 68412 invoked by uid 99); 26 Sep 2013 21:15:34 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 26 Sep 2013 21:15:34 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 26 Sep 2013 21:15:24 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id E9BA92388980; Thu, 26 Sep 2013 21:15:00 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1526694 [2/3] - in /qpid/trunk/qpid/extras/dispatch: ./ include/qpid/ include/qpid/dispatch/ python/qpid/dispatch/router/ router/src/ src/ tests/ Date: Thu, 26 Sep 2013 21:14:59 -0000 To: commits@qpid.apache.org From: tross@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20130926211500.E9BA92388980@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py?rev=1526694&r1=1526693&r2=1526694&view=diff ============================================================================== --- qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py (original) +++ qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py Thu Sep 26 21:14:59 2013 @@ -36,245 +36,249 @@ from node import NodeTracker ## (i.e. we are in a test bench, etc.), load the stub versions. ## try: - from dispatch import * + from dispatch import * except ImportError: - from ..stubs import * + from ..stubs import * class RouterEngine: - """ - """ - - def __init__(self, router_adapter, router_id=None, area='area', config_override={}): - """ - Initialize an instance of a router for a domain. - """ - ## - ## Record important information about this router instance - ## - self.domain = "domain" - self.router_adapter = router_adapter - self.log_adapter = LogAdapter("dispatch.router") - self.io_adapter = IoAdapter(self, "qdxrouter") - - if router_id: - self.id = router_id - else: - self.id = str(uuid4()) - self.area = area - self.log(LOG_INFO, "Router Engine Instantiated: area=%s id=%s" % (self.area, self.id)) - - ## - ## Setup configuration - ## - self.config = Configuration(config_override) - self.log(LOG_INFO, "Config: %r" % self.config) - - ## - ## Launch the sub-module engines - ## - self.neighbor_engine = NeighborEngine(self) - self.link_state_engine = LinkStateEngine(self) - self.path_engine = PathEngine(self) - self.mobile_address_engine = MobileAddressEngine(self) - self.routing_table_engine = RoutingTableEngine(self) - self.binding_engine = BindingEngine(self) - self.adapter_engine = AdapterEngine(self) - self.node_tracker = NodeTracker(self) - - - - ##======================================================================================== - ## Adapter Entry Points - invoked from the adapter - ##======================================================================================== - def getId(self): - """ - Return the router's ID - """ - return self.id - - - def addLocalAddress(self, key): - """ - """ - try: - if key.find('_topo') == 0 or key.find('_local') == 0: - return - self.mobile_address_engine.add_local_address(key) - except Exception, e: - self.log(LOG_ERROR, "Exception in new-address processing: exception=%r" % e) - - def delLocalAddress(self, key): - """ - """ - try: - if key.find('_topo') == 0 or key.find('_local') == 0: - return - self.mobile_address_engine.del_local_address(key) - except Exception, e: - self.log(LOG_ERROR, "Exception in del-address processing: exception=%r" % e) - - - def handleTimerTick(self): - """ - """ - try: - now = time() - self.neighbor_engine.tick(now) - self.link_state_engine.tick(now) - self.path_engine.tick(now) - self.mobile_address_engine.tick(now) - self.routing_table_engine.tick(now) - self.binding_engine.tick(now) - self.adapter_engine.tick(now) - self.node_tracker.tick(now) - except Exception, e: - self.log(LOG_ERROR, "Exception in timer processing: exception=%r" % e) - - - def handleControlMessage(self, opcode, body): """ """ - try: - now = time() - if opcode == 'HELLO': - msg = MessageHELLO(body) - self.log(LOG_TRACE, "RCVD: %r" % msg) - self.neighbor_engine.handle_hello(msg, now) - - elif opcode == 'RA': - msg = MessageRA(body) - self.log(LOG_TRACE, "RCVD: %r" % msg) - self.link_state_engine.handle_ra(msg, now) - self.mobile_address_engine.handle_ra(msg, now) - - elif opcode == 'LSU': - msg = MessageLSU(body) - self.log(LOG_TRACE, "RCVD: %r" % msg) - self.link_state_engine.handle_lsu(msg, now) - - elif opcode == 'LSR': - msg = MessageLSR(body) - self.log(LOG_TRACE, "RCVD: %r" % msg) - self.link_state_engine.handle_lsr(msg, now) - - elif opcode == 'MAU': - msg = MessageMAU(body) - self.log(LOG_TRACE, "RCVD: %r" % msg) - self.mobile_address_engine.handle_mau(msg, now) - - elif opcode == 'MAR': - msg = MessageMAR(body) - self.log(LOG_TRACE, "RCVD: %r" % msg) - self.mobile_address_engine.handle_mar(msg, now) - - except Exception, e: - self.log(LOG_ERROR, "Exception in message processing: opcode=%s body=%r exception=%r" % (opcode, body, e)) - - - def receive(self, message_properties, body): - """ - This is the IoAdapter message-receive handler - """ - try: - self.handleControlMessage(message_properties['opcode'], body) - except Exception, e: - self.log(LOG_ERROR, "Exception in raw message processing: properties=%r body=%r exception=%r" % - (message_properties, body, e)) - - def getRouterData(self, kind): - """ - """ - if kind == 'help': - return { 'help' : "Get list of supported values for kind", - 'link-state' : "This router's link state", - 'link-state-set' : "The set of link states from known routers", - 'next-hops' : "Next hops to each known router", - 'topo-table' : "Topological routing table", - 'mobile-table' : "Mobile key routing table" - } - if kind == 'link-state' : return self.neighbor_engine.link_state.to_dict() - if kind == 'next-hops' : return self.routing_table_engine.next_hops - if kind == 'topo-table' : return {'table': self.adapter_engine.key_classes['topological']} - if kind == 'mobile-table' : return {'table': self.adapter_engine.key_classes['mobile-key']} - if kind == 'link-state-set' : - copy = {} - for _id,_ls in self.link_state_engine.collection.items(): - copy[_id] = _ls.to_dict() - return copy - - return {'notice':'Use kind="help" to get a list of possibilities'} - - - ##======================================================================================== - ## Adapter Calls - outbound calls to Dispatch - ##======================================================================================== - def log(self, level, text): - """ - Emit a log message to the host's event log - """ - self.log_adapter.log(level, text) - - - def send(self, dest, msg): - """ - Send a control message to another router. - """ - app_props = {'opcode' : msg.get_opcode() } - self.io_adapter.send(dest, app_props, msg.to_dict()) - self.log(LOG_TRACE, "SENT: %r dest=%s" % (msg, dest)) - - - def node_updated(self, addr, reachable, neighbor): - """ - """ - self.router_adapter(addr, reachable, neighbor) - - - ##======================================================================================== - ## Interconnect between the Sub-Modules - ##======================================================================================== - def local_link_state_changed(self, link_state): - self.log(LOG_DEBUG, "Event: local_link_state_changed: %r" % link_state) - self.link_state_engine.new_local_link_state(link_state) - - def ls_collection_changed(self, collection): - self.log(LOG_DEBUG, "Event: ls_collection_changed: %r" % collection) - self.path_engine.ls_collection_changed(collection) - - def next_hops_changed(self, next_hop_table): - self.log(LOG_DEBUG, "Event: next_hops_changed: %r" % next_hop_table) - self.routing_table_engine.next_hops_changed(next_hop_table) - self.binding_engine.next_hops_changed() - - def mobile_sequence_changed(self, mobile_seq): - self.log(LOG_DEBUG, "Event: mobile_sequence_changed: %d" % mobile_seq) - self.link_state_engine.set_mobile_sequence(mobile_seq) - - def mobile_keys_changed(self, keys): - self.log(LOG_DEBUG, "Event: mobile_keys_changed: %r" % keys) - self.binding_engine.mobile_keys_changed(keys) - - def get_next_hops(self): - return self.routing_table_engine.get_next_hops() - - def remote_routes_changed(self, key_class, routes): - self.log(LOG_DEBUG, "Event: remote_routes_changed: class=%s routes=%r" % (key_class, routes)) - self.adapter_engine.remote_routes_changed(key_class, routes) - - def new_neighbor(self, rid): - self.log(LOG_DEBUG, "Event: new_neighbor: id=%s" % rid) - self.node_tracker.new_neighbor(rid) - - def lost_neighbor(self, rid): - self.log(LOG_DEBUG, "Event: lost_neighbor: id=%s" % rid) - self.node_tracker.lost_neighbor(rid) - - def new_node(self, rid): - self.log(LOG_DEBUG, "Event: new_node: id=%s" % rid) - self.node_tracker.new_node(rid) - - def lost_node(self, rid): - self.log(LOG_DEBUG, "Event: lost_node: id=%s" % rid) - self.node_tracker.lost_node(rid) + def __init__(self, router_adapter, router_id, area, max_routers, config_override={}): + """ + Initialize an instance of a router for a domain. + """ + ## + ## Record important information about this router instance + ## + self.domain = "domain" + self.router_adapter = router_adapter + self.log_adapter = LogAdapter("dispatch.router") + self.io_adapter = IoAdapter(self, "qdxrouter") + self.max_routers = max_routers + self.id = router_id + self.area = area + self.log(LOG_INFO, "Router Engine Instantiated: area=%s id=%s max_routers=%d" % + (self.area, self.id, self.max_routers)) + + ## + ## Setup configuration + ## + self.config = Configuration(config_override) + self.log(LOG_INFO, "Config: %r" % self.config) + + ## + ## Launch the sub-module engines + ## + self.neighbor_engine = NeighborEngine(self) + self.link_state_engine = LinkStateEngine(self) + self.path_engine = PathEngine(self) + self.mobile_address_engine = MobileAddressEngine(self) + self.routing_table_engine = RoutingTableEngine(self) + self.binding_engine = BindingEngine(self) + self.adapter_engine = AdapterEngine(self) + self.node_tracker = NodeTracker(self, self.max_routers) + + + + ##======================================================================================== + ## Adapter Entry Points - invoked from the adapter + ##======================================================================================== + def getId(self): + """ + Return the router's ID + """ + return self.id + + + def addLocalAddress(self, key): + """ + """ + try: + if key.find('_topo') == 0 or key.find('_local') == 0: + return + self.mobile_address_engine.add_local_address(key) + except Exception, e: + self.log(LOG_ERROR, "Exception in new-address processing: exception=%r" % e) + + + def delLocalAddress(self, key): + """ + """ + try: + if key.find('_topo') == 0 or key.find('_local') == 0: + return + self.mobile_address_engine.del_local_address(key) + except Exception, e: + self.log(LOG_ERROR, "Exception in del-address processing: exception=%r" % e) + + + def handleTimerTick(self): + """ + """ + try: + now = time() + self.neighbor_engine.tick(now) + self.link_state_engine.tick(now) + self.path_engine.tick(now) + self.mobile_address_engine.tick(now) + self.routing_table_engine.tick(now) + self.binding_engine.tick(now) + self.adapter_engine.tick(now) + self.node_tracker.tick(now) + except Exception, e: + self.log(LOG_ERROR, "Exception in timer processing: exception=%r" % e) + + + def handleControlMessage(self, opcode, body, link_id): + """ + """ + try: + now = time() + if opcode == 'HELLO': + msg = MessageHELLO(body) + self.log(LOG_TRACE, "RCVD: %r" % msg) + self.neighbor_engine.handle_hello(msg, now, link_id) + + elif opcode == 'RA': + msg = MessageRA(body) + self.log(LOG_TRACE, "RCVD: %r" % msg) + self.link_state_engine.handle_ra(msg, now) + self.mobile_address_engine.handle_ra(msg, now) + + elif opcode == 'LSU': + msg = MessageLSU(body) + self.log(LOG_TRACE, "RCVD: %r" % msg) + self.link_state_engine.handle_lsu(msg, now) + + elif opcode == 'LSR': + msg = MessageLSR(body) + self.log(LOG_TRACE, "RCVD: %r" % msg) + self.link_state_engine.handle_lsr(msg, now) + + elif opcode == 'MAU': + msg = MessageMAU(body) + self.log(LOG_TRACE, "RCVD: %r" % msg) + self.mobile_address_engine.handle_mau(msg, now) + + elif opcode == 'MAR': + msg = MessageMAR(body) + self.log(LOG_TRACE, "RCVD: %r" % msg) + self.mobile_address_engine.handle_mar(msg, now) + + except Exception, e: + self.log(LOG_ERROR, "Exception in message processing: opcode=%s body=%r exception=%r" % (opcode, body, e)) + + + def receive(self, message_properties, body, link_id): + """ + This is the IoAdapter message-receive handler + """ + try: + self.handleControlMessage(message_properties['opcode'], body, link_id) + except Exception, e: + self.log(LOG_ERROR, "Exception in raw message processing: properties=%r body=%r exception=%r" % + (message_properties, body, e)) + + + def getRouterData(self, kind): + """ + """ + if kind == 'help': + return { 'help' : "Get list of supported values for kind", + 'link-state' : "This router's link state", + 'link-state-set' : "The set of link states from known routers", + 'next-hops' : "Next hops to each known router", + 'topo-table' : "Topological routing table", + 'mobile-table' : "Mobile key routing table" + } + if kind == 'link-state' : return self.neighbor_engine.link_state.to_dict() + if kind == 'next-hops' : return self.routing_table_engine.next_hops + if kind == 'topo-table' : return {'table': self.adapter_engine.key_classes['topological']} + if kind == 'mobile-table' : return {'table': self.adapter_engine.key_classes['mobile-key']} + if kind == 'link-state-set' : + copy = {} + for _id,_ls in self.link_state_engine.collection.items(): + copy[_id] = _ls.to_dict() + return copy + + return {'notice':'Use kind="help" to get a list of possibilities'} + + + ##======================================================================================== + ## Adapter Calls - outbound calls to Dispatch + ##======================================================================================== + def log(self, level, text): + """ + Emit a log message to the host's event log + """ + self.log_adapter.log(level, text) + + + def send(self, dest, msg): + """ + Send a control message to another router. + """ + app_props = {'opcode' : msg.get_opcode() } + self.io_adapter.send(dest, app_props, msg.to_dict()) + self.log(LOG_TRACE, "SENT: %r dest=%s" % (msg, dest)) + + + def node_updated(self, addr, reachable, neighbor): + """ + """ + self.router_adapter(addr, reachable, neighbor) + + + ##======================================================================================== + ## Interconnect between the Sub-Modules + ##======================================================================================== + def local_link_state_changed(self, link_state): + self.log(LOG_DEBUG, "Event: local_link_state_changed: %r" % link_state) + self.link_state_engine.new_local_link_state(link_state) + + def ls_collection_changed(self, collection): + self.log(LOG_DEBUG, "Event: ls_collection_changed: %r" % collection) + self.path_engine.ls_collection_changed(collection) + + def next_hops_changed(self, next_hop_table): + self.log(LOG_DEBUG, "Event: next_hops_changed: %r" % next_hop_table) + self.routing_table_engine.next_hops_changed(next_hop_table) + self.binding_engine.next_hops_changed() + + def mobile_sequence_changed(self, mobile_seq): + self.log(LOG_DEBUG, "Event: mobile_sequence_changed: %d" % mobile_seq) + self.link_state_engine.set_mobile_sequence(mobile_seq) + + def mobile_keys_changed(self, keys): + self.log(LOG_DEBUG, "Event: mobile_keys_changed: %r" % keys) + self.binding_engine.mobile_keys_changed(keys) + + def get_next_hops(self): + return self.routing_table_engine.get_next_hops() + + def remote_routes_changed(self, key_class, routes): + self.log(LOG_DEBUG, "Event: remote_routes_changed: class=%s routes=%r" % (key_class, routes)) + self.adapter_engine.remote_routes_changed(key_class, routes) + + def new_neighbor(self, rid, link_id): + self.log(LOG_DEBUG, "Event: new_neighbor: id=%s link_id=%d" % (rid, link_id)) + self.node_tracker.new_neighbor(rid, link_id) + + def lost_neighbor(self, rid): + self.log(LOG_DEBUG, "Event: lost_neighbor: id=%s" % rid) + self.node_tracker.lost_neighbor(rid) + + def new_node(self, rid): + self.log(LOG_DEBUG, "Event: new_node: id=%s" % rid) + self.node_tracker.new_node(rid) + + def lost_node(self, rid): + self.log(LOG_DEBUG, "Event: lost_node: id=%s" % rid) + self.node_tracker.lost_node(rid) + + def node_updated(self, address, reachable, neighbor, link_bit, router_bit): + self.log(LOG_DEBUG, "Event: node_updated: address=%s, reachable=%r, neighbor=%r, link_bit=%d, router_bit=%d" % \ + (address, reachable, neighbor, link_bit, router_bit)) + self.router_adapter.node_updataed(address, reachable, neighbor, link_bit, router_bit) Modified: qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py?rev=1526694&r1=1526693&r2=1526694&view=diff ============================================================================== --- qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py (original) +++ qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py Thu Sep 26 21:14:59 2013 @@ -18,39 +18,39 @@ # try: - from dispatch import * + from dispatch import * except ImportError: - from ..stubs import * + from ..stubs import * class RoutingTableEngine(object): - """ - This module is responsible for converting the set of next hops to remote routers to a routing - table in the "topological" address class. - """ - def __init__(self, container): - self.container = container - self.id = self.container.id - self.area = self.container.area - self.next_hops = {} - - - def tick(self, now): - pass - - - def next_hops_changed(self, next_hops): - # Convert next_hops into routing table - self.next_hops = next_hops - new_table = [] - for _id, next_hop in next_hops.items(): - new_table.append(('_topo.%s.%s.#' % (self.area, _id), next_hop)) - pair = ('_topo.%s.all' % (self.area), next_hop) - if new_table.count(pair) == 0: - new_table.append(pair) + """ + This module is responsible for converting the set of next hops to remote routers to a routing + table in the "topological" address class. + """ + def __init__(self, container): + self.container = container + self.id = self.container.id + self.area = self.container.area + self.next_hops = {} + + + def tick(self, now): + pass + + + def next_hops_changed(self, next_hops): + # Convert next_hops into routing table + self.next_hops = next_hops + new_table = [] + for _id, next_hop in next_hops.items(): + new_table.append(('_topo.%s.%s.#' % (self.area, _id), next_hop)) + pair = ('_topo.%s.all' % (self.area), next_hop) + if new_table.count(pair) == 0: + new_table.append(pair) - self.container.remote_routes_changed('topological', new_table) + self.container.remote_routes_changed('topological', new_table) - def get_next_hops(self): - return self.next_hops + def get_next_hops(self): + return self.next_hops Modified: qpid/trunk/qpid/extras/dispatch/router/src/main.c URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/router/src/main.c?rev=1526694&r1=1526693&r2=1526694&view=diff ============================================================================== --- qpid/trunk/qpid/extras/dispatch/router/src/main.c (original) +++ qpid/trunk/qpid/extras/dispatch/router/src/main.c Thu Sep 26 21:14:59 2013 @@ -117,7 +117,7 @@ int main(int argc, char **argv) } } - dx_log_set_mask(0xFFFFFFFF); + dx_log_set_mask(0xFFFFFFFE); dispatch = dx_dispatch(config_path); Modified: qpid/trunk/qpid/extras/dispatch/src/agent.c URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/agent.c?rev=1526694&r1=1526693&r2=1526694&view=diff ============================================================================== --- qpid/trunk/qpid/extras/dispatch/src/agent.c (original) +++ qpid/trunk/qpid/extras/dispatch/src/agent.c Thu Sep 26 21:14:59 2013 @@ -240,7 +240,7 @@ static void dx_agent_deferred_handler(vo } -static void dx_agent_rx_handler(void *context, dx_message_t *msg) +static void dx_agent_rx_handler(void *context, dx_message_t *msg, int unused_link_id) { dx_agent_t *agent = (dx_agent_t*) context; dx_message_t *copy = dx_message_copy(msg); Added: qpid/trunk/qpid/extras/dispatch/src/amqp.c URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/amqp.c?rev=1526694&view=auto ============================================================================== --- qpid/trunk/qpid/extras/dispatch/src/amqp.c (added) +++ qpid/trunk/qpid/extras/dispatch/src/amqp.c Thu Sep 26 21:14:59 2013 @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include + +const char * const DX_DA_INGRESS = "qdx.ingress"; +const char * const DX_DA_TRACE = "qdx.trace"; +const char * const DX_DA_TO = "qdx.to"; + +const char * const DX_CAPABILITY_ROUTER = "qdx.router"; + +const char * const DX_INTERNODE_LINK_NAME_1 = "qdx.internode.1"; +const char * const DX_INTERNODE_LINK_NAME_2 = "qdx.internode.2"; + Propchange: qpid/trunk/qpid/extras/dispatch/src/amqp.c ------------------------------------------------------------------------------ svn:eol-style = native Added: qpid/trunk/qpid/extras/dispatch/src/bitmask.c URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/bitmask.c?rev=1526694&view=auto ============================================================================== --- qpid/trunk/qpid/extras/dispatch/src/bitmask.c (added) +++ qpid/trunk/qpid/extras/dispatch/src/bitmask.c Thu Sep 26 21:14:59 2013 @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include + +#define DX_BITMASK_LONGS 16 +#define DX_BITMASK_BITS (DX_BITMASK_LONGS * 64) + +struct dx_bitmask_t { + uint64_t array[DX_BITMASK_LONGS]; + int first_set; +}; + +ALLOC_DECLARE(dx_bitmask_t); +ALLOC_DEFINE(dx_bitmask_t); + +#define MASK_INDEX(num) (num / 64) +#define MASK_ONEHOT(num) (1 << (num % 64)) +#define FIRST_NONE -1 +#define FIRST_UNKNOWN -2 + + +int dx_bitmask_width() +{ + return DX_BITMASK_BITS; +} + + +dx_bitmask_t *dx_bitmask(int initial) +{ + dx_bitmask_t *b = new_dx_bitmask_t(); + if (initial) + dx_bitmask_set_all(b); + else + dx_bitmask_clear_all(b); + return b; +} + + +void dx_bitmask_free(dx_bitmask_t *b) +{ + free_dx_bitmask_t(b); +} + + +void dx_bitmask_set_all(dx_bitmask_t *b) +{ + for (int i = 0; i < DX_BITMASK_LONGS; i++) + b->array[i] = 0xFFFFFFFFFFFFFFFF; + b->first_set = 0; +} + + +void dx_bitmask_clear_all(dx_bitmask_t *b) +{ + for (int i = 0; i < DX_BITMASK_LONGS; i++) + b->array[i] = 0; + b->first_set = FIRST_NONE; +} + + +void dx_bitmask_set_bit(dx_bitmask_t *b, int bitnum) +{ + assert(bitnum < DX_BITMASK_BITS); + b->array[MASK_INDEX(bitnum)] |= MASK_ONEHOT(bitnum); + if (b->first_set > bitnum) + b->first_set = bitnum; +} + + +void dx_bitmask_clear_bit(dx_bitmask_t *b, int bitnum) +{ + assert(bitnum < DX_BITMASK_BITS); + b->array[MASK_INDEX(bitnum)] &= ~(MASK_ONEHOT(bitnum)); + if (b->first_set == bitnum) + b->first_set = FIRST_UNKNOWN; +} + + +int dx_bitmask_value(dx_bitmask_t *b, int bitnum) +{ + return (b->array[MASK_INDEX(bitnum)] & MASK_ONEHOT(bitnum)) ? 1 : 0; +} + + +int dx_bitmask_first_set(dx_bitmask_t *b, int *bitnum) +{ + if (b->first_set == FIRST_UNKNOWN) { + b->first_set = FIRST_NONE; + for (int i = 0; i < DX_BITMASK_LONGS; i++) + if (b->array[i]) { + for (int j = 0; j < 64; j++) + if ((1 << j) & b->array[i]) { + b->first_set = i * 64 + j; + break; + } + break; + } + } + + if (b->first_set == FIRST_NONE) + return 0; + *bitnum = b->first_set; + return 1; +} + Propchange: qpid/trunk/qpid/extras/dispatch/src/bitmask.c ------------------------------------------------------------------------------ svn:eol-style = native Modified: qpid/trunk/qpid/extras/dispatch/src/message_private.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/message_private.h?rev=1526694&r1=1526693&r2=1526694&view=diff ============================================================================== --- qpid/trunk/qpid/extras/dispatch/src/message_private.h (original) +++ qpid/trunk/qpid/extras/dispatch/src/message_private.h Thu Sep 26 21:14:59 2013 @@ -67,7 +67,7 @@ typedef struct { sys_mutex_t *lock; uint32_t ref_count; // The number of messages referencing this dx_buffer_list_t buffers; // The buffer chain containing the message - dx_buffer_list_t new_delivery_annotations; // The buffer chain containing the new delivery annotations + dx_buffer_list_t new_delivery_annotations; // The buffer chain containing the new delivery annotations (MOVE TO MSG_PVT) dx_field_location_t section_message_header; // The message header list dx_field_location_t section_delivery_annotation; // The delivery annotation map dx_field_location_t section_message_annotation; // The message annotation map Modified: qpid/trunk/qpid/extras/dispatch/src/python_embedded.c URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/python_embedded.c?rev=1526694&r1=1526693&r2=1526694&view=diff ============================================================================== --- qpid/trunk/qpid/extras/dispatch/src/python_embedded.c (original) +++ qpid/trunk/qpid/extras/dispatch/src/python_embedded.c Thu Sep 26 21:14:59 2013 @@ -402,7 +402,7 @@ typedef struct { } IoAdapter; -static void dx_io_rx_handler(void *context, dx_message_t *msg) +static void dx_io_rx_handler(void *context, dx_message_t *msg, int link_id) { IoAdapter *self = (IoAdapter*) context; @@ -454,9 +454,10 @@ static void dx_io_rx_handler(void *conte PyObject *pAP = dx_field_to_py(ap_map); PyObject *pBody = dx_field_to_py(body_map); - PyObject *pArgs = PyTuple_New(2); + PyObject *pArgs = PyTuple_New(3); PyTuple_SetItem(pArgs, 0, pAP); PyTuple_SetItem(pArgs, 1, pBody); + PyTuple_SetItem(pArgs, 2, PyInt_FromLong((long) link_id)); PyObject *pValue = PyObject_CallObject(self->handler_rx_call, pArgs); Py_DECREF(pArgs); @@ -507,10 +508,10 @@ static PyObject* dx_python_send(PyObject field = dx_compose(DX_PERFORMATIVE_DELIVERY_ANNOTATIONS, field); dx_compose_start_map(field); - dx_compose_insert_string(field, "qdx.ingress"); + dx_compose_insert_string(field, DX_DA_INGRESS); dx_compose_insert_string(field, dx_router_id(ioa->dx)); - dx_compose_insert_string(field, "qdx.trace"); + dx_compose_insert_string(field, DX_DA_TRACE); dx_compose_start_list(field); dx_compose_insert_string(field, dx_router_id(ioa->dx)); dx_compose_end_list(field); Modified: qpid/trunk/qpid/extras/dispatch/src/router_node.c URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/router_node.c?rev=1526694&r1=1526693&r2=1526694&view=diff ============================================================================== --- qpid/trunk/qpid/extras/dispatch/src/router_node.c (original) +++ qpid/trunk/qpid/extras/dispatch/src/router_node.c Thu Sep 26 21:14:59 2013 @@ -21,17 +21,18 @@ #include #include #include +#include #include #include "dispatch_private.h" +#include "router_private.h" static char *module = "ROUTER"; static void dx_router_python_setup(dx_router_t *router); static void dx_pyrouter_tick(dx_router_t *router); -static char *router_address = "_local/qdxrouter"; -static char *local_prefix = "_local/"; -//static char *topo_prefix = "_topo/"; +static char *local_prefix = "_local/"; +static char *topo_prefix = "_topo/"; /** * Address Types and Processing: @@ -48,86 +49,82 @@ static char *local_prefix = "_local/"; * M forward handler */ +ALLOC_DEFINE(dx_routed_event_t); +ALLOC_DEFINE(dx_router_link_t); +ALLOC_DEFINE(dx_router_node_t); +ALLOC_DEFINE(dx_router_ref_t); +ALLOC_DEFINE(dx_router_link_ref_t); +ALLOC_DEFINE(dx_address_t); -typedef struct dx_router_link_t dx_router_link_t; -typedef struct dx_router_node_t dx_router_node_t; +static void dx_router_add_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link) +{ + dx_router_link_ref_t *ref = new_dx_router_link_ref_t(); + DEQ_ITEM_INIT(ref); + ref->link = link; + link->ref = ref; + DEQ_INSERT_TAIL(*ref_list, ref); +} -typedef enum { - DX_LINK_ENDPOINT, // A link to a connected endpoint - DX_LINK_ROUTER, // A link to a peer router in the same area - DX_LINK_AREA // A link to a peer router in a different area (area boundary) -} dx_link_type_t; - - -typedef struct dx_routed_event_t { - DEQ_LINKS(struct dx_routed_event_t); - dx_delivery_t *delivery; - dx_message_t *message; - bool settle; - uint64_t disposition; -} dx_routed_event_t; -ALLOC_DECLARE(dx_routed_event_t); -ALLOC_DEFINE(dx_routed_event_t); -DEQ_DECLARE(dx_routed_event_t, dx_routed_event_list_t); +static void dx_router_del_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link) +{ + if (link->ref) { + DEQ_REMOVE(*ref_list, link->ref); + free_dx_router_link_ref_t(link->ref); + link->ref = 0; + } +} -struct dx_router_link_t { - DEQ_LINKS(dx_router_link_t); - dx_direction_t link_direction; - dx_link_type_t link_type; - dx_address_t *owning_addr; // [ref] Address record that owns this link - dx_link_t *link; // [own] Link pointer - dx_router_link_t *connected_link; // [ref] If this is a link-route, reference the connected link - dx_router_link_t *peer_link; // [ref] If this is a bidirectional link-route, reference the peer link - dx_routed_event_list_t event_fifo; // FIFO of outgoing delivery/link events (no messages) - dx_routed_event_list_t msg_fifo; // FIFO of outgoing message deliveries -}; +/** + * Check an address to see if it no longer has any associated destinations. + * Depending on its policy, the address may be eligible for being closed out + * (i.e. Logging its terminal statistics and freeing its resources). + */ +static void dx_router_check_addr_LH(dx_address_t *addr) +{ + // TODO +} -ALLOC_DECLARE(dx_router_link_t); -ALLOC_DEFINE(dx_router_link_t); -DEQ_DECLARE(dx_router_link_t, dx_router_link_list_t); -struct dx_router_node_t { - DEQ_LINKS(dx_router_node_t); - const char *id; - dx_router_node_t *next_hop; // Next hop node _if_ this is not a neighbor node - dx_router_link_t *peer_link; // Outgoing link _if_ this is a neighbor node - // list of valid origins (pointers to router_node) - (bit masks?) -}; +/** + * Determine whether a terminus has router capability + */ +static int dx_router_terminus_is_router(pn_terminus_t *term) +{ + pn_data_t *cap = pn_terminus_capabilities(term); -ALLOC_DECLARE(dx_router_node_t); -ALLOC_DEFINE(dx_router_node_t); -DEQ_DECLARE(dx_router_node_t, dx_router_node_list_t); + if (cap && pn_data_type(cap) == PN_SYMBOL) { + pn_bytes_t sym = pn_data_get_symbol(cap); + if (sym.size == strlen(DX_CAPABILITY_ROUTER) && + strcmp(sym.start, DX_CAPABILITY_ROUTER) == 0) + return 1; + } + return 0; +} -struct dx_address_t { - dx_router_message_cb handler; // In-Process Consumer - void *handler_context; - dx_router_link_list_t rlinks; // Locally-Connected Consumers - dx_router_node_list_t rnodes; // Remotely-Connected Consumers -}; -ALLOC_DECLARE(dx_address_t); -ALLOC_DEFINE(dx_address_t); +static void dx_router_generate_temp_addr(dx_router_t *router, char *buffer, size_t length) +{ + static const char *table = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+_"; + char discriminator[11]; + long int rnd = random(); + int idx; + for (idx = 0; idx < 10; idx++) + discriminator[idx] = table[(rnd >> (idx * 6)) & 63]; + discriminator[idx] = '\0'; + + snprintf(buffer, length, "amqp:/%s%s/%s/temp.%s", topo_prefix, router->router_area, router->router_id, discriminator); +} -struct dx_router_t { - dx_dispatch_t *dx; - const char *router_area; - const char *router_id; - dx_node_t *node; - dx_router_link_list_t in_links; - dx_router_node_list_t routers; - dx_message_list_t in_fifo; - sys_mutex_t *lock; - dx_timer_t *timer; - hash_t *out_hash; - uint64_t dtag; - PyObject *pyRouter; - PyObject *pyTick; -}; + +static int dx_router_find_mask_bit(dx_link_t *link) +{ + return 0; // TODO +} /** @@ -191,7 +188,7 @@ static int router_writable_link_handler( DEQ_REMOVE_HEAD(to_send); // - // Get a delivery for the send. This will be the current deliver on the link. + // Get a delivery for the send. This will be the current delivery on the link. // tag++; delivery = dx_delivery(link, pn_dtag((char*) &tag, 8)); @@ -259,8 +256,8 @@ static void router_annotate_message(dx_r dx_parsed_field_t *ingress = 0; if (in_da) { - trace = dx_parse_value_by_key(in_da, "qdx.trace"); - ingress = dx_parse_value_by_key(in_da, "qdx.ingress"); + trace = dx_parse_value_by_key(in_da, DX_DA_TRACE); + ingress = dx_parse_value_by_key(in_da, DX_DA_INGRESS); } dx_compose_start_map(out_da); @@ -269,7 +266,7 @@ static void router_annotate_message(dx_r // If there is a trace field, append this router's ID to the trace. // if (trace && dx_parse_is_list(trace)) { - dx_compose_insert_string(out_da, "qdx.trace"); + dx_compose_insert_string(out_da, DX_DA_TRACE); dx_compose_start_list(out_da); uint32_t idx = 0; @@ -289,7 +286,7 @@ static void router_annotate_message(dx_r // If there is no ingress field, annotate the ingress as this router else // keep the original field. // - dx_compose_insert_string(out_da, "qdx.ingress"); + dx_compose_insert_string(out_da, DX_DA_INGRESS); if (ingress && dx_parse_is_scalar(ingress)) { dx_field_iterator_t *iter = dx_parse_raw(ingress); dx_compose_insert_string_iterator(out_da, iter); @@ -380,7 +377,7 @@ static void router_rx_handler(void* cont if (iter) { dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH); - hash_retrieve(router->out_hash, iter, (void*) &addr); + hash_retrieve(router->addr_hash, iter, (void*) &addr); dx_field_iterator_reset_view(iter, ITER_VIEW_NO_HOST); int is_local = dx_field_iterator_prefix(iter, local_prefix); dx_field_iterator_free(iter); @@ -415,33 +412,34 @@ static void router_rx_handler(void* cont // // Forward to all of the local links receiving this address. // - dx_router_link_t *dest_link = DEQ_HEAD(addr->rlinks); - while (dest_link) { + dx_router_link_ref_t *dest_link_ref = DEQ_HEAD(addr->rlinks); + while (dest_link_ref) { dx_routed_event_t *re = new_dx_routed_event_t(); DEQ_ITEM_INIT(re); re->delivery = 0; re->message = dx_message_copy(msg); re->settle = 0; re->disposition = 0; - DEQ_INSERT_TAIL(dest_link->msg_fifo, re); + DEQ_INSERT_TAIL(dest_link_ref->link->msg_fifo, re); fanout++; if (fanout == 1 && !dx_delivery_settled(delivery)) re->delivery = delivery; - dx_link_activate(dest_link->link); - dest_link = DEQ_NEXT(dest_link); + dx_link_activate(dest_link_ref->link->link); + dest_link_ref = DEQ_NEXT(dest_link_ref); } // // Forward to the next-hops for remote destinations. // - dx_router_node_t *dest_node = DEQ_HEAD(addr->rnodes); - while (dest_node) { - if (dest_node->next_hop) - dest_link = dest_node->next_hop->peer_link; + dx_router_ref_t *dest_node_ref = DEQ_HEAD(addr->rnodes); + dx_router_link_t *dest_link; + while (dest_node_ref) { + if (dest_node_ref->router->next_hop) + dest_link = dest_node_ref->router->next_hop->peer_link; else - dest_link = dest_node->peer_link; + dest_link = dest_node_ref->router->peer_link; if (dest_link) { dx_routed_event_t *re = new_dx_routed_event_t(); DEQ_ITEM_INIT(re); @@ -457,7 +455,7 @@ static void router_rx_handler(void* cont dx_link_activate(dest_link->link); } - dest_node = DEQ_NEXT(dest_node); + dest_node_ref = DEQ_NEXT(dest_node_ref); } } } @@ -487,7 +485,7 @@ static void router_rx_handler(void* cont // Invoke the in-process handler now that the lock is released. // if (handler) - handler(handler_context, in_process_copy); + handler(handler_context, in_process_copy, rlink->mask_bit); } @@ -541,25 +539,28 @@ static int router_incoming_link_handler( dx_router_t *router = (dx_router_t*) context; dx_router_link_t *rlink = new_dx_router_link_t(); pn_link_t *pn_link = dx_link_pn(link); + int is_router = dx_router_terminus_is_router(dx_link_remote_source(link)); DEQ_ITEM_INIT(rlink); + rlink->mask_bit = is_router ? dx_router_find_mask_bit(link) : 0; + rlink->link_type = is_router ? DX_LINK_ROUTER : DX_LINK_ENDPOINT; rlink->link_direction = DX_INCOMING; - rlink->link_type = DX_LINK_ENDPOINT; rlink->owning_addr = 0; rlink->link = link; rlink->connected_link = 0; rlink->peer_link = 0; + rlink->ref = 0; DEQ_INIT(rlink->event_fifo); DEQ_INIT(rlink->msg_fifo); dx_link_set_context(link, rlink); sys_mutex_lock(router->lock); - DEQ_INSERT_TAIL(router->in_links, rlink); + DEQ_INSERT_TAIL(router->links, rlink); sys_mutex_unlock(router->lock); - pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link)); - pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link)); + pn_terminus_copy(dx_link_source(link), dx_link_remote_source(link)); + pn_terminus_copy(dx_link_target(link), dx_link_remote_target(link)); pn_link_flow(pn_link, 1000); pn_link_open(pn_link); @@ -579,52 +580,90 @@ static int router_outgoing_link_handler( { dx_router_t *router = (dx_router_t*) context; pn_link_t *pn_link = dx_link_pn(link); - const char *r_tgt = pn_terminus_get_address(pn_link_remote_target(pn_link)); + const char *r_src = pn_terminus_get_address(dx_link_remote_source(link)); + int is_dynamic = pn_terminus_is_dynamic(dx_link_remote_source(link)); + int is_router = dx_router_terminus_is_router(dx_link_remote_target(link)); - if (!r_tgt) { + // + // If this link is not a router link and it has no source address, we can't + // accept it. + // + if (r_src == 0 && !is_router && !is_dynamic) { pn_link_close(pn_link); return 0; } - dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_NO_HOST); - dx_router_link_t *rlink = new_dx_router_link_t(); - - int is_router = dx_field_iterator_equal(iter, (unsigned char*) router_address); - + // + // Create a router_link record for this link. Some of the fields will be + // modified in the different cases below. + // + dx_router_link_t *rlink = new_dx_router_link_t(); DEQ_ITEM_INIT(rlink); - rlink->link_direction = DX_OUTGOING; + rlink->mask_bit = is_router ? dx_router_find_mask_bit(link) : 0; rlink->link_type = is_router ? DX_LINK_ROUTER : DX_LINK_ENDPOINT; + rlink->link_direction = DX_OUTGOING; + rlink->owning_addr = 0; rlink->link = link; rlink->connected_link = 0; rlink->peer_link = 0; + rlink->ref = 0; DEQ_INIT(rlink->event_fifo); DEQ_INIT(rlink->msg_fifo); dx_link_set_context(link, rlink); - - dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH); - dx_address_t *addr; + pn_terminus_copy(dx_link_source(link), dx_link_remote_source(link)); + pn_terminus_copy(dx_link_target(link), dx_link_remote_target(link)); sys_mutex_lock(router->lock); - hash_retrieve(router->out_hash, iter, (void**) &addr); - if (!addr) { - addr = new_dx_address_t(); - addr->handler = 0; - addr->handler_context = 0; - DEQ_INIT(addr->rlinks); - DEQ_INIT(addr->rnodes); - hash_insert(router->out_hash, iter, addr); + + if (is_router) { + // + // If this is a router link, put it in the router_address link-list. + // + dx_router_add_link_ref_LH(&router->router_addr->rlinks, rlink); + rlink->owning_addr = router->router_addr; + + } else { + // + // If this is an endpoint link, check the source. If it is dynamic, we will + // assign it an ephemeral and routable address. If it has a non-dymanic + // address, that address needs to be set up in the address list. + // + dx_field_iterator_t *iter; + char temp_addr[1000]; + dx_address_t *addr; + + if (is_dynamic) { + dx_router_generate_temp_addr(router, temp_addr, 1000); + iter = dx_field_iterator_string(temp_addr, ITER_VIEW_ADDRESS_HASH); + pn_terminus_set_address(dx_link_source(link), temp_addr); + dx_log(module, LOG_INFO, "Assigned temporary routable address: %s", temp_addr); + } else { + iter = dx_field_iterator_string(r_src, ITER_VIEW_ADDRESS_HASH); + dx_log(module, LOG_INFO, "Registered local address: %s", r_src); + } + + hash_retrieve(router->addr_hash, iter, (void**) &addr); + if (!addr) { + addr = new_dx_address_t(); + DEQ_ITEM_INIT(addr); + addr->handler = 0; + addr->handler_context = 0; + DEQ_INIT(addr->rlinks); + DEQ_INIT(addr->rnodes); + hash_insert(router->addr_hash, iter, addr); + DEQ_INSERT_TAIL(router->addrs, addr); + } + dx_field_iterator_free(iter); + + rlink->owning_addr = addr; + dx_router_add_link_ref_LH(&addr->rlinks, rlink); } - dx_field_iterator_free(iter); - rlink->owning_addr = addr; - DEQ_INSERT_TAIL(addr->rlinks, rlink); + DEQ_INSERT_TAIL(router->links, rlink); + sys_mutex_unlock(router->lock); - pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link)); - pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link)); pn_link_open(pn_link); - sys_mutex_unlock(router->lock); - dx_log(module, LOG_TRACE, "Registered new local address: %s", r_tgt); return 0; } @@ -634,40 +673,37 @@ static int router_outgoing_link_handler( */ static int router_link_detach_handler(void* context, dx_link_t *link, int closed) { - dx_router_t *router = (dx_router_t*) context; - pn_link_t *pn_link = dx_link_pn(link); - dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link); - const char *r_tgt = pn_terminus_get_address(pn_link_remote_target(pn_link)); + dx_router_t *router = (dx_router_t*) context; + dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link); if (!rlink) return 0; sys_mutex_lock(router->lock); - if (pn_link_is_sender(pn_link)) { - DEQ_REMOVE(rlink->owning_addr->rlinks, rlink); - if ((rlink->owning_addr->handler == 0) && - (DEQ_SIZE(rlink->owning_addr->rlinks) == 0) && - (DEQ_SIZE(rlink->owning_addr->rnodes) == 0)) { - dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_ADDRESS_HASH); - dx_address_t *addr; - if (iter) { - hash_retrieve(router->out_hash, iter, (void**) &addr); - if (addr == rlink->owning_addr) { - hash_remove(router->out_hash, iter); - free_dx_router_link_t(rlink); - free_dx_address_t(addr); - dx_log(module, LOG_TRACE, "Removed local address: %s", r_tgt); - } - dx_field_iterator_free(iter); - } - } - } else { - DEQ_REMOVE(router->in_links, rlink); - free_dx_router_link_t(rlink); + // + // If the link is outgoing, we must disassociate it from its address. + // + if (rlink->link_direction == DX_OUTGOING && rlink->owning_addr) { + dx_router_del_link_ref_LH(&rlink->owning_addr->rlinks, rlink); + dx_router_check_addr_LH(rlink->owning_addr); } + // + // If this is an incoming inter-router link, we must free the mask_bit. + // + if (rlink->link_type == DX_LINK_ROUTER && rlink->link_direction == DX_INCOMING) + dx_bitmask_set_bit(router->neighbor_free_mask, rlink->mask_bit); + + // + // Remove the link from the master list-of-links. + // + DEQ_REMOVE(router->links, rlink); sys_mutex_unlock(router->lock); + + // TODO - wrap the free to handle the recursive items + free_dx_router_link_t(rlink); + return 0; } @@ -683,24 +719,37 @@ static void router_outbound_open_handler // Ignore otherwise dx_router_t *router = (dx_router_t*) type_context; - dx_field_iterator_t *aiter = dx_field_iterator_string(router_address, ITER_VIEW_ADDRESS_HASH); dx_link_t *sender; dx_link_t *receiver; dx_router_link_t *rlink; + int mask_bit = 0; + size_t clen = strlen(DX_CAPABILITY_ROUTER); // - // Create an incoming link and put it in the in-links collection. The address - // of the remote source of this link is '_local/qdxrouter'. + // Allocate a mask bit to designate the pair of links connected to the neighbor router // - receiver = dx_link(router->node, conn, DX_INCOMING, "inter-router-rx"); - pn_terminus_set_address(dx_link_remote_source(receiver), router_address); - pn_terminus_set_address(dx_link_target(receiver), router_address); + sys_mutex_lock(router->lock); + if (dx_bitmask_first_set(router->neighbor_free_mask, &mask_bit)) { + dx_bitmask_clear_bit(router->neighbor_free_mask, mask_bit); + } else { + sys_mutex_unlock(router->lock); + dx_log(module, LOG_CRITICAL, "Exceeded maximum inter-router link count"); + return; + } - rlink = new_dx_router_link_t(); + // + // Create an incoming link with router source capability + // + receiver = dx_link(router->node, conn, DX_INCOMING, DX_INTERNODE_LINK_NAME_1); + // TODO - We don't want to have to cast away the constness of the literal string here! + // See PROTON-429 + pn_data_put_symbol(pn_terminus_capabilities(dx_link_target(receiver)), pn_bytes(clen, (char*) DX_CAPABILITY_ROUTER)); + rlink = new_dx_router_link_t(); DEQ_ITEM_INIT(rlink); - rlink->link_direction = DX_INCOMING; + rlink->mask_bit = mask_bit; rlink->link_type = DX_LINK_ROUTER; + rlink->link_direction = DX_INCOMING; rlink->owning_addr = 0; rlink->link = receiver; rlink->connected_link = 0; @@ -709,53 +758,40 @@ static void router_outbound_open_handler DEQ_INIT(rlink->msg_fifo); dx_link_set_context(receiver, rlink); - - sys_mutex_lock(router->lock); - DEQ_INSERT_TAIL(router->in_links, rlink); - sys_mutex_unlock(router->lock); + DEQ_INSERT_TAIL(router->links, rlink); // - // Create an outgoing link with a local source of '_local/qdxrouter' and place - // it in the routing table. + // Create an outgoing link with router target capability // - sender = dx_link(router->node, conn, DX_OUTGOING, "inter-router-tx"); - pn_terminus_set_address(dx_link_remote_target(sender), router_address); - pn_terminus_set_address(dx_link_source(sender), router_address); + sender = dx_link(router->node, conn, DX_OUTGOING, DX_INTERNODE_LINK_NAME_2); + // TODO - We don't want to have to cast away the constness of the literal string here! + // See PROTON-429 + pn_data_put_symbol(pn_terminus_capabilities(dx_link_target(receiver)), pn_bytes(clen, (char *) DX_CAPABILITY_ROUTER)); rlink = new_dx_router_link_t(); - DEQ_ITEM_INIT(rlink); - rlink->link_direction = DX_OUTGOING; + rlink->mask_bit = mask_bit; rlink->link_type = DX_LINK_ROUTER; + rlink->link_direction = DX_OUTGOING; + rlink->owning_addr = router->router_addr; rlink->link = sender; rlink->connected_link = 0; rlink->peer_link = 0; DEQ_INIT(rlink->event_fifo); DEQ_INIT(rlink->msg_fifo); - dx_link_set_context(sender, rlink); - - dx_address_t *addr; - - sys_mutex_lock(router->lock); - hash_retrieve(router->out_hash, aiter, (void**) &addr); - if (!addr) { - addr = new_dx_address_t(); - addr->handler = 0; - addr->handler_context = 0; - DEQ_INIT(addr->rlinks); - DEQ_INIT(addr->rnodes); - hash_insert(router->out_hash, aiter, addr); - } + // + // Add the new outgoing link to the router_address's list of links. + // + dx_router_add_link_ref_LH(&router->router_addr->rlinks, rlink); - rlink->owning_addr = addr; - DEQ_INSERT_TAIL(addr->rlinks, rlink); + dx_link_set_context(sender, rlink); + DEQ_INSERT_TAIL(router->links, rlink); sys_mutex_unlock(router->lock); pn_link_open(dx_link_pn(receiver)); pn_link_open(dx_link_pn(sender)); pn_link_flow(dx_link_pn(receiver), 1000); - dx_field_iterator_free(aiter); } @@ -767,7 +803,6 @@ static void dx_router_timer_handler(void // Periodic processing. // dx_pyrouter_tick(router); - dx_timer_schedule(router->timer, 1000); } @@ -797,20 +832,29 @@ dx_router_t *dx_router(dx_dispatch_t *dx router_node.type_context = router; + dx->router = router; router->dx = dx; router->router_area = area; router->router_id = id; router->node = dx_container_set_default_node_type(dx, &router_node, (void*) router, DX_DIST_BOTH); - DEQ_INIT(router->in_links); + DEQ_INIT(router->addrs); + router->addr_hash = hash(10, 32, 0); + + DEQ_INIT(router->links); DEQ_INIT(router->routers); - DEQ_INIT(router->in_fifo); - router->lock = sys_mutex(); - router->timer = dx_timer(dx, dx_router_timer_handler, (void*) router); - router->out_hash = hash(10, 32, 0); - router->dtag = 1; - router->pyRouter = 0; - router->pyTick = 0; + router->neighbor_free_mask = dx_bitmask(1); + router->lock = sys_mutex(); + router->timer = dx_timer(dx, dx_router_timer_handler, (void*) router); + router->dtag = 1; + router->pyRouter = 0; + router->pyTick = 0; + + // + // Create an address for all of the routers in the topology. It will be registered + // locally later in the initialization sequence. + // + router->router_addr = dx_router_register_address(dx, "qdxrouter", 0, 0); // // Inform the field iterator module of this router's id and area. The field iterator @@ -824,7 +868,6 @@ dx_router_t *dx_router(dx_dispatch_t *dx dx_python_start(); dx_log(module, LOG_INFO, "Router started, area=%s id=%s", area, id); - return router; } @@ -869,14 +912,17 @@ dx_address_t *dx_router_register_address iter = dx_field_iterator_string(addr_string, ITER_VIEW_NO_HOST); sys_mutex_lock(router->lock); - hash_retrieve(router->out_hash, iter, (void**) &addr); + hash_retrieve(router->addr_hash, iter, (void**) &addr); if (!addr) { addr = new_dx_address_t(); + DEQ_ITEM_INIT(addr); addr->handler = 0; addr->handler_context = 0; DEQ_INIT(addr->rlinks); DEQ_INIT(addr->rnodes); - hash_insert(router->out_hash, iter, addr); + hash_insert(router->addr_hash, iter, addr); + DEQ_ITEM_INIT(addr); + DEQ_INSERT_TAIL(router->addrs, addr); } dx_field_iterator_free(iter); @@ -885,7 +931,8 @@ dx_address_t *dx_router_register_address sys_mutex_unlock(router->lock); - dx_log(module, LOG_TRACE, "In-Process Address Registered: %s", address); + if (handler) + dx_log(module, LOG_INFO, "In-Process Address Registered: %s", address); return addr; } @@ -905,34 +952,35 @@ void dx_router_send(dx_dispatch_t dx_field_iterator_reset_view(address, ITER_VIEW_ADDRESS_HASH); sys_mutex_lock(router->lock); - hash_retrieve(router->out_hash, address, (void*) &addr); + hash_retrieve(router->addr_hash, address, (void*) &addr); if (addr) { // // Forward to all of the local links receiving this address. // - dx_router_link_t *dest_link = DEQ_HEAD(addr->rlinks); - while (dest_link) { + dx_router_link_ref_t *dest_link_ref = DEQ_HEAD(addr->rlinks); + while (dest_link_ref) { dx_routed_event_t *re = new_dx_routed_event_t(); DEQ_ITEM_INIT(re); re->delivery = 0; re->message = dx_message_copy(msg); re->settle = 0; re->disposition = 0; - DEQ_INSERT_TAIL(dest_link->msg_fifo, re); + DEQ_INSERT_TAIL(dest_link_ref->link->msg_fifo, re); - dx_link_activate(dest_link->link); - dest_link = DEQ_NEXT(dest_link); + dx_link_activate(dest_link_ref->link->link); + dest_link_ref = DEQ_NEXT(dest_link_ref); } // // Forward to the next-hops for remote destinations. // - dx_router_node_t *dest_node = DEQ_HEAD(addr->rnodes); - while (dest_node) { - if (dest_node->next_hop) - dest_link = dest_node->next_hop->peer_link; + dx_router_ref_t *dest_node_ref = DEQ_HEAD(addr->rnodes); + dx_router_link_t *dest_link; + while (dest_node_ref) { + if (dest_node_ref->router->next_hop) + dest_link = dest_node_ref->router->next_hop->peer_link; else - dest_link = dest_node->peer_link; + dest_link = dest_node_ref->router->peer_link; if (dest_link) { dx_routed_event_t *re = new_dx_routed_event_t(); DEQ_ITEM_INIT(re); @@ -943,7 +991,7 @@ void dx_router_send(dx_dispatch_t DEQ_INSERT_TAIL(dest_link->msg_fifo, re); dx_link_activate(dest_link->link); } - dest_node = DEQ_NEXT(dest_node); + dest_node_ref = DEQ_NEXT(dest_node_ref); } } sys_mutex_unlock(router->lock); // TOINVESTIGATE Move this higher? @@ -977,8 +1025,11 @@ static PyObject* dx_router_node_updated( const char *address; int is_reachable; int is_neighbor; + int link_maskbit; + int router_maskbit; - if (!PyArg_ParseTuple(args, "sii", &address, &is_reachable, &is_neighbor)) + if (!PyArg_ParseTuple(args, "siiii", &address, &is_reachable, &is_neighbor, + &link_maskbit, &router_maskbit)) return 0; // TODO @@ -1099,6 +1150,7 @@ static void dx_router_python_setup(dx_ro PyObject* pName; PyObject* pId; PyObject* pArea; + PyObject* pMaxRouters; PyObject* pModule; PyObject* pClass; PyObject* pArgs; @@ -1126,7 +1178,7 @@ static void dx_router_python_setup(dx_ro // // Constructor Arguments for RouterEngine // - pArgs = PyTuple_New(3); + pArgs = PyTuple_New(4); // arg 0: adapter instance PyTuple_SetItem(pArgs, 0, adapterInstance); @@ -1135,10 +1187,14 @@ static void dx_router_python_setup(dx_ro pId = PyString_FromString(router->router_id); PyTuple_SetItem(pArgs, 1, pId); - // arg 2: area id + // arg 2: area_id pArea = PyString_FromString(router->router_area); PyTuple_SetItem(pArgs, 2, pArea); + // arg 3: max_routers + pMaxRouters = PyInt_FromLong((long) dx_bitmask_width()); + PyTuple_SetItem(pArgs, 3, pMaxRouters); + // // Instantiate the router // Added: qpid/trunk/qpid/extras/dispatch/src/router_private.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/router_private.h?rev=1526694&view=auto ============================================================================== --- qpid/trunk/qpid/extras/dispatch/src/router_private.h (added) +++ qpid/trunk/qpid/extras/dispatch/src/router_private.h Thu Sep 26 21:14:59 2013 @@ -0,0 +1,128 @@ +#ifndef __router_private_h__ +#define __router_private_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +typedef struct dx_router_link_t dx_router_link_t; +typedef struct dx_router_node_t dx_router_node_t; +typedef struct dx_router_ref_t dx_router_ref_t; +typedef struct dx_router_link_ref_t dx_router_link_ref_t; + + +typedef enum { + DX_LINK_ENDPOINT, // A link to a connected endpoint + DX_LINK_ROUTER, // A link to a peer router in the same area + DX_LINK_AREA // A link to a peer router in a different area (area boundary) +} dx_link_type_t; + + +typedef struct dx_routed_event_t { + DEQ_LINKS(struct dx_routed_event_t); + dx_delivery_t *delivery; + dx_message_t *message; + bool settle; + uint64_t disposition; +} dx_routed_event_t; + +ALLOC_DECLARE(dx_routed_event_t); +DEQ_DECLARE(dx_routed_event_t, dx_routed_event_list_t); + + +struct dx_router_link_t { + DEQ_LINKS(dx_router_link_t); + int mask_bit; // Unique mask bit if this is an inter-router link + dx_link_type_t link_type; + dx_direction_t link_direction; + dx_address_t *owning_addr; // [ref] Address record that owns this link + dx_link_t *link; // [own] Link pointer + dx_router_link_t *connected_link; // [ref] If this is a link-route, reference the connected link + dx_router_link_t *peer_link; // [ref] If this is a bidirectional link-route, reference the peer link + dx_router_link_ref_t *ref; // Pointer to a containing reference object + dx_routed_event_list_t event_fifo; // FIFO of outgoing delivery/link events (no messages) + dx_routed_event_list_t msg_fifo; // FIFO of outgoing message deliveries +}; + +ALLOC_DECLARE(dx_router_link_t); +DEQ_DECLARE(dx_router_link_t, dx_router_link_list_t); + +struct dx_router_node_t { + DEQ_LINKS(dx_router_node_t); + const char *id; + int mask_bit; + dx_router_node_t *next_hop; // Next hop node _if_ this is not a neighbor node + dx_router_link_t *peer_link; // Outgoing link _if_ this is a neighbor node + dx_bitmask_t *valid_origins; +}; + +ALLOC_DECLARE(dx_router_node_t); +DEQ_DECLARE(dx_router_node_t, dx_router_node_list_t); + +struct dx_router_ref_t { + DEQ_LINKS(dx_router_ref_t); + dx_router_node_t *router; +}; + +ALLOC_DECLARE(dx_router_ref_t); +DEQ_DECLARE(dx_router_ref_t, dx_router_ref_list_t); + + +struct dx_router_link_ref_t { + DEQ_LINKS(dx_router_link_ref_t); + dx_router_link_t *link; +}; + +ALLOC_DECLARE(dx_router_link_ref_t); +DEQ_DECLARE(dx_router_link_ref_t, dx_router_link_ref_list_t); + + +struct dx_address_t { + DEQ_LINKS(dx_address_t); + dx_router_message_cb handler; // In-Process Consumer + void *handler_context; // In-Process Consumer context + dx_router_link_ref_list_t rlinks; // Locally-Connected Consumers + dx_router_ref_list_t rnodes; // Remotely-Connected Consumers +}; + +ALLOC_DECLARE(dx_address_t); +DEQ_DECLARE(dx_address_t, dx_address_list_t); + + +struct dx_router_t { + dx_dispatch_t *dx; + const char *router_area; + const char *router_id; + dx_node_t *node; + + dx_address_list_t addrs; + hash_t *addr_hash; + dx_address_t *router_addr; + + dx_router_link_list_t links; + dx_router_node_list_t routers; + + dx_bitmask_t *neighbor_free_mask; + sys_mutex_t *lock; + dx_timer_t *timer; + uint64_t dtag; + + PyObject *pyRouter; + PyObject *pyTick; +}; + +#endif Propchange: qpid/trunk/qpid/extras/dispatch/src/router_private.h ------------------------------------------------------------------------------ svn:eol-style = native Propchange: qpid/trunk/qpid/extras/dispatch/src/router_private.h ------------------------------------------------------------------------------ svn:keywords = Author Date Id Rev URL --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org