From commits-return-49380-archive-asf-public=cust-asf.ponee.io@qpid.apache.org Tue Jan 14 18:57:25 2020 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id 8951B180661 for ; Tue, 14 Jan 2020 19:57:24 +0100 (CET) Received: (qmail 96969 invoked by uid 500); 14 Jan 2020 18:57:23 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 96897 invoked by uid 99); 14 Jan 2020 18:57:23 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 14 Jan 2020 18:57:23 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 8523B81F11; Tue, 14 Jan 2020 18:57:23 +0000 (UTC) Date: Tue, 14 Jan 2020 18:57:23 +0000 To: "commits@qpid.apache.org" Subject: [qpid-dispatch] branch master updated: DISPATCH-1532 - Reimplemented mobile-address-synchronization as a core module. MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <157902824339.25868.7737757600572511140@gitbox.apache.org> From: kgiusti@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: qpid-dispatch X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: 2fcb812e3a7050fe671bb1ca91c7c37e3db56173 X-Git-Newrev: 588f38ec9203f95dffa5dec248388a068a6cfdd3 X-Git-Rev: 588f38ec9203f95dffa5dec248388a068a6cfdd3 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. 
kgiusti pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/master by this push: new 588f38e DISPATCH-1532 - Reimplemented mobile-address-synchronization as a core module. 588f38e is described below commit 588f38ec9203f95dffa5dec248388a068a6cfdd3 Author: Ted Ross AuthorDate: Fri Dec 6 16:34:03 2019 -0500 DISPATCH-1532 - Reimplemented mobile-address-synchronization as a core module. --- include/qpid/dispatch/router_core.h | 36 +- python/qpid_dispatch_internal/router/engine.py | 50 +- python/qpid_dispatch_internal/router/mobile.py | 175 ---- python/qpid_dispatch_internal/router/node.py | 47 +- src/CMakeLists.txt | 1 + src/parse.c | 4 +- src/python_embedded.c | 3 +- src/router_core/connections.c | 10 +- src/router_core/core_events.c | 47 +- src/router_core/core_events.h | 59 +- src/router_core/delivery.c | 19 +- src/router_core/delivery.h | 2 +- src/router_core/exchange_bindings.c | 11 +- src/router_core/forwarder.c | 32 +- .../modules/address_lookup_client/lookup_client.c | 2 +- .../edge_addr_tracking/edge_addr_tracking.c | 1 + src/router_core/modules/edge_router/addr_proxy.c | 3 +- .../modules/edge_router/connection_manager.c | 1 + src/router_core/modules/edge_router/edge_mgmt.c | 5 +- .../modules/edge_router/link_route_proxy.c | 3 +- src/router_core/modules/mobile_sync/mobile.c | 892 +++++++++++++++++++++ .../modules/test_hooks/core_test_hooks.c | 2 +- src/router_core/route_tables.c | 232 ++---- src/router_core/router_core.c | 34 +- src/router_core/router_core_private.h | 44 +- src/router_pynode.c | 84 +- tests/system_tests_interior_sync_up.py | 2 +- tests/system_tests_link_routes.py | 16 +- tests/system_tests_multicast.py | 5 +- 29 files changed, 1268 insertions(+), 554 deletions(-) diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h index bcf5671..5d77004 100644 --- a/include/qpid/dispatch/router_core.h +++ 
b/include/qpid/dispatch/router_core.h @@ -85,18 +85,18 @@ void qdr_core_set_next_hop(qdr_core_t *core, int router_maskbit, int nh_router_m void qdr_core_remove_next_hop(qdr_core_t *core, int router_maskbit); void qdr_core_set_cost(qdr_core_t *core, int router_maskbit, int cost); void qdr_core_set_valid_origins(qdr_core_t *core, int router_maskbit, qd_bitmask_t *routers); -void qdr_core_map_destination(qdr_core_t *core, int router_maskbit, const char *address_hash, int treatment_hint); -void qdr_core_unmap_destination(qdr_core_t *core, int router_maskbit, const char *address_hash); +void qdr_core_flush_destinations(qdr_core_t *core, int router_maskbit); +void qdr_core_mobile_seq_advanced(qdr_core_t *core, int router_maskbit); -typedef void (*qdr_mobile_added_t) (void *context, const char *address_hash, qd_address_treatment_t treatment); -typedef void (*qdr_mobile_removed_t) (void *context, const char *address_hash); -typedef void (*qdr_link_lost_t) (void *context, int link_maskbit); +typedef void (*qdr_set_mobile_seq_t) (void *context, int router_maskbit, uint64_t mobile_seq); +typedef void (*qdr_set_my_mobile_seq_t) (void *context, uint64_t mobile_seq); +typedef void (*qdr_link_lost_t) (void *context, int link_maskbit); -void qdr_core_route_table_handlers(qdr_core_t *core, - void *context, - qdr_mobile_added_t mobile_added, - qdr_mobile_removed_t mobile_removed, - qdr_link_lost_t link_lost); +void qdr_core_route_table_handlers(qdr_core_t *core, + void *context, + qdr_set_mobile_seq_t set_mobile_seq, + qdr_set_my_mobile_seq_t set_my_mobile_seq, + qdr_link_lost_t link_lost); /** ****************************************************************************** @@ -106,11 +106,27 @@ void qdr_core_route_table_handlers(qdr_core_t *core, typedef void (*qdr_receive_t) (void *context, qd_message_t *msg, int link_maskbit, int inter_router_cost, uint64_t conn_id); +/** + * qdr_core_subscribe + * + * Subscribe an in-process handler to receive messages to a particular address. 
+ * + * @param core Pointer to the core module + * @param address The address of messages to be received + * @param aclass Address class character + * @param phase Address phase character ('0' .. '9') + * @param treatment Treatment for the address if it be being created as a side effect of this call + * @param in_core True iff the handler is to be run in the context of the core thread + * @param on_message The handler function + * @param context The opaque context sent to the handler on all invocations + * @return Pointer to the subscription object + */ qdr_subscription_t *qdr_core_subscribe(qdr_core_t *core, const char *address, char aclass, char phase, qd_address_treatment_t treatment, + bool in_core, qdr_receive_t on_message, void *context); diff --git a/python/qpid_dispatch_internal/router/engine.py b/python/qpid_dispatch_internal/router/engine.py index 753d743..e564523 100644 --- a/python/qpid_dispatch_internal/router/engine.py +++ b/python/qpid_dispatch_internal/router/engine.py @@ -26,7 +26,6 @@ from .data import MessageHELLO, MessageRA, MessageLSU, MessageMAU, MessageMAR, M from .hello import HelloProtocol from .link import LinkStateEngine from .path import PathEngine -from .mobile import MobileAddressEngine from .node import NodeTracker from .message import Message @@ -56,13 +55,10 @@ class RouterEngine(object): self._config = None # Not yet loaded self._log_hello = LogAdapter("ROUTER_HELLO") self._log_ls = LogAdapter("ROUTER_LS") - self._log_ma = LogAdapter("ROUTER_MA") self._log_general = LogAdapter("ROUTER") - self.io_adapter = [IoAdapter(self.receive, "qdrouter", 'L', '0', TREATMENT_MULTICAST_FLOOD), - IoAdapter(self.receive, "qdrouter.ma", 'L', '0', TREATMENT_MULTICAST_ONCE), - IoAdapter(self.receive, "qdrouter", 'T', '0', TREATMENT_MULTICAST_FLOOD), - IoAdapter(self.receive, "qdrouter.ma", 'T', '0', TREATMENT_MULTICAST_ONCE), - IoAdapter(self.receive, "qdhello", 'L', '0', TREATMENT_MULTICAST_FLOOD)] + self.io_adapter = [IoAdapter(self.receive, 
"qdrouter", 'L', '0', TREATMENT_MULTICAST_FLOOD), + IoAdapter(self.receive, "qdrouter", 'T', '0', TREATMENT_MULTICAST_FLOOD), + IoAdapter(self.receive, "qdhello", 'L', '0', TREATMENT_MULTICAST_FLOOD)] self.max_routers = max_routers self.id = router_id self.instance = int(time.time()) @@ -77,7 +73,6 @@ class RouterEngine(object): self.hello_protocol = HelloProtocol(self, self.node_tracker) self.link_state_engine = LinkStateEngine(self) self.path_engine = PathEngine(self) - self.mobile_address_engine = MobileAddressEngine(self, self.node_tracker) ##======================================================================================== @@ -98,26 +93,28 @@ class RouterEngine(object): raise ValueError("No router configuration found") return self._config - def addressAdded(self, addr, treatment): + + def setMobileSeq(self, router_maskbit, mobile_seq): """ + Another router's mobile sequence number has been changed and the Python router needs to store + this number. """ - try: - if addr[0] in 'MCDEFH': - self.mobile_address_engine.add_local_address(addr, treatment) - except Exception: - self.log_ma(LOG_ERROR, "Exception in new-address processing\n%s" % format_exc(LOG_STACK_LIMIT)) + self.node_tracker.set_mobile_seq(router_maskbit, mobile_seq) - def addressRemoved(self, addr): + + def setMyMobileSeq(self, mobile_seq): """ + This router's mobile sequence number has been changed and the Python router needs to store + this number and immediately send a router-advertisement message to reflect the change. """ - try: - if addr[0] in 'MCDEFH': - self.mobile_address_engine.del_local_address(addr) - except Exception: - self.log_ma(LOG_ERROR, "Exception in del-address processing\n%s" % format_exc(LOG_STACK_LIMIT)) + self.link_state_engine.set_mobile_seq(mobile_seq) + self.link_state_engine.send_ra(time.time()) + def linkLost(self, link_id): """ + The control-link to a neighbor has been dropped. 
We can cancel the neighbor from the + link-state immediately instead of waiting for the hello-timeout to expire. """ self.node_tracker.link_lost(link_id) @@ -133,6 +130,7 @@ class RouterEngine(object): except Exception: self.log(LOG_ERROR, "Exception in timer processing\n%s" % format_exc(LOG_STACK_LIMIT)) + def handleControlMessage(self, opcode, body, link_id, cost): """ """ @@ -158,20 +156,11 @@ class RouterEngine(object): self.log_ls(LOG_TRACE, "RCVD: %r" % msg) self.link_state_engine.handle_lsr(msg, now) - elif opcode == 'MAU': - msg = MessageMAU(body) - self.log_ma(LOG_TRACE, "RCVD: %r" % msg) - self.mobile_address_engine.handle_mau(msg, now) - - elif opcode == 'MAR': - msg = MessageMAR(body) - self.log_ma(LOG_TRACE, "RCVD: %r" % msg) - self.mobile_address_engine.handle_mar(msg, now) - except Exception: self.log(LOG_ERROR, "Exception in control message processing\n%s" % format_exc(LOG_STACK_LIMIT)) self.log(LOG_ERROR, "Control message error: opcode=%s body=%r" % (opcode, body)) + def receive(self, message, link_id, cost): """ This is the IoAdapter message-receive handler @@ -183,6 +172,7 @@ class RouterEngine(object): self.log(LOG_ERROR, "Exception in raw message processing: properties=%r body=%r" % (message.properties, message.body)) + def getRouterData(self, kind): """ """ diff --git a/python/qpid_dispatch_internal/router/mobile.py b/python/qpid_dispatch_internal/router/mobile.py deleted file mode 100644 index e6ee48e..0000000 --- a/python/qpid_dispatch_internal/router/mobile.py +++ /dev/null @@ -1,175 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -from __future__ import unicode_literals -from __future__ import division -from __future__ import absolute_import -from __future__ import print_function - -from .data import MessageMAR, MessageMAU -from ..dispatch import LOG_TRACE - -MAX_KEPT_DELTAS = 10 - -class MobileAddressEngine(object): - """ - This module is responsible for maintaining an up-to-date list of mobile addresses in the domain. - It runs the Mobile-Address protocol and generates an un-optimized routing table for mobile addresses. - Note that this routing table maps from the mobile address to the remote router where that address - is directly bound. 
- """ - def __init__(self, container, node_tracker): - self.container = container - self.node_tracker = node_tracker - self.id = self.container.id - self.mobile_seq = 0 - self.local_addrs = set([]) - self.added_addrs = set([]) - self.deleted_addrs = set([]) - self.sent_deltas = {} - self.treatments = {} - - - def tick(self, now): - ## - ## If local addrs have changed, collect the changes and send a MAU with the diffs - ## Note: it is important that the differential-MAU be sent before a RA is sent - ## - if len(self.added_addrs) > 0 or len(self.deleted_addrs) > 0: - self.mobile_seq += 1 - hints = [self.treatments[a] for a in self.added_addrs] - msg = MessageMAU(None, self.id, self.mobile_seq, list(self.added_addrs), list(self.deleted_addrs), _hints=hints) - - self.sent_deltas[self.mobile_seq] = msg - if len(self.sent_deltas) > MAX_KEPT_DELTAS: - self.sent_deltas.pop(self.mobile_seq - MAX_KEPT_DELTAS) - - self.container.send('amqp:/_topo/0/all/qdrouter.ma', msg) - self.container.log_ma(LOG_TRACE, "SENT: %r" % msg) - self.local_addrs.update(self.added_addrs) - self.local_addrs.difference_update(self.deleted_addrs) - self.added_addrs.clear() - self.deleted_addrs.clear() - return self.mobile_seq - - - def add_local_address(self, addr, treatment): - """ - """ - self.treatments[addr] = treatment - if addr not in self.local_addrs: - if addr not in self.added_addrs: - self.added_addrs.add(addr) - else: - if addr in self.deleted_addrs: - self.deleted_addrs.remove(addr) - - - def del_local_address(self, addr): - """ - """ - del self.treatments[addr] - if addr in self.local_addrs: - if addr not in self.deleted_addrs: - self.deleted_addrs.add(addr) - else: - if addr in self.added_addrs: - self.added_addrs.remove(addr) - - - def handle_mau(self, msg, now): - ## - ## If the MAU is differential, we can only use it if its sequence is exactly one greater - ## than our stored sequence. If not, we will ignore the content and schedule a MAR. 
- ## - ## If the MAU is absolute, we can use it in all cases. - ## - if msg.id == self.id: - return - node = self.node_tracker.router_node(msg.id) - - if msg.exist_list != None: - ## - ## Absolute MAU - ## - if msg.mobile_seq == node.mobile_address_sequence: - return - node.mobile_address_sequence = msg.mobile_seq - node.overwrite_addresses(msg.exist_list) - else: - ## - ## Differential MAU - ## - if node.mobile_address_sequence + 1 == msg.mobile_seq: - ## - ## This message represents the next expected sequence, incorporate the deltas - ## - node.mobile_address_sequence += 1 - treatments = msg.hints or [] - for a in msg.add_list: - if len(treatments): - treatment = treatments.pop(0) - else: - treatment = -1 - node.map_address(a, treatment) - for a in msg.del_list: - node.unmap_address(a) - - elif node.mobile_address_sequence == msg.mobile_seq: - ## - ## Ignore duplicates - ## - return - - else: - ## - ## This is an out-of-sequence delta. Don't use it. Schedule a MAR to - ## get back on track. 
- ## - node.mobile_address_request() - - - def handle_mar(self, msg, now): - if msg.id == self.id: - return - if msg.have_seq == self.mobile_seq: - return - if self.mobile_seq - (msg.have_seq + 1) < len(self.sent_deltas): - ## - ## We can catch the peer up with a series of stored differential updates - ## - for s in range(msg.have_seq + 1, self.mobile_seq + 1): - self.container.send('amqp:/_topo/0/%s/qdrouter.ma' % msg.id, self.sent_deltas[s]) - self.container.log_ma(LOG_TRACE, "SENT: %r" % self.sent_deltas[s]) - return - - ## - ## The peer needs to be sent an absolute update with the whole address list - ## - smsg = MessageMAU(None, self.id, self.mobile_seq, None, None, list(self.local_addrs)) - self.container.send('amqp:/_topo/0/%s/qdrouter.ma' % msg.id, smsg) - self.container.log_ma(LOG_TRACE, "SENT: %r" % smsg) - - - def send_mar(self, node_id, seq): - msg = MessageMAR(None, self.id, seq) - self.container.send('amqp:/_topo/0/%s/qdrouter.ma' % node_id, msg) - self.container.log_ma(LOG_TRACE, "SENT: %r" % msg) - - diff --git a/python/qpid_dispatch_internal/router/node.py b/python/qpid_dispatch_internal/router/node.py index c99ce95..aed6870 100644 --- a/python/qpid_dispatch_internal/router/node.py +++ b/python/qpid_dispatch_internal/router/node.py @@ -187,14 +187,7 @@ class NodeTracker(object): if node.link_state_requested(): self.container.link_state_engine.send_lsr(node_id) if node.mobile_address_requested(): - self.container.mobile_address_engine.send_mar(node_id, node.mobile_address_sequence) - - ## - ## If local changes have been made to the list of mobile addresses, send - ## an unsolicited mobile-address-update to all routers. 
- ## - mobile_seq = self.container.mobile_address_engine.tick(now) - self.container.link_state_engine.set_mobile_seq(mobile_seq) + self.container.router_adapter.mobile_seq_advanced(node.maskbit) ## ## Send an immediate RA if our link state changed @@ -260,6 +253,15 @@ class NodeTracker(object): self.link_state_changed = True + def set_mobile_seq(self, router_maskbit, mobile_seq): + """ + """ + for node in self.nodes.values(): + if node.maskbit == router_maskbit: + node.mobile_address_sequence = mobile_seq + return + + def in_flux_mode(self, now): result = (now - self.last_topology_change) <= self.flux_interval if not result and self.flux_mode: @@ -406,7 +408,6 @@ class RouterNode(object): self.next_hop_router = None self.cost = None self.valid_origins = None - self.mobile_addresses = set([]) self.mobile_address_sequence = 0 self.need_ls_request = True self.need_mobile_request = False @@ -541,34 +542,10 @@ class RouterNode(object): return False - def map_address(self, addr, treatment = -1): - self.mobile_addresses.add(addr) - self.adapter.map_destination(addr, treatment, self.maskbit) - self.log(LOG_DEBUG, "Remote destination %s mapped to router %s" % (self._logify(addr), self.id)) - - - def unmap_address(self, addr): - self.mobile_addresses.remove(addr) - self.adapter.unmap_destination(addr, self.maskbit) - self.log(LOG_DEBUG, "Remote destination %s unmapped from router %s" % (self._logify(addr), self.id)) - - def unmap_all_addresses(self): self.mobile_address_sequence = 0 - for addr in self.mobile_addresses: - self.adapter.unmap_destination(addr, self.maskbit) - self.log(LOG_DEBUG, "Remote destination %s unmapped from router %s" % (self._logify(addr), self.id)) - - def overwrite_addresses(self, addrs_list): - added = [] - deleted = [] - addrs = set(addrs_list) - added = addrs.difference(self.mobile_addresses) - deleted = self.mobile_addresses.difference(addrs) - for a in added: - self.map_address(a) - for a in deleted: - self.unmap_address(a) + 
self.adapter.flush_destinations(self.maskbit) + self.log(LOG_DEBUG, "Remote destinations flushed from router %s" % (self.id)) def update_instance(self, instance, version): diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index d57df77..0a01d43 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -100,6 +100,7 @@ set(qpid_dispatch_SOURCES router_core/modules/address_lookup_server/address_lookup_server.c router_core/modules/address_lookup_client/lookup_client.c router_core/modules/stuck_delivery_detection/delivery_tracker.c + router_core/modules/mobile_sync/mobile.c router_node.c router_pynode.c schema_enum.c diff --git a/src/parse.c b/src/parse.c index 087b8d5..25d303d 100644 --- a/src/parse.c +++ b/src/parse.c @@ -676,7 +676,9 @@ int qd_parse_is_list(qd_parsed_field_t *field) if (!field) return 0; - return field->tag == QD_AMQP_LIST8 || field->tag == QD_AMQP_LIST32; + return field->tag == QD_AMQP_LIST8 + || field->tag == QD_AMQP_LIST32 + || field->tag == QD_AMQP_LIST0; } diff --git a/src/python_embedded.c b/src/python_embedded.c index 154ee00..972a235 100644 --- a/src/python_embedded.c +++ b/src/python_embedded.c @@ -587,7 +587,8 @@ static int IoAdapter_init(IoAdapter *self, PyObject *args, PyObject *kwds) char *address = py_string_2_c(addr); if (!address) return -1; qd_error_clear(); - self->sub = qdr_core_subscribe(self->core, address, aclass, phase, treatment, qd_io_rx_handler, self); + self->sub = qdr_core_subscribe(self->core, address, aclass, phase, treatment, + false, qd_io_rx_handler, self); free(address); if (qd_error_code()) { PyErr_SetString(PyExc_RuntimeError, qd_error_message()); diff --git a/src/router_core/connections.c b/src/router_core/connections.c index 0a710d0..1a44baf 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -1249,7 +1249,6 @@ void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr) && DEQ_SIZE(addr->inlinks) == 0 && qd_bitmask_cardinality(addr->rnodes) == 0 && addr->ref_count == 0 - && 
!addr->block_deletion && addr->tracked_deliveries == 0 && addr->core_endpoint == 0 && addr->fallback_for == 0) { @@ -1767,7 +1766,7 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, b return; } - qdr_address_t *addr = link->owning_addr; + qdr_address_t *addr = link->owning_addr; if (link->detach_received) return; @@ -1839,7 +1838,9 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, b // // Unbind the address and the link. // + addr->ref_count++; qdr_core_unbind_address_link_CT(core, addr, link); + addr->ref_count--; // // If this is an edge data link, raise a link event to indicate its detachment. @@ -1865,8 +1866,11 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, b switch (link->link_type) { case QD_LINK_ENDPOINT: case QD_LINK_EDGE_DOWNLINK: - if (addr) + if (addr) { + addr->ref_count++; qdr_core_unbind_address_link_CT(core, addr, link); + addr->ref_count--; + } break; case QD_LINK_CONTROL: diff --git a/src/router_core/core_events.c b/src/router_core/core_events.c index 34e7b5f..bbdc1c0 100644 --- a/src/router_core/core_events.c +++ b/src/router_core/core_events.c @@ -21,14 +21,16 @@ struct qdrc_event_subscription_t { - DEQ_LINKS_N(CONN, qdrc_event_subscription_t); - DEQ_LINKS_N(LINK, qdrc_event_subscription_t); - DEQ_LINKS_N(ADDR, qdrc_event_subscription_t); + DEQ_LINKS_N(CONN, qdrc_event_subscription_t); + DEQ_LINKS_N(LINK, qdrc_event_subscription_t); + DEQ_LINKS_N(ADDR, qdrc_event_subscription_t); + DEQ_LINKS_N(ROUTER, qdrc_event_subscription_t); void *context; qdrc_event_t events; qdrc_connection_event_t on_conn_event; qdrc_link_event_t on_link_event; qdrc_address_event_t on_addr_event; + qdrc_router_event_t on_router_event; }; @@ -37,21 +39,24 @@ qdrc_event_subscription_t *qdrc_event_subscribe_CT(qdr_core_t *core, qdrc_connection_event_t on_conn_event, qdrc_link_event_t on_link_event, qdrc_address_event_t on_addr_event, + qdrc_router_event_t on_router_event, void 
*context) { qdrc_event_subscription_t *sub = NEW(qdrc_event_subscription_t); ZERO(sub); - sub->context = context; - sub->events = events; - sub->on_conn_event = on_conn_event; - sub->on_link_event = on_link_event; - sub->on_addr_event = on_addr_event; + sub->context = context; + sub->events = events; + sub->on_conn_event = on_conn_event; + sub->on_link_event = on_link_event; + sub->on_addr_event = on_addr_event; + sub->on_router_event = on_router_event; - assert((events & ~(_QDRC_EVENT_CONN_RANGE | _QDRC_EVENT_LINK_RANGE | _QDRC_EVENT_ADDR_RANGE)) == 0); - assert(!(events & _QDRC_EVENT_CONN_RANGE) || on_conn_event); - assert(!(events & _QDRC_EVENT_LINK_RANGE) || on_link_event); - assert(!(events & _QDRC_EVENT_ADDR_RANGE) || on_addr_event); + assert((events & ~(_QDRC_EVENT_CONN_RANGE | _QDRC_EVENT_LINK_RANGE | _QDRC_EVENT_ADDR_RANGE | _QDRC_EVENT_ROUTER_RANGE)) == 0); + assert(!(events & _QDRC_EVENT_CONN_RANGE) || on_conn_event); + assert(!(events & _QDRC_EVENT_LINK_RANGE) || on_link_event); + assert(!(events & _QDRC_EVENT_ADDR_RANGE) || on_addr_event); + assert(!(events & _QDRC_EVENT_ROUTER_RANGE) || on_router_event); if (events & _QDRC_EVENT_CONN_RANGE) DEQ_INSERT_TAIL_N(CONN, core->conn_event_subscriptions, sub); @@ -62,6 +67,9 @@ qdrc_event_subscription_t *qdrc_event_subscribe_CT(qdr_core_t *core, if (events & _QDRC_EVENT_ADDR_RANGE) DEQ_INSERT_TAIL_N(ADDR, core->addr_event_subscriptions, sub); + if (events & _QDRC_EVENT_ROUTER_RANGE) + DEQ_INSERT_TAIL_N(ROUTER, core->router_event_subscriptions, sub); + return sub; } @@ -77,6 +85,9 @@ void qdrc_event_unsubscribe_CT(qdr_core_t *core, qdrc_event_subscription_t *sub) if (sub->events & _QDRC_EVENT_ADDR_RANGE) DEQ_REMOVE_N(ADDR, core->addr_event_subscriptions, sub); + if (sub->events & _QDRC_EVENT_ROUTER_RANGE) + DEQ_REMOVE_N(ROUTER, core->router_event_subscriptions, sub); + free(sub); } @@ -116,3 +127,15 @@ void qdrc_event_addr_raise(qdr_core_t *core, qdrc_event_t event, qdr_address_t * } } + +void 
qdrc_event_router_raise(qdr_core_t *core, qdrc_event_t event, qdr_node_t *router) +{ + qdrc_event_subscription_t *sub = DEQ_HEAD(core->router_event_subscriptions); + + while (sub) { + if (sub->events & event) + sub->on_router_event(sub->context, event, router); + sub = DEQ_NEXT_N(ROUTER, sub); + } +} + diff --git a/src/router_core/core_events.h b/src/router_core/core_events.h index 67f2fca..7b351c0 100644 --- a/src/router_core/core_events.h +++ b/src/router_core/core_events.h @@ -60,6 +60,11 @@ typedef uint32_t qdrc_event_t; * QDRC_EVENT_ADDR_NO_LONGER_SOURCE An address transitioned from one to zero local sources (inlink) * QDRC_EVENT_ADDR_TWO_SOURCE An address transitioned from one to two local sources (inlink) * QDRC_EVENT_ADDR_ONE_SOURCE An address transitioned from two to one local sources (inlink) + * + * QDRC_EVENT_ROUTER_ADDED A remote router has been discovered + * QDRC_EVENT_ROUTER_REMOVED A remote router has been lost + * QDRC_EVENT_ROUTER_MOBILE_FLUSH A remote router needs its mobile addresses unmapped + * QDRC_EVENT_ROUTER_MOBILE_SEQ_ADVANCED A remote router's mobile sequence advanced past our version of the sequence */ #define QDRC_EVENT_CONN_OPENED 0x00000001 @@ -70,27 +75,33 @@ typedef uint32_t qdrc_event_t; #define QDRC_EVENT_CONN_IR_LOST 0x00000020 #define _QDRC_EVENT_CONN_RANGE 0x0000003F -#define QDRC_EVENT_LINK_IN_ATTACHED 0x00000100 -#define QDRC_EVENT_LINK_IN_DETACHED 0x00000200 -#define QDRC_EVENT_LINK_OUT_ATTACHED 0x00000400 -#define QDRC_EVENT_LINK_OUT_DETACHED 0x00000800 -#define QDRC_EVENT_LINK_EDGE_DATA_ATTACHED 0x00001000 -#define QDRC_EVENT_LINK_EDGE_DATA_DETACHED 0x00002000 -#define _QDRC_EVENT_LINK_RANGE 0x00003F00 - -#define QDRC_EVENT_ADDR_ADDED 0x00010000 -#define QDRC_EVENT_ADDR_REMOVED 0x00020000 -#define QDRC_EVENT_ADDR_BECAME_LOCAL_DEST 0x00040000 -#define QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST 0x00080000 -#define QDRC_EVENT_ADDR_BECAME_DEST 0x00100000 -#define QDRC_EVENT_ADDR_NO_LONGER_DEST 0x00200000 -#define 
QDRC_EVENT_ADDR_ONE_LOCAL_DEST 0x00400000 -#define QDRC_EVENT_ADDR_TWO_DEST 0x00800000 -#define QDRC_EVENT_ADDR_BECAME_SOURCE 0x01000000 -#define QDRC_EVENT_ADDR_NO_LONGER_SOURCE 0x02000000 -#define QDRC_EVENT_ADDR_TWO_SOURCE 0x04000000 -#define QDRC_EVENT_ADDR_ONE_SOURCE 0x08000000 -#define _QDRC_EVENT_ADDR_RANGE 0x0FFF0000 +#define QDRC_EVENT_LINK_IN_ATTACHED 0x00000040 +#define QDRC_EVENT_LINK_IN_DETACHED 0x00000080 +#define QDRC_EVENT_LINK_OUT_ATTACHED 0x00000100 +#define QDRC_EVENT_LINK_OUT_DETACHED 0x00000200 +#define QDRC_EVENT_LINK_EDGE_DATA_ATTACHED 0x00000400 +#define QDRC_EVENT_LINK_EDGE_DATA_DETACHED 0x00000800 +#define _QDRC_EVENT_LINK_RANGE 0x00000FC0 + +#define QDRC_EVENT_ADDR_ADDED 0x00001000 +#define QDRC_EVENT_ADDR_REMOVED 0x00002000 +#define QDRC_EVENT_ADDR_BECAME_LOCAL_DEST 0x00004000 +#define QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST 0x00008000 +#define QDRC_EVENT_ADDR_BECAME_DEST 0x00010000 +#define QDRC_EVENT_ADDR_NO_LONGER_DEST 0x00020000 +#define QDRC_EVENT_ADDR_ONE_LOCAL_DEST 0x00040000 +#define QDRC_EVENT_ADDR_TWO_DEST 0x00080000 +#define QDRC_EVENT_ADDR_BECAME_SOURCE 0x00100000 +#define QDRC_EVENT_ADDR_NO_LONGER_SOURCE 0x00200000 +#define QDRC_EVENT_ADDR_TWO_SOURCE 0x00400000 +#define QDRC_EVENT_ADDR_ONE_SOURCE 0x00800000 +#define _QDRC_EVENT_ADDR_RANGE 0x00FFF000 + +#define QDRC_EVENT_ROUTER_ADDED 0x01000000 +#define QDRC_EVENT_ROUTER_REMOVED 0x02000000 +#define QDRC_EVENT_ROUTER_MOBILE_FLUSH 0x04000000 +#define QDRC_EVENT_ROUTER_MOBILE_SEQ_ADVANCED 0x08000000 +#define _QDRC_EVENT_ROUTER_RANGE 0x0F000000 /** @@ -112,6 +123,10 @@ typedef void (*qdrc_address_event_t) (void *context, qdrc_event_t event_type, qdr_address_t *addr); +typedef void (*qdrc_router_event_t) (void *context, + qdrc_event_t event_type, + qdr_node_t *router); + /** * qdrc_event_subscribe_CT * @@ -130,6 +145,7 @@ qdrc_event_subscription_t *qdrc_event_subscribe_CT(qdr_core_t *core, qdrc_connection_event_t on_conn_event, qdrc_link_event_t on_link_event, qdrc_address_event_t 
on_addr_event, + qdrc_router_event_t on_router_event, void *context); /** @@ -152,5 +168,6 @@ DEQ_DECLARE(qdrc_event_subscription_t, qdrc_event_subscription_list_t); void qdrc_event_conn_raise(qdr_core_t *core, qdrc_event_t event, qdr_connection_t *conn); void qdrc_event_link_raise(qdr_core_t *core, qdrc_event_t event, qdr_link_t *link); void qdrc_event_addr_raise(qdr_core_t *core, qdrc_event_t event, qdr_address_t *addr); +void qdrc_event_router_raise(qdr_core_t *core, qdrc_event_t event, qdr_node_t *router); #endif diff --git a/src/router_core/delivery.c b/src/router_core/delivery.c index 6d98673..7471716 100644 --- a/src/router_core/delivery.c +++ b/src/router_core/delivery.c @@ -448,6 +448,15 @@ static void qdr_delete_delivery_internal_CT(qdr_core_t *core, qdr_delivery_t *de qdr_delivery_increment_counters_CT(core, delivery); // + // Remove any subscription references + // + qdr_subscription_ref_t *sub = DEQ_HEAD(delivery->subscriptions); + while (sub) { + qdr_del_subscription_ref_CT(&delivery->subscriptions, sub); + sub = DEQ_HEAD(delivery->subscriptions); + } + + // // Free all the peer qdr_delivery_ref_t references // qdr_delivery_ref_t *ref = DEQ_HEAD(delivery->peers); @@ -1063,11 +1072,11 @@ static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, bool // The entire message has now been received. Check to see if there are in process subscriptions that need to // receive this message. in process subscriptions, at this time, can deal only with full messages. 
// - qdr_subscription_t *sub = DEQ_HEAD(in_dlv->subscriptions); - while (sub) { - DEQ_REMOVE_HEAD(in_dlv->subscriptions); - qdr_forward_on_message_CT(core, sub, link, in_dlv->msg); - sub = DEQ_HEAD(in_dlv->subscriptions); + qdr_subscription_ref_t *subref = DEQ_HEAD(in_dlv->subscriptions); + while (subref) { + qdr_forward_on_message_CT(core, subref->sub, link, in_dlv->msg); + qdr_del_subscription_ref_CT(&in_dlv->subscriptions, subref); + subref = DEQ_HEAD(in_dlv->subscriptions); } // This is a presettled multi-frame unicast delivery. diff --git a/src/router_core/delivery.h b/src/router_core/delivery.h index 3e93970..ac29739 100644 --- a/src/router_core/delivery.h +++ b/src/router_core/delivery.h @@ -59,7 +59,7 @@ struct qdr_delivery_t { int tracking_addr_bit; int ingress_index; qdr_link_work_t *link_work; ///< Delivery work item for this delivery - qdr_subscription_list_t subscriptions; + qdr_subscription_ref_list_t subscriptions; qdr_delivery_ref_list_t peers; /// Use this list if there if the delivery has more than one peer. bool multicast; /// True if this delivery is targeted for a multicast address. bool via_edge; /// True if this delivery arrived via an edge-connection. diff --git a/src/router_core/exchange_bindings.c b/src/router_core/exchange_bindings.c index dc3b495..5bf1bbb 100644 --- a/src/router_core/exchange_bindings.c +++ b/src/router_core/exchange_bindings.c @@ -928,9 +928,11 @@ static qdr_exchange_t *qdr_exchange(qdr_core_t *core, ex->alternate = next_hop(ex, alternate, alt_phase); } - qdr_post_mobile_added_CT(core, - (const char*) qd_hash_key_by_handle(ex->qdr_addr->hash_handle), - ex->qdr_addr->treatment); + // + // TODO - handle case where there was already a local dest. 
+ // + qdr_addr_start_inlinks_CT(ex->core, ex->qdr_addr); + qdrc_event_addr_raise(ex->core, QDRC_EVENT_ADDR_BECAME_LOCAL_DEST, ex->qdr_addr); } return ex; @@ -939,8 +941,7 @@ static qdr_exchange_t *qdr_exchange(qdr_core_t *core, static void qdr_exchange_free(qdr_exchange_t *ex) { if (ex->core->running && DEQ_SIZE(ex->qdr_addr->rlinks) == 0) { - qdr_post_mobile_removed_CT(ex->core, - (const char*) qd_hash_key_by_handle(ex->qdr_addr->hash_handle)); + qdrc_event_addr_raise(ex->core, QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST, ex->qdr_addr); } DEQ_REMOVE(ex->core->exchanges, ex); diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c index ab73501..52e479a 100644 --- a/src/router_core/forwarder.c +++ b/src/router_core/forwarder.c @@ -269,14 +269,28 @@ void qdr_forward_on_message(qdr_core_t *core, qdr_general_work_t *work) void qdr_forward_on_message_CT(qdr_core_t *core, qdr_subscription_t *sub, qdr_link_t *link, qd_message_t *msg) { - qdr_general_work_t *work = qdr_general_work(qdr_forward_on_message); - work->on_message = sub->on_message; - work->on_message_context = sub->on_message_context; - work->msg = qd_message_copy(msg); - work->maskbit = link ? link->conn->mask_bit : 0; - work->inter_router_cost = link ? link->conn->inter_router_cost : 1; - work->in_conn_id = link ? link->conn->identity : 0; - qdr_post_general_work_CT(core, work); + int mask_bit = link ? link->conn->mask_bit : 0; + int cost = link ? link->conn->inter_router_cost : 1; + uint64_t identity = link ? link->conn->identity : 0; + + if (sub->in_core) { + // + // The handler runs in-core. Invoke it right now. + // + sub->on_message(sub->on_message_context, msg, mask_bit, cost, identity); + } else { + // + // The handler runs in an IO thread. Defer its invocation. 
+ // + qdr_general_work_t *work = qdr_general_work(qdr_forward_on_message); + work->on_message = sub->on_message; + work->on_message_context = sub->on_message_context; + work->msg = qd_message_copy(msg); + work->maskbit = mask_bit; + work->inter_router_cost = cost; + work->in_conn_id = identity; + qdr_post_general_work_CT(core, work); + } } @@ -327,7 +341,7 @@ static void qdr_forward_to_subscriber(qdr_core_t *core, qdr_subscription_t *sub, // after the message fully arrives // assert(in_dlv); - DEQ_INSERT_TAIL(in_dlv->subscriptions, sub); + qdr_add_subscription_ref_CT(&in_dlv->subscriptions, sub); qd_message_Q2_holdoff_disable(in_msg); } } diff --git a/src/router_core/modules/address_lookup_client/lookup_client.c b/src/router_core/modules/address_lookup_client/lookup_client.c index ba4ffd8..aa6f968 100644 --- a/src/router_core/modules/address_lookup_client/lookup_client.c +++ b/src/router_core/modules/address_lookup_client/lookup_client.c @@ -774,7 +774,7 @@ static void qcm_addr_lookup_client_init_CT(qdr_core_t *core, void **module_conte client->core = core; client->event_sub = qdrc_event_subscribe_CT(client->core, QDRC_EVENT_CONN_EDGE_ESTABLISHED | QDRC_EVENT_CONN_EDGE_LOST, - on_conn_event, 0, 0, + on_conn_event, 0, 0, 0, client); core->addr_lookup_handler = qcm_addr_lookup_CT; diff --git a/src/router_core/modules/edge_addr_tracking/edge_addr_tracking.c b/src/router_core/modules/edge_addr_tracking/edge_addr_tracking.c index c89a9cf..2fa7bff 100644 --- a/src/router_core/modules/edge_addr_tracking/edge_addr_tracking.c +++ b/src/router_core/modules/edge_addr_tracking/edge_addr_tracking.c @@ -407,6 +407,7 @@ static void qdrc_edge_address_tracking_init_CT(qdr_core_t *core, void **module_c 0, on_link_event, on_addr_event, + 0, context); } diff --git a/src/router_core/modules/edge_router/addr_proxy.c b/src/router_core/modules/edge_router/addr_proxy.c index 22998be..af6e5c2 100644 --- a/src/router_core/modules/edge_router/addr_proxy.c +++ 
b/src/router_core/modules/edge_router/addr_proxy.c @@ -526,7 +526,8 @@ qcm_edge_addr_proxy_t *qcm_edge_addr_proxy(qdr_core_t *core) on_conn_event, 0, on_addr_event, - ap); + 0, + ap); core->edge_conn_addr = qcm_edge_conn_addr; core->edge_context = ap; diff --git a/src/router_core/modules/edge_router/connection_manager.c b/src/router_core/modules/edge_router/connection_manager.c index bb4ae1c..76cfd6c 100644 --- a/src/router_core/modules/edge_router/connection_manager.c +++ b/src/router_core/modules/edge_router/connection_manager.c @@ -98,6 +98,7 @@ qcm_edge_conn_mgr_t *qcm_edge_conn_mgr(qdr_core_t *core) on_conn_event, 0, 0, + 0, cm); cm->active_edge_connection = 0; diff --git a/src/router_core/modules/edge_router/edge_mgmt.c b/src/router_core/modules/edge_router/edge_mgmt.c index beee1c5..84cb9a2 100644 --- a/src/router_core/modules/edge_router/edge_mgmt.c +++ b/src/router_core/modules/edge_router/edge_mgmt.c @@ -303,8 +303,9 @@ void qcm_edge_mgmt_init_CT(qdr_core_t *core) (QDRC_EVENT_CONN_EDGE_ESTABLISHED | QDRC_EVENT_CONN_EDGE_LOST), _conn_event_CT, - NULL, // link event - NULL, // addr event + 0, // link event + 0, // addr event + 0, // router event core); // context } diff --git a/src/router_core/modules/edge_router/link_route_proxy.c b/src/router_core/modules/edge_router/link_route_proxy.c index 0e8cb7e..d21d23d 100644 --- a/src/router_core/modules/edge_router/link_route_proxy.c +++ b/src/router_core/modules/edge_router/link_route_proxy.c @@ -462,8 +462,9 @@ void qcm_edge_link_route_init_CT(qdr_core_t *core) | QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST | QDRC_EVENT_ADDR_BECAME_LOCAL_DEST), _on_conn_event, - NULL, + 0, _on_addr_event, + 0, core); } diff --git a/src/router_core/modules/mobile_sync/mobile.c b/src/router_core/modules/mobile_sync/mobile.c new file mode 100644 index 0000000..5055f2f --- /dev/null +++ b/src/router_core/modules/mobile_sync/mobile.c @@ -0,0 +1,892 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "module.h" +#include "router_core_private.h" +#include "core_events.h" +#include "route_control.h" +#include +#include +#include +#include +#include +#include +#include + +#define PROTOCOL_VERSION 1 +static const char *OPCODE = "opcode"; +static const char *MAR = "MAR"; +static const char *MAU = "MAU"; +static const char *ID = "id"; +static const char *PV = "pv"; +static const char *AREA = "area"; +static const char *MOBILE_SEQ = "mobile_seq"; +static const char *HINTS = "hints"; +static const char *ADD = "add"; +static const char *DEL = "del"; +static const char *EXIST = "exist"; +static const char *HAVE_SEQ = "have_seq"; + +// +// Address.sync_mask bit values +// +#define ADDR_SYNC_IN_ADD_LIST 0x00000001 +#define ADDR_SYNC_IN_DEL_LIST 0x00000002 +#define ADDR_SYNC_TO_BE_DELETED 0x00000004 +#define ADDR_SYNC_MOBILE_TRACKING 0x00000008 + +#define BIT_SET(M,B) M |= B +#define BIT_CLEAR(M,B) M &= ~B +#define BIT_IS_SET(M,B) (M & B) + +typedef struct { + qdr_core_t *core; + qdrc_event_subscription_t *event_sub; + qdr_core_timer_t *timer; + qdr_subscription_t *message_sub1; + qdr_subscription_t *message_sub2; + qd_log_source_t *log; + uint64_t mobile_seq; + qdr_address_list_t added_addrs; + qdr_address_list_t deleted_addrs; +} 
qdrm_mobile_sync_t; + + +//================================================================================ +// Helper Functions +//================================================================================ + +static qd_address_treatment_t qcm_mobile_sync_default_treatment(qdr_core_t *core, int hint) { + switch (hint) { + case QD_TREATMENT_MULTICAST_FLOOD: + return QD_TREATMENT_MULTICAST_FLOOD; + case QD_TREATMENT_MULTICAST_ONCE: + return QD_TREATMENT_MULTICAST_ONCE; + case QD_TREATMENT_ANYCAST_CLOSEST: + return QD_TREATMENT_ANYCAST_CLOSEST; + case QD_TREATMENT_ANYCAST_BALANCED: + return QD_TREATMENT_ANYCAST_BALANCED; + case QD_TREATMENT_LINK_BALANCED: + return QD_TREATMENT_LINK_BALANCED; + case QD_TREATMENT_UNAVAILABLE: + return QD_TREATMENT_UNAVAILABLE; + default: + return core->qd->default_treatment == QD_TREATMENT_UNAVAILABLE ? QD_TREATMENT_ANYCAST_BALANCED : core->qd->default_treatment; + } +} + + +static bool qcm_mobile_sync_addr_is_mobile(qdr_address_t *addr) +{ + const char *hash_key = (const char*) qd_hash_key_by_handle(addr->hash_handle); + return !!strchr("MCDEFH", hash_key[0]); +} + + +qdr_node_t *qdc_mobile_sync_router_by_id(qdrm_mobile_sync_t *msync, qd_parsed_field_t *id_field) +{ + qd_iterator_t *id_iter = qd_parse_raw(id_field); + qdr_node_t *router = DEQ_HEAD(msync->core->routers); + while (!!router) { + if (qd_iterator_equal(id_iter, qd_hash_key_by_handle(router->owning_addr->hash_handle) + 1)) + return router; + router = DEQ_NEXT(router); + } + + return 0; +} + + +/** + * Bump the ref_count on the address to ensure it is not deleted out from under our attention. + */ +static void qcm_mobile_sync_start_tracking(qdr_address_t *addr) +{ + BIT_SET(addr->sync_mask, ADDR_SYNC_MOBILE_TRACKING); + addr->ref_count++; +} + + +/** + * Decrement the address's ref_count. + * Check the address to have it deleted if it is no longer referenced anywhere. 
+ */ +static void qcm_mobile_sync_stop_tracking(qdr_core_t *core, qdr_address_t *addr) +{ + BIT_CLEAR(addr->sync_mask, ADDR_SYNC_MOBILE_TRACKING); + if (--addr->ref_count == 0) + qdr_check_addr_CT(core, addr); +} + + +static qd_composed_field_t *qcm_mobile_sync_message_headers(const char *address, const char *opcode) +{ + qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_HEADER, 0); + qd_compose_start_list(field); + qd_compose_insert_bool(field, 0); // durable + qd_compose_end_list(field); + + field = qd_compose(QD_PERFORMATIVE_PROPERTIES, field); + qd_compose_start_list(field); + qd_compose_insert_null(field); // message-id + qd_compose_insert_null(field); // user-id + qd_compose_insert_string(field, address); // to + qd_compose_end_list(field); + + field = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, field); + qd_compose_start_map(field); + qd_compose_insert_symbol(field, OPCODE); + qd_compose_insert_string(field, opcode); + qd_compose_end_map(field); + + return field; +} + + +static void qcm_mobile_sync_compose_diff_addr_list(qdrm_mobile_sync_t *msync, qd_composed_field_t *field, bool is_added) +{ + qdr_address_list_t *list = is_added ? 
&msync->added_addrs : &msync->deleted_addrs; + + qd_compose_start_list(field); + qdr_address_t *addr = DEQ_HEAD(*list); + while (addr) { + const char *hash_key = (const char*) qd_hash_key_by_handle(addr->hash_handle); + qd_compose_insert_string(field, hash_key); + if (is_added) { + DEQ_REMOVE_HEAD_N(SYNC_ADD, *list); + BIT_CLEAR(addr->sync_mask, ADDR_SYNC_IN_ADD_LIST); + } else { + DEQ_REMOVE_HEAD_N(SYNC_DEL, *list); + BIT_CLEAR(addr->sync_mask, ADDR_SYNC_IN_DEL_LIST); + qcm_mobile_sync_stop_tracking(msync->core, addr); + } + addr = DEQ_HEAD(*list); + } + qd_compose_end_list(field); +} + + +static void qcm_mobile_sync_compose_diff_hint_list(qdrm_mobile_sync_t *msync, qd_composed_field_t *field) +{ + qd_compose_start_list(field); + qdr_address_t *addr = DEQ_HEAD(msync->added_addrs); + while (addr) { + qd_compose_insert_int(field, addr->treatment); + addr = DEQ_NEXT_N(SYNC_ADD, addr); + } + qd_compose_end_list(field); +} + + +static qd_message_t *qcm_mobile_sync_compose_differential_mau(qdrm_mobile_sync_t *msync, const char *address) +{ + qd_message_t *msg = qd_message(); + qd_composed_field_t *headers = qcm_mobile_sync_message_headers(address, MAU); + qd_composed_field_t *body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0); + + // + // Add the ingress and trace annotations to the message to prevent this multicast from bouncing + // back to us. 
+ // + qd_composed_field_t *ingress = qd_compose_subfield(0); + qd_compose_insert_string(ingress, qd_router_id(msync->core->qd)); + + qd_composed_field_t *trace = qd_compose_subfield(0); + qd_compose_start_list(trace); + qd_compose_insert_string(trace, qd_router_id(msync->core->qd)); + qd_compose_end_list(trace); + + qd_message_set_ingress_annotation(msg, ingress); + qd_message_set_trace_annotation(msg, trace); + + // + // Generate the message body + // + qd_compose_start_map(body); + qd_compose_insert_symbol(body, ID); + qd_compose_insert_string(body, msync->core->router_id); + + qd_compose_insert_symbol(body, PV); + qd_compose_insert_long(body, PROTOCOL_VERSION); + + qd_compose_insert_symbol(body, AREA); + qd_compose_insert_string(body, msync->core->router_area); + + qd_compose_insert_symbol(body, MOBILE_SEQ); + qd_compose_insert_long(body, msync->mobile_seq); + + qd_compose_insert_symbol(body, HINTS); + qcm_mobile_sync_compose_diff_hint_list(msync, body); + + qd_compose_insert_symbol(body, ADD); + qcm_mobile_sync_compose_diff_addr_list(msync, body, true); + + qd_compose_insert_symbol(body, DEL); + qcm_mobile_sync_compose_diff_addr_list(msync, body, false); + + qd_compose_end_map(body); + + qd_message_compose_3(msg, headers, body); + qd_compose_free(headers); + qd_compose_free(body); + return msg; +} + + +static qd_message_t *qcm_mobile_sync_compose_absolute_mau(qdrm_mobile_sync_t *msync, const char *address) +{ + qd_message_t *msg = qd_message(); + qd_composed_field_t *headers = qcm_mobile_sync_message_headers(address, MAU); + qd_composed_field_t *body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0); + + qd_compose_start_map(body); + qd_compose_insert_symbol(body, ID); + qd_compose_insert_string(body, msync->core->router_id); + + qd_compose_insert_symbol(body, PV); + qd_compose_insert_long(body, PROTOCOL_VERSION); + + qd_compose_insert_symbol(body, AREA); + qd_compose_insert_string(body, msync->core->router_area); + + qd_compose_insert_symbol(body, MOBILE_SEQ); 
+ qd_compose_insert_long(body, msync->mobile_seq); + + qd_compose_insert_symbol(body, EXIST); + qd_compose_start_list(body); + qdr_address_t *addr = DEQ_HEAD(msync->core->addrs); + while (!!addr) { + // + // For an address to be included in the list, it must: + // - be a mobile address + // - have at least one local consumer, link-route destination, or exchange + // _OR_ be in the delete list (because the peers haven't heard of its pending deletion) + // - not be in the add list (because the peers haven't heard of its pending addition) + // + // Note that in the two add/del list cases, we are reporting information that is not currently + // accurate. In these cases, a differential MAU will be sent very shortly that will put the + // peer router in the correct state. + // + if (qcm_mobile_sync_addr_is_mobile(addr) + && ((DEQ_SIZE(addr->rlinks) > 0 || DEQ_SIZE(addr->conns) > 0 || !!addr->exchange) + || BIT_IS_SET(addr->sync_mask, ADDR_SYNC_IN_DEL_LIST)) + && !BIT_IS_SET(addr->sync_mask, ADDR_SYNC_IN_ADD_LIST)) { + const char *hash_key = (const char*) qd_hash_key_by_handle(addr->hash_handle); + qd_compose_insert_string(body, hash_key); + } + addr = DEQ_NEXT(addr); + } + qd_compose_end_list(body); + + qd_compose_insert_symbol(body, HINTS); + qd_compose_start_list(body); + addr = DEQ_HEAD(msync->core->addrs); + while (!!addr) { + // + // This loop uses the same logic as above.
+ // + if (qcm_mobile_sync_addr_is_mobile(addr) + && ((DEQ_SIZE(addr->rlinks) > 0 || DEQ_SIZE(addr->conns) > 0 || !!addr->exchange) + || BIT_IS_SET(addr->sync_mask, ADDR_SYNC_IN_DEL_LIST)) + && !BIT_IS_SET(addr->sync_mask, ADDR_SYNC_IN_ADD_LIST)) + qd_compose_insert_int(body, addr->treatment); + addr = DEQ_NEXT(addr); + } + qd_compose_end_list(body); + qd_compose_end_map(body); + qd_message_compose_3(msg, headers, body); + qd_compose_free(headers); + qd_compose_free(body); + return msg; +} + + +static qd_message_t *qcm_mobile_sync_compose_mar(qdrm_mobile_sync_t *msync, qdr_node_t *router) +{ + qd_message_t *msg = qd_message(); + qd_composed_field_t *headers = qcm_mobile_sync_message_headers(router->wire_address_ma, MAR); + qd_composed_field_t *body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0); + + qd_compose_start_map(body); + qd_compose_insert_symbol(body, ID); + qd_compose_insert_string(body, msync->core->router_id); + + qd_compose_insert_symbol(body, PV); + qd_compose_insert_long(body, PROTOCOL_VERSION); + + qd_compose_insert_symbol(body, AREA); + qd_compose_insert_string(body, msync->core->router_area); + + qd_compose_insert_symbol(body, HAVE_SEQ); + qd_compose_insert_long(body, router->mobile_seq); + + qd_compose_end_map(body); + + qd_message_compose_3(msg, headers, body); + qd_compose_free(headers); + qd_compose_free(body); + return msg; +} + + +//================================================================================ +// Timer Handler +//================================================================================ + +static void qcm_mobile_sync_on_timer_CT(qdr_core_t *core, void *context) +{ + qdrm_mobile_sync_t *msync = (qdrm_mobile_sync_t*) context; + + // + // Re-schedule the timer for the next go-around + // + qdr_core_timer_schedule_CT(core, msync->timer, 0); + + // + // Check the add and delete lists. If they are empty, nothing of note occurred in the last + // interval. Exit the handler function.
+ // + size_t added_count = DEQ_SIZE(msync->added_addrs); + size_t deleted_count = DEQ_SIZE(msync->deleted_addrs); + + if (added_count == 0 && deleted_count == 0) + return; + + // + // Bump the mobile sequence number. + // + msync->mobile_seq++; + + // + // Prepare a differential MAU for sending to all the other routers. + // + qd_message_t *mau = qcm_mobile_sync_compose_differential_mau(msync, "_topo/0/all/qdrouter.ma"); + + // + // Multicast the control message. Set the exclude_inprocess and control flags. + // Use the TOPOLOGICAL class address for sending. + // + int fanout = qdr_forward_message_CT(core, core->routerma_addr_T, mau, 0, true, true); + qd_message_free(mau); + + // + // Post the updated mobile sequence number to the Python router. It is important that this be + // done _after_ sending the differential MAU to prevent a storm of un-needed MAR requests from + // the other routers. + // + qdr_post_set_my_mobile_seq_CT(core, msync->mobile_seq); + + // + // Trace log the activity of this sequence update. 
+ // + qd_log(msync->log, QD_LOG_DEBUG, "New mobile sequence: mobile_seq=%"PRIu64", addrs_added=%ld, addrs_deleted=%ld, fanout=%d", + msync->mobile_seq, added_count, deleted_count, fanout); +} + + +//================================================================================ +// Message Handler +//================================================================================ + +static void qcm_mobile_sync_on_mar_CT(qdrm_mobile_sync_t *msync, qd_parsed_field_t *body) +{ + if (!!body && qd_parse_is_map(body)) { + qd_parsed_field_t *id_field = qd_parse_value_by_key(body, ID); + qd_parsed_field_t *have_seq_field = qd_parse_value_by_key(body, HAVE_SEQ); + uint64_t have_seq = qd_parse_as_ulong(have_seq_field); + + qdr_node_t *router = qdc_mobile_sync_router_by_id(msync, id_field); + if (!!router) { + qd_log(msync->log, QD_LOG_DEBUG, "Received MAR from %s, have_seq=%"PRIu64, + (const char*) qd_hash_key_by_handle(router->owning_addr->hash_handle) + 1, have_seq); + + if (have_seq < msync->mobile_seq) { + // + // The requestor's view of our mobile_seq is less than our actual mobile_seq. + // Send them an absolute MAU to get them caught up to the present. + // + qd_message_t *mau = qcm_mobile_sync_compose_absolute_mau(msync, router->wire_address_ma); + (void) qdr_forward_message_CT(msync->core, router->owning_addr, mau, 0, true, true); + qd_message_free(mau); + + // + // Trace log the activity of this sequence update.
+ // + qd_log(msync->log, QD_LOG_DEBUG, "Sent MAU to requestor: mobile_seq=%"PRIu64, msync->mobile_seq); + } + } else + qd_log(msync->log, QD_LOG_ERROR, "Received MAR from an unknown router"); + } +} + + +static void qcm_mobile_sync_on_mau_CT(qdrm_mobile_sync_t *msync, qd_parsed_field_t *body) +{ + if (!!body && qd_parse_is_map(body)) { + qd_parsed_field_t *id_field = qd_parse_value_by_key(body, ID); + qd_parsed_field_t *mobile_seq_field = qd_parse_value_by_key(body, MOBILE_SEQ); + uint64_t mobile_seq = qd_parse_as_ulong(mobile_seq_field); + + qdr_node_t *router = qdc_mobile_sync_router_by_id(msync, id_field); + if (!!router) { + const char *router_id = (const char*) qd_hash_key_by_handle(router->owning_addr->hash_handle) + 1; + qd_parsed_field_t *add_field = qd_parse_value_by_key(body, ADD); + qd_parsed_field_t *del_field = qd_parse_value_by_key(body, DEL); + qd_parsed_field_t *exist_field = qd_parse_value_by_key(body, EXIST); + qd_parsed_field_t *hints_field = qd_parse_value_by_key(body, HINTS); + uint32_t hints_count = 0; + qdr_address_t *addr; + + // + // Validate the fields and determine what kind of MAU we've received. + // + if (!!hints_field && qd_parse_is_list(hints_field)) + hints_count = qd_parse_sub_count(hints_field); + + // + // Validate the exist, add, and del fields. They must, if they exist, be lists. + // If there is an exist field, there must not be an add or del field. + // If there is no exist field, there must be both an add and a del field. + // + if ((!!exist_field && !qd_parse_is_list(exist_field)) + || (!!add_field && !qd_parse_is_list(add_field)) + || (!!del_field && !qd_parse_is_list(del_field)) + || (!!exist_field && (!!add_field || !!del_field)) + || (!exist_field && (!add_field || !del_field))) { + qd_log(msync->log, QD_LOG_ERROR, "Received malformed MAU from %s", router_id); + return; + } + + // + // Record the new mobile sequence for the remote router. 
+ // + router->mobile_seq = mobile_seq; + + // + // Check the exist/add list size against the hints-list size. If they are not + // exactly equal, ignore the hints. + // + if (!!exist_field) { + if (hints_count != qd_parse_sub_count(exist_field)) + hints_count = 0; + } else { + if (hints_count != qd_parse_sub_count(add_field)) + hints_count = 0; + } + + qd_log(msync->log, QD_LOG_DEBUG, "Received MAU (%s) from %s, mobile_seq=%"PRIu64, + !!exist_field ? "absolute" : "differential", router_id, mobile_seq); + + // + // If this is an absolute MAU, the existing set of addresses for this router must + // be marked as needing deletion, in case they are not mentioned in the existing + // address list. + // + if (!!exist_field) { + addr = DEQ_HEAD(msync->core->addrs); + while (!!addr) { + if (qcm_mobile_sync_addr_is_mobile(addr) && !!qd_bitmask_value(addr->rnodes, router->mask_bit)) + BIT_SET(addr->sync_mask, ADDR_SYNC_TO_BE_DELETED); + addr = DEQ_NEXT(addr); + } + } + + // + // Run through the add/exist list (depending on which we have) and lookup/add the + // addresses, associating them with the sending router. Clear the to-delete bits + // on every address touched. If hints are available, use them for addresses that + // are newly created. + // + qd_parsed_field_t *field = !!exist_field ? exist_field : add_field; + qd_parsed_field_t *addr_field = qd_field_first_child(field); + qd_parsed_field_t *hint_field = !!hints_count ? qd_field_first_child(hints_field) : 0; + while (addr_field) { + qd_iterator_t *iter = qd_parse_raw(addr_field); + qdr_address_t *addr = 0; + int treatment_hint = !!hint_field ? 
qd_parse_as_int(hint_field) : -1; + + do { + qd_hash_retrieve(msync->core->addr_hash, iter, (void**) &addr); + if (!addr) { + qdr_address_config_t *addr_config; + qd_address_treatment_t treatment = + qdr_treatment_for_address_hash_with_default_CT(msync->core, + iter, + qcm_mobile_sync_default_treatment(msync->core, treatment_hint), + &addr_config); + addr = qdr_address_CT(msync->core, treatment, addr_config); + if (!addr) { + qd_log(msync->log, QD_LOG_CRITICAL, "map_destination: ignored"); + assert(false); + break; + } + qd_hash_insert(msync->core->addr_hash, iter, addr, &addr->hash_handle); + DEQ_ITEM_INIT(addr); + DEQ_INSERT_TAIL(msync->core->addrs, addr); + + // + // if the address is a link route, add the pattern to the wildcard + // address parse tree + // + const char *a_str = (const char*) qd_hash_key_by_handle(addr->hash_handle); + if (QDR_IS_LINK_ROUTE(a_str[0])) { + qdr_link_route_map_pattern_CT(msync->core, iter, addr); + } + } + + BIT_CLEAR(addr->sync_mask, ADDR_SYNC_TO_BE_DELETED); + if (!qd_bitmask_value(addr->rnodes, router->mask_bit)) { + qd_bitmask_set_bit(addr->rnodes, router->mask_bit); + router->ref_count++; + addr->cost_epoch--; + qdr_addr_start_inlinks_CT(msync->core, addr); + + // + // Raise an address event if this is the first destination for the address + // + if (qd_bitmask_cardinality(addr->rnodes) + DEQ_SIZE(addr->rlinks) == 1) + qdrc_event_addr_raise(msync->core, QDRC_EVENT_ADDR_BECAME_DEST, addr); + else if (qd_bitmask_cardinality(addr->rnodes) == 1 && DEQ_SIZE(addr->rlinks) == 1) + qdrc_event_addr_raise(msync->core, QDRC_EVENT_ADDR_TWO_DEST, addr); + } + } while (false); + + addr_field = qd_field_next_child(addr_field); + hint_field = !!hint_field ? qd_field_next_child(hint_field) : 0; + } + + // + // Run through the delete list, if it exists, and disassociate each address from the + // sending router. Check the address to see if it needs to be deleted. 
+ // + if (!!del_field) { + addr_field = qd_field_first_child(del_field); + while (!!addr_field) { + qd_iterator_t *iter = qd_parse_raw(addr_field); + qdr_address_t *addr = 0; + + qd_hash_retrieve(msync->core->addr_hash, iter, (void**) &addr); + if (!!addr) { + if (qd_bitmask_value(addr->rnodes, router->mask_bit)) { + qd_bitmask_clear_bit(addr->rnodes, router->mask_bit); + router->ref_count--; + addr->cost_epoch--; + + // + // Raise an address event if this was the last destination for the address + // + if (qd_bitmask_cardinality(addr->rnodes) + DEQ_SIZE(addr->rlinks) == 0) + qdrc_event_addr_raise(msync->core, QDRC_EVENT_ADDR_NO_LONGER_DEST, addr); + else if (qd_bitmask_cardinality(addr->rnodes) == 0 && DEQ_SIZE(addr->rlinks) == 1) + qdrc_event_addr_raise(msync->core, QDRC_EVENT_ADDR_ONE_LOCAL_DEST, addr); + + qdr_check_addr_CT(msync->core, addr); + } + } + addr_field = qd_field_next_child(addr_field); + } + } + + // + // If this was an absolute MAU, disassociate any addresses remaining with the + // to-delete flag set. 
+ // + if (!!exist_field) { + addr = DEQ_HEAD(msync->core->addrs); + while (!!addr) { + qdr_address_t *next_addr = DEQ_NEXT(addr); + if (qcm_mobile_sync_addr_is_mobile(addr) + && !!qd_bitmask_value(addr->rnodes, router->mask_bit) + && BIT_IS_SET(addr->sync_mask, ADDR_SYNC_TO_BE_DELETED)) { + qd_bitmask_clear_bit(addr->rnodes, router->mask_bit); + router->ref_count--; + addr->cost_epoch--; + + // + // Raise an address event if this was the last destination for the address + // + if (qd_bitmask_cardinality(addr->rnodes) + DEQ_SIZE(addr->rlinks) == 0) + qdrc_event_addr_raise(msync->core, QDRC_EVENT_ADDR_NO_LONGER_DEST, addr); + else if (qd_bitmask_cardinality(addr->rnodes) == 0 && DEQ_SIZE(addr->rlinks) == 1) + qdrc_event_addr_raise(msync->core, QDRC_EVENT_ADDR_ONE_LOCAL_DEST, addr); + + qdr_check_addr_CT(msync->core, addr); + } + addr = next_addr; + } + } + + // + // Tell the python router about the new mobile sequence + // + qdr_post_set_mobile_seq_CT(msync->core, router->mask_bit, mobile_seq); + } else + qd_log(msync->log, QD_LOG_ERROR, "Received MAU from an unknown router"); + } +} + + +static void qcm_mobile_sync_on_message_CT(void *context, + qd_message_t *msg, + int unused_link_maskbit, + int unused_inter_router_cost, + uint64_t unused_conn_id) +{ + qdrm_mobile_sync_t *msync = (qdrm_mobile_sync_t*) context; + qd_iterator_t *ap_iter = qd_message_field_iterator(msg, QD_FIELD_APPLICATION_PROPERTIES); + qd_iterator_t *body_iter = qd_message_field_iterator(msg, QD_FIELD_BODY); + qd_parsed_field_t *ap_field = qd_parse(ap_iter); + qd_parsed_field_t *body_field = qd_parse(body_iter); + + if (!!ap_field && qd_parse_is_map(ap_field)) { + qd_parsed_field_t *opcode_field = qd_parse_value_by_key(ap_field, OPCODE); + + if (qd_iterator_equal(qd_parse_raw(opcode_field), (const unsigned char*) MAR)) + qcm_mobile_sync_on_mar_CT(msync, body_field); + + if (qd_iterator_equal(qd_parse_raw(opcode_field), (const unsigned char*) MAU)) + qcm_mobile_sync_on_mau_CT(msync, body_field); + 
} + + qd_parse_free(ap_field); + qd_iterator_free(ap_iter); + qd_parse_free(body_field); + qd_iterator_free(body_iter); +} + + +//================================================================================ +// Event Handlers +//================================================================================ + +static void qcm_mobile_sync_on_became_local_dest_CT(qdrm_mobile_sync_t *msync, qdr_address_t *addr) +{ + if (!qcm_mobile_sync_addr_is_mobile(addr)) + return; + + qd_log(msync->log, QD_LOG_DEBUG, "Became Local Dest: %s", (const char*) qd_hash_key_by_handle(addr->hash_handle)); + + if (BIT_IS_SET(addr->sync_mask, ADDR_SYNC_IN_ADD_LIST)) + return; + + if (BIT_IS_SET(addr->sync_mask, ADDR_SYNC_IN_DEL_LIST)) { + // + // If the address was deleted since the last update, simply forget that it was deleted. + // + DEQ_REMOVE_N(SYNC_DEL, msync->deleted_addrs, addr); + BIT_CLEAR(addr->sync_mask, ADDR_SYNC_IN_DEL_LIST); + } else { + DEQ_INSERT_TAIL_N(SYNC_ADD, msync->added_addrs, addr); + BIT_SET(addr->sync_mask, ADDR_SYNC_IN_ADD_LIST); + qcm_mobile_sync_start_tracking(addr); + } +} + + +static void qcm_mobile_sync_on_no_longer_local_dest_CT(qdrm_mobile_sync_t *msync, qdr_address_t *addr) +{ + if (!qcm_mobile_sync_addr_is_mobile(addr)) + return; + + qd_log(msync->log, QD_LOG_DEBUG, "No Longer Local Dest: %s", (const char*) qd_hash_key_by_handle(addr->hash_handle)); + + if (BIT_IS_SET(addr->sync_mask, ADDR_SYNC_IN_DEL_LIST)) + return; + + if (BIT_IS_SET(addr->sync_mask, ADDR_SYNC_IN_ADD_LIST)) { + // + // If the address was added since the last update, simply forget that it was added. 
+ // + DEQ_REMOVE_N(SYNC_ADD, msync->added_addrs, addr); + BIT_CLEAR(addr->sync_mask, ADDR_SYNC_IN_ADD_LIST); + qcm_mobile_sync_stop_tracking(msync->core, addr); + } else { + DEQ_INSERT_TAIL_N(SYNC_DEL, msync->deleted_addrs, addr); + BIT_SET(addr->sync_mask, ADDR_SYNC_IN_DEL_LIST); + } +} + + +static void qcm_mobile_sync_on_router_flush_CT(qdrm_mobile_sync_t *msync, qdr_node_t *router) +{ + router->mobile_seq = 0; + qdr_address_t *addr = DEQ_HEAD(msync->core->addrs); + while (!!addr) { + qdr_address_t *next_addr = DEQ_NEXT(addr); + if (qcm_mobile_sync_addr_is_mobile(addr) + && !!qd_bitmask_value(addr->rnodes, router->mask_bit)) { + // + // This is an address mapped to the router. Unmap the address and clean up. + // + qd_bitmask_clear_bit(addr->rnodes, router->mask_bit); + router->ref_count--; + addr->cost_epoch--; + + // + // Raise an address event if this was the last destination for the address + // + if (qd_bitmask_cardinality(addr->rnodes) + DEQ_SIZE(addr->rlinks) == 0) + qdrc_event_addr_raise(msync->core, QDRC_EVENT_ADDR_NO_LONGER_DEST, addr); + else if (qd_bitmask_cardinality(addr->rnodes) == 0 && DEQ_SIZE(addr->rlinks) == 1) + qdrc_event_addr_raise(msync->core, QDRC_EVENT_ADDR_ONE_LOCAL_DEST, addr); + + qdr_check_addr_CT(msync->core, addr); + } + addr = next_addr; + } +} + + +static void qcm_mobile_sync_on_router_advanced_CT(qdrm_mobile_sync_t *msync, qdr_node_t *router) +{ + // + // Prepare a MAR to be sent to the router + // + qd_message_t *mar = qcm_mobile_sync_compose_mar(msync, router); + + // + // Send the control message. Set the exclude_inprocess and control flags. + // + int fanout = qdr_forward_message_CT(msync->core, router->owning_addr, mar, 0, true, true); + qd_message_free(mar); + + // + // Trace log the activity of this sequence update. 
+ // + qd_log(msync->log, QD_LOG_DEBUG, "Send MAR request to router %s, have_seq=%"PRIu64", fanout=%d", + (const char*) qd_hash_key_by_handle(router->owning_addr->hash_handle) + 1, router->mobile_seq, fanout); +} + + +static void qcm_mobile_sync_on_addr_event_CT(void *context, + qdrc_event_t event_type, + qdr_address_t *addr) +{ + qdrm_mobile_sync_t *msync = (qdrm_mobile_sync_t*) context; + + switch (event_type) { + case QDRC_EVENT_ADDR_BECAME_LOCAL_DEST: + qcm_mobile_sync_on_became_local_dest_CT(msync, addr); + break; + + case QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST: + qcm_mobile_sync_on_no_longer_local_dest_CT(msync, addr); + break; + + default: + break; + } +} + + +static void qcm_mobile_sync_on_router_event_CT(void *context, + qdrc_event_t event_type, + qdr_node_t *router) +{ + qdrm_mobile_sync_t *msync = (qdrm_mobile_sync_t*) context; + + switch (event_type) { + case QDRC_EVENT_ROUTER_MOBILE_FLUSH: + qcm_mobile_sync_on_router_flush_CT(msync, router); + break; + + case QDRC_EVENT_ROUTER_MOBILE_SEQ_ADVANCED: + qcm_mobile_sync_on_router_advanced_CT(msync, router); + break; + + default: + break; + } +} + + +//================================================================================ +// Module Handlers +//================================================================================ + +static bool qcm_mobile_sync_enable_CT(qdr_core_t *core) +{ + return core->router_mode == QD_ROUTER_MODE_INTERIOR; +} + + +static void qcm_mobile_sync_init_CT(qdr_core_t *core, void **module_context) +{ + qdrm_mobile_sync_t *msync = NEW(qdrm_mobile_sync_t); + ZERO(msync); + msync->core = core; + + // + // Subscribe to core events: + // + // - ADDR_BECAME_LOCAL_DEST - Indicates a new address needs to tbe sync'ed with other routers + // - ADDR_NO_LONGER_LOCAL_DEST - Indicates an address needs to be un-sync'd with other routers + // - ROUTER_MOBILE_FLUSH - All addresses associated with the router must be unmapped + // - ROUTER_MOBILE_SEQ_ADVANCED - A router has an advanced 
mobile-seq and needs to be queried + // + msync->event_sub = qdrc_event_subscribe_CT(core, + QDRC_EVENT_ADDR_BECAME_LOCAL_DEST + | QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST + | QDRC_EVENT_ROUTER_MOBILE_FLUSH + | QDRC_EVENT_ROUTER_MOBILE_SEQ_ADVANCED, + 0, + 0, + qcm_mobile_sync_on_addr_event_CT, + qcm_mobile_sync_on_router_event_CT, + msync); + + // + // Create and schedule a one-second recurring timer to drive the sync protocol + // + msync->timer = qdr_core_timer_CT(core, qcm_mobile_sync_on_timer_CT, msync); + qdr_core_timer_schedule_CT(core, msync->timer, 0); + + // + // Subscribe to receive messages sent to the 'qdrouter.ma' addresses + // + msync->message_sub1 = qdr_core_subscribe(core, "qdrouter.ma", 'L', '0', + QD_TREATMENT_MULTICAST_ONCE, true, qcm_mobile_sync_on_message_CT, msync); + msync->message_sub2 = qdr_core_subscribe(core, "qdrouter.ma", 'T', '0', + QD_TREATMENT_MULTICAST_ONCE, true, qcm_mobile_sync_on_message_CT, msync); + + // + // Create a log source for mobile address sync + // + msync->log = qd_log_source("ROUTER_MA"); + + *module_context = msync; +} + + +static void qcm_mobile_sync_final_CT(void *module_context) +{ + qdrm_mobile_sync_t *msync = (qdrm_mobile_sync_t*) module_context; + + qdrc_event_unsubscribe_CT(msync->core, msync->event_sub); + qdr_core_timer_free_CT(msync->core, msync->timer); + + // + // Don't explicitly unsubscribe the addresses, these are already gone at module-final time. 
+ // + + free(msync); +} + + +QDR_CORE_MODULE_DECLARE("mobile_sync", qcm_mobile_sync_enable_CT, qcm_mobile_sync_init_CT, qcm_mobile_sync_final_CT) diff --git a/src/router_core/modules/test_hooks/core_test_hooks.c b/src/router_core/modules/test_hooks/core_test_hooks.c index 86f4095..263b9d9 100644 --- a/src/router_core/modules/test_hooks/core_test_hooks.c +++ b/src/router_core/modules/test_hooks/core_test_hooks.c @@ -649,7 +649,7 @@ static void qdrc_test_client_api_setup(test_module_t *test_module) tc->conn_events = qdrc_event_subscribe_CT(test_module->core, (QDRC_EVENT_CONN_OPENED | QDRC_EVENT_CONN_CLOSED), _on_conn_event, - NULL, NULL, tc); + 0, 0, 0, tc); qd_log(test_module->core->log, QD_LOG_TRACE, "client test registered %p", tc->conn_events); } diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c index 3466732..d6fdb15 100644 --- a/src/router_core/route_tables.c +++ b/src/router_core/route_tables.c @@ -21,18 +21,18 @@ #include "route_control.h" #include -static void qdr_add_router_CT (qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_del_router_CT (qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_set_link_CT (qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_remove_link_CT (qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_set_next_hop_CT (qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_remove_next_hop_CT (qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_set_cost_CT (qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_set_valid_origins_CT (qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_map_destination_CT (qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_unmap_destination_CT (qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_subscribe_CT (qdr_core_t *core, qdr_action_t *action, bool discard); -static void 
qdr_unsubscribe_CT (qdr_core_t *core, qdr_action_t *action, bool discard); +static void qdr_add_router_CT (qdr_core_t *core, qdr_action_t *action, bool discard); +static void qdr_del_router_CT (qdr_core_t *core, qdr_action_t *action, bool discard); +static void qdr_set_link_CT (qdr_core_t *core, qdr_action_t *action, bool discard); +static void qdr_remove_link_CT (qdr_core_t *core, qdr_action_t *action, bool discard); +static void qdr_set_next_hop_CT (qdr_core_t *core, qdr_action_t *action, bool discard); +static void qdr_remove_next_hop_CT (qdr_core_t *core, qdr_action_t *action, bool discard); +static void qdr_set_cost_CT (qdr_core_t *core, qdr_action_t *action, bool discard); +static void qdr_set_valid_origins_CT (qdr_core_t *core, qdr_action_t *action, bool discard); +static void qdr_flush_destinations_CT (qdr_core_t *core, qdr_action_t *action, bool discard); +static void qdr_mobile_seq_advanced_CT (qdr_core_t *core, qdr_action_t *action, bool discard); +static void qdr_subscribe_CT (qdr_core_t *core, qdr_action_t *action, bool discard); +static void qdr_unsubscribe_CT (qdr_core_t *core, qdr_action_t *action, bool discard); //================================================================================== @@ -108,34 +108,32 @@ void qdr_core_set_valid_origins(qdr_core_t *core, int router_maskbit, qd_bitmask } -void qdr_core_map_destination(qdr_core_t *core, int router_maskbit, const char *address_hash, int treatment_hint) +void qdr_core_flush_destinations(qdr_core_t *core, int router_maskbit) { - qdr_action_t *action = qdr_action(qdr_map_destination_CT, "map_destination"); + qdr_action_t *action = qdr_action(qdr_flush_destinations_CT, "flush_destinations"); action->args.route_table.router_maskbit = router_maskbit; - action->args.route_table.address = qdr_field(address_hash); - action->args.route_table.treatment_hint = treatment_hint; qdr_action_enqueue(core, action); } -void qdr_core_unmap_destination(qdr_core_t *core, int router_maskbit, const char 
*address_hash) +void qdr_core_mobile_seq_advanced(qdr_core_t *core, int router_maskbit) { - qdr_action_t *action = qdr_action(qdr_unmap_destination_CT, "unmap_destination"); + qdr_action_t *action = qdr_action(qdr_mobile_seq_advanced_CT, "mobile_seq_advanced"); action->args.route_table.router_maskbit = router_maskbit; - action->args.route_table.address = qdr_field(address_hash); qdr_action_enqueue(core, action); } -void qdr_core_route_table_handlers(qdr_core_t *core, - void *context, - qdr_mobile_added_t mobile_added, - qdr_mobile_removed_t mobile_removed, - qdr_link_lost_t link_lost) + +void qdr_core_route_table_handlers(qdr_core_t *core, + void *context, + qdr_set_mobile_seq_t set_mobile_seq, + qdr_set_my_mobile_seq_t set_my_mobile_seq, + qdr_link_lost_t link_lost) { - core->rt_context = context; - core->rt_mobile_added = mobile_added; - core->rt_mobile_removed = mobile_removed; - core->rt_link_lost = link_lost; + core->rt_context = context; + core->rt_set_mobile_seq = set_mobile_seq; + core->rt_set_my_mobile_seq = set_my_mobile_seq; + core->rt_link_lost = link_lost; } @@ -144,6 +142,7 @@ qdr_subscription_t *qdr_core_subscribe(qdr_core_t *core, char aclass, char phase, qd_address_treatment_t treatment, + bool in_core, qdr_receive_t on_message, void *context) { @@ -152,6 +151,7 @@ qdr_subscription_t *qdr_core_subscribe(qdr_core_t *core, sub->addr = 0; sub->on_message = on_message; sub->on_message_context = context; + sub->in_core = in_core; qdr_action_t *action = qdr_action(qdr_subscribe_CT, "subscribe"); action->args.io.address = qdr_field(address); @@ -298,23 +298,26 @@ static void qdr_add_router_CT(qdr_core_t *core, qdr_action_t *action, bool disca } // - // Set the block-deletion flag on this address for the time that it is associated - // with an existing remote router node. + // Bump the address's ref_count for the time that it is associated with an existing remote router node. 
// - addr->block_deletion = true; + addr->ref_count++; // // Create a router-node record to represent the remote router. // qdr_node_t *rnode = new_qdr_node_t(); - DEQ_ITEM_INIT(rnode); + ZERO(rnode); rnode->owning_addr = addr; rnode->mask_bit = router_maskbit; - rnode->next_hop = 0; rnode->link_mask_bit = -1; - rnode->ref_count = 0; rnode->valid_origins = qd_bitmask(0); - rnode->cost = 0; + + qd_iterator_reset_view(iter, ITER_VIEW_ALL); + int addr_len = qd_iterator_length(iter); + + rnode->wire_address_ma = (char*) malloc(addr_len + 4); + qd_iterator_ncopy(iter, (unsigned char*) rnode->wire_address_ma, addr_len); + strcpy(rnode->wire_address_ma + addr_len, ".ma"); // // Insert at the head of the list because we don't yet know the cost to this @@ -400,7 +403,7 @@ static void qdr_del_router_CT(qdr_core_t *core, qdr_action_t *action, bool disca // // Check the address and free it if there are no other interested parties tracking it // - oaddr->block_deletion = false; + oaddr->ref_count--; qdr_check_addr_CT(core, oaddr); } @@ -558,141 +561,58 @@ static void qdr_set_valid_origins_CT(qdr_core_t *core, qdr_action_t *action, boo qd_bitmask_free(valid_origins); } -static qd_address_treatment_t default_treatment(qdr_core_t *core, int hint) { - switch (hint) { - case QD_TREATMENT_MULTICAST_FLOOD: - return QD_TREATMENT_MULTICAST_FLOOD; - case QD_TREATMENT_MULTICAST_ONCE: - return QD_TREATMENT_MULTICAST_ONCE; - case QD_TREATMENT_ANYCAST_CLOSEST: - return QD_TREATMENT_ANYCAST_CLOSEST; - case QD_TREATMENT_ANYCAST_BALANCED: - return QD_TREATMENT_ANYCAST_BALANCED; - case QD_TREATMENT_LINK_BALANCED: - return QD_TREATMENT_LINK_BALANCED; - case QD_TREATMENT_UNAVAILABLE: - return QD_TREATMENT_UNAVAILABLE; - default: - return core->qd->default_treatment == QD_TREATMENT_UNAVAILABLE ? 
QD_TREATMENT_ANYCAST_BALANCED : core->qd->default_treatment; - } -} -static void qdr_map_destination_CT(qdr_core_t *core, qdr_action_t *action, bool discard) +static void qdr_flush_destinations_CT(qdr_core_t *core, qdr_action_t *action, bool discard) { - int router_maskbit = action->args.route_table.router_maskbit; - qdr_field_t *address = action->args.route_table.address; - int treatment_hint = action->args.route_table.treatment_hint; - - if (discard) { - qdr_field_free(address); + if (!!discard) return; - } + + int router_maskbit = action->args.route_table.router_maskbit; do { if (router_maskbit >= qd_bitmask_width() || router_maskbit < 0) { - qd_log(core->log, QD_LOG_CRITICAL, "map_destination: Router maskbit out of range: %d", router_maskbit); + qd_log(core->log, QD_LOG_CRITICAL, "flush_destinations: Router maskbit out of range: %d", router_maskbit); break; } - if (core->routers_by_mask_bit[router_maskbit] == 0) { - qd_log(core->log, QD_LOG_CRITICAL, "map_destination: Router not found"); + qdr_node_t *rnode = core->routers_by_mask_bit[router_maskbit]; + if (rnode == 0) { + qd_log(core->log, QD_LOG_CRITICAL, "flush_destinations: Router not found"); break; } - qd_iterator_t *iter = address->iterator; - qdr_address_t *addr = 0; - - qd_hash_retrieve(core->addr_hash, iter, (void**) &addr); - if (!addr) { - qdr_address_config_t *addr_config; - qd_address_treatment_t treatment = qdr_treatment_for_address_hash_with_default_CT(core, - iter, - default_treatment(core, treatment_hint), - &addr_config); - addr = qdr_address_CT(core, treatment, addr_config); - if (!addr) { - qd_log(core->log, QD_LOG_CRITICAL, "map_destination: ignored"); - break; - } - qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle); - DEQ_ITEM_INIT(addr); - DEQ_INSERT_TAIL(core->addrs, addr); - // if the address is a link route, add the pattern to the wildcard - // address parse tree - { - const char *a_str = (const char *)qd_hash_key_by_handle(addr->hash_handle); - if 
(QDR_IS_LINK_ROUTE(a_str[0])) { - qdr_link_route_map_pattern_CT(core, iter, addr); - } - } - } - - qdr_node_t *rnode = core->routers_by_mask_bit[router_maskbit]; - qd_bitmask_set_bit(addr->rnodes, router_maskbit); - rnode->ref_count++; - addr->cost_epoch--; - qdr_addr_start_inlinks_CT(core, addr); - // - // Raise an address event if this is the first destination for the address + // Raise the event to be picked up by core modules. // - if (qd_bitmask_cardinality(addr->rnodes) + DEQ_SIZE(addr->rlinks) == 1) - qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_BECAME_DEST, addr); - else if (qd_bitmask_cardinality(addr->rnodes) == 1 && DEQ_SIZE(addr->rlinks) == 1) - qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_TWO_DEST, addr); + qdrc_event_router_raise(core, QDRC_EVENT_ROUTER_MOBILE_FLUSH, rnode); } while (false); - - qdr_field_free(address); } -static void qdr_unmap_destination_CT(qdr_core_t *core, qdr_action_t *action, bool discard) +static void qdr_mobile_seq_advanced_CT(qdr_core_t *core, qdr_action_t *action, bool discard) { - int router_maskbit = action->args.route_table.router_maskbit; - qdr_field_t *address = action->args.route_table.address; - - if (discard) { - qdr_field_free(address); + if (!!discard) return; - } + + int router_maskbit = action->args.route_table.router_maskbit; do { if (router_maskbit >= qd_bitmask_width() || router_maskbit < 0) { - qd_log(core->log, QD_LOG_CRITICAL, "unmap_destination: Router maskbit out of range: %d", router_maskbit); + qd_log(core->log, QD_LOG_CRITICAL, "seq_advanced: Router maskbit out of range: %d", router_maskbit); break; } - if (core->routers_by_mask_bit[router_maskbit] == 0) { - qd_log(core->log, QD_LOG_CRITICAL, "unmap_destination: Router not found"); - break; - } - - qdr_node_t *rnode = core->routers_by_mask_bit[router_maskbit]; - qd_iterator_t *iter = address->iterator; - qdr_address_t *addr = 0; - - qd_hash_retrieve(core->addr_hash, iter, (void**) &addr); - if (!addr) { - qd_log(core->log, QD_LOG_CRITICAL, 
"unmap_destination: Address not found"); + qdr_node_t *rnode = core->routers_by_mask_bit[router_maskbit]; + if (rnode == 0) { + qd_log(core->log, QD_LOG_CRITICAL, "seq_advanced: Router not found"); break; } - - qd_bitmask_clear_bit(addr->rnodes, router_maskbit); - rnode->ref_count--; - addr->cost_epoch--; // - // Raise an address event if this was the last destination for the address + // Raise the event to be picked up by core modules. // - if (qd_bitmask_cardinality(addr->rnodes) + DEQ_SIZE(addr->rlinks) == 0) - qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_NO_LONGER_DEST, addr); - else if (qd_bitmask_cardinality(addr->rnodes) == 0 && DEQ_SIZE(addr->rlinks) == 1) - qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_ONE_LOCAL_DEST, addr); - - qdr_check_addr_CT(core, addr); + qdrc_event_router_raise(core, QDRC_EVENT_ROUTER_MOBILE_SEQ_ADVANCED, rnode); } while (false); - - qdr_field_free(address); } @@ -754,27 +674,15 @@ static void qdr_unsubscribe_CT(qdr_core_t *core, qdr_action_t *action, bool disc // Call-back Functions //================================================================================== -static void qdr_do_mobile_added(qdr_core_t *core, qdr_general_work_t *work) +static void qdr_do_set_mobile_seq(qdr_core_t *core, qdr_general_work_t *work) { - char *address_hash = qdr_field_copy(work->field); - if (address_hash) { - core->rt_mobile_added(core->rt_context, address_hash, work->treatment); - free(address_hash); - } - - qdr_field_free(work->field); + core->rt_set_mobile_seq(core->rt_context, work->maskbit, work->mobile_seq); } -static void qdr_do_mobile_removed(qdr_core_t *core, qdr_general_work_t *work) +static void qdr_do_set_my_mobile_seq(qdr_core_t *core, qdr_general_work_t *work) { - char *address_hash = qdr_field_copy(work->field); - if (address_hash) { - core->rt_mobile_removed(core->rt_context, address_hash); - free(address_hash); - } - - qdr_field_free(work->field); + core->rt_set_my_mobile_seq(core->rt_context, work->mobile_seq); } @@ -784,19 
+692,19 @@ static void qdr_do_link_lost(qdr_core_t *core, qdr_general_work_t *work) } -void qdr_post_mobile_added_CT(qdr_core_t *core, const char *address_hash, qd_address_treatment_t treatment) +void qdr_post_set_mobile_seq_CT(qdr_core_t *core, int router_maskbit, uint64_t mobile_seq) { - qdr_general_work_t *work = qdr_general_work(qdr_do_mobile_added); - work->field = qdr_field(address_hash); - work->treatment = treatment; + qdr_general_work_t *work = qdr_general_work(qdr_do_set_mobile_seq); + work->mobile_seq = mobile_seq; + work->maskbit = router_maskbit; qdr_post_general_work_CT(core, work); } -void qdr_post_mobile_removed_CT(qdr_core_t *core, const char *address_hash) +void qdr_post_set_my_mobile_seq_CT(qdr_core_t *core, uint64_t mobile_seq) { - qdr_general_work_t *work = qdr_general_work(qdr_do_mobile_removed); - work->field = qdr_field(address_hash); + qdr_general_work_t *work = qdr_general_work(qdr_do_set_my_mobile_seq); + work->mobile_seq = mobile_seq; qdr_post_general_work_CT(core, work); } diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c index 3ecea9c..bdbfb6e 100644 --- a/src/router_core/router_core.c +++ b/src/router_core/router_core.c @@ -37,6 +37,7 @@ ALLOC_DEFINE(qdr_general_work_t); ALLOC_DEFINE(qdr_link_work_t); ALLOC_DEFINE(qdr_connection_ref_t); ALLOC_DEFINE(qdr_connection_info_t); +ALLOC_DEFINE(qdr_subscription_ref_t); static void qdr_general_handler(void *context); @@ -86,10 +87,10 @@ qdr_core_t *qdr_core(qd_dispatch_t *qd, qd_router_mode_t mode, const char *area, // Perform outside-of-thread setup for the management agent // core->agent_subscription_mobile = qdr_core_subscribe(core, "$management", 'M', '0', - QD_TREATMENT_ANYCAST_CLOSEST, + QD_TREATMENT_ANYCAST_CLOSEST, false, qdr_management_agent_on_message, core); core->agent_subscription_local = qdr_core_subscribe(core, "$management", 'L', '0', - QD_TREATMENT_ANYCAST_CLOSEST, + QD_TREATMENT_ANYCAST_CLOSEST, false, qdr_management_agent_on_message, core); return 
core; @@ -220,6 +221,7 @@ void qdr_router_node_free(qdr_core_t *core, qdr_node_t *rnode) DEQ_REMOVE(core->routers, rnode); core->routers_by_mask_bit[rnode->mask_bit] = 0; core->cost_epoch++; + free(rnode->wire_address_ma); free_qdr_node_t(rnode); } @@ -376,7 +378,7 @@ qdr_address_t *qdr_add_local_address_CT(qdr_core_t *core, char aclass, const cha if (addr) { qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle); DEQ_INSERT_TAIL(core->addrs, addr); - addr->block_deletion = true; + addr->ref_count++; addr->local = (aclass == 'L'); } } @@ -553,8 +555,6 @@ void qdr_core_bind_address_link_CT(qdr_core_t *core, qdr_address_t *addr, qdr_li if (link->link_direction == QD_OUTGOING) { qdr_add_link_ref(&addr->rlinks, link, QDR_LINK_LIST_CLASS_ADDRESS); if (DEQ_SIZE(addr->rlinks) == 1) { - if (key && (*key == QD_ITER_HASH_PREFIX_EDGE_SUMMARY || *key == QD_ITER_HASH_PREFIX_MOBILE)) - qdr_post_mobile_added_CT(core, key, addr->treatment); qdr_addr_start_inlinks_CT(core, addr); qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_BECAME_LOCAL_DEST, addr); } else if (DEQ_SIZE(addr->rlinks) == 2 && qd_bitmask_cardinality(addr->rnodes) == 0) @@ -581,9 +581,6 @@ void qdr_core_unbind_address_link_CT(qdr_core_t *core, qdr_address_t *addr, qdr_ if (link->link_direction == QD_OUTGOING) { qdr_del_link_ref(&addr->rlinks, link, QDR_LINK_LIST_CLASS_ADDRESS); if (DEQ_SIZE(addr->rlinks) == 0) { - const char *key = (const char*) qd_hash_key_by_handle(addr->hash_handle); - if (key && (*key == QD_ITER_HASH_PREFIX_MOBILE || *key == QD_ITER_HASH_PREFIX_EDGE_SUMMARY)) - qdr_post_mobile_removed_CT(core, key); qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST, addr); } else if (DEQ_SIZE(addr->rlinks) == 1 && qd_bitmask_cardinality(addr->rnodes) == 0) qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_ONE_LOCAL_DEST, addr); @@ -608,8 +605,6 @@ void qdr_core_bind_address_conn_CT(qdr_core_t *core, qdr_address_t *addr, qdr_co { qdr_add_connection_ref(&addr->conns, conn); if (DEQ_SIZE(addr->conns) 
== 1) { - const char *key = (const char*) qd_hash_key_by_handle(addr->hash_handle); - qdr_post_mobile_added_CT(core, key, addr->treatment); qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_BECAME_LOCAL_DEST, addr); } } @@ -619,8 +614,6 @@ void qdr_core_unbind_address_conn_CT(qdr_core_t *core, qdr_address_t *addr, qdr_ { qdr_del_connection_ref(&addr->conns, conn); if (DEQ_IS_EMPTY(addr->conns)) { - const char *key = (const char*) qd_hash_key_by_handle(addr->hash_handle); - qdr_post_mobile_removed_CT(core, key); qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST, addr); } } @@ -691,6 +684,7 @@ void qdr_core_remove_address_config(qdr_core_t *core, qdr_address_config_t *addr qd_iterator_free(pattern); } + void qdr_add_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link, int cls) { if (link->ref[cls] != 0) @@ -763,6 +757,22 @@ void qdr_del_delivery_ref(qdr_delivery_ref_list_t *list, qdr_delivery_ref_t *ref } +void qdr_add_subscription_ref_CT(qdr_subscription_ref_list_t *list, qdr_subscription_t *sub) +{ + qdr_subscription_ref_t *ref = new_qdr_subscription_ref_t(); + DEQ_ITEM_INIT(ref); + ref->sub = sub; + DEQ_INSERT_TAIL(*list, ref); +} + + +void qdr_del_subscription_ref_CT(qdr_subscription_ref_list_t *list, qdr_subscription_ref_t *ref) +{ + DEQ_REMOVE(*list, ref); + free_qdr_subscription_ref_t(ref); +} + + static void qdr_general_handler(void *context) { qdr_core_t *core = (qdr_core_t*) context; diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index cf6d044..614b8d5 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -231,17 +231,16 @@ typedef void (*qdr_general_work_handler_t) (qdr_core_t *core, qdr_general_work_t struct qdr_general_work_t { DEQ_LINKS(qdr_general_work_t); qdr_general_work_handler_t handler; - qdr_field_t *field; int maskbit; int inter_router_cost; qd_message_t *msg; qdr_receive_t on_message; void *on_message_context; uint64_t in_conn_id; - int 
treatment; + uint64_t mobile_seq; qdr_delivery_cleanup_list_t delivery_cleanup_list; - qdr_global_stats_handler_t stats_handler; - void *context; + qdr_global_stats_handler_t stats_handler; + void *context; }; ALLOC_DECLARE(qdr_general_work_t); @@ -349,6 +348,8 @@ struct qdr_node_t { uint32_t ref_count; qd_bitmask_t *valid_origins; int cost; + uint64_t mobile_seq; + char *wire_address_ma; ///< The address of this router's mobile-sync agent in non-hashed form }; DEQ_DECLARE(qdr_node_t, qdr_node_list_t); @@ -380,10 +381,22 @@ struct qdr_subscription_t { qdr_address_t *addr; qdr_receive_t on_message; void *on_message_context; + bool in_core; }; DEQ_DECLARE(qdr_subscription_t, qdr_subscription_list_t); +typedef struct qdr_subscription_ref_t { + DEQ_LINKS(struct qdr_subscription_ref_t); + qdr_subscription_t *sub; +} qdr_subscription_ref_t; + +ALLOC_DECLARE(qdr_subscription_ref_t); +DEQ_DECLARE(qdr_subscription_ref_t, qdr_subscription_ref_list_t); + +void qdr_add_subscription_ref_CT(qdr_subscription_ref_list_t *list, qdr_subscription_t *sub); +void qdr_del_subscription_ref_CT(qdr_subscription_ref_list_t *list, qdr_subscription_ref_t *ref); + DEQ_DECLARE(qdr_delivery_t, qdr_delivery_list_t); void qdr_add_delivery_ref_CT(qdr_delivery_ref_list_t *list, qdr_delivery_t *dlv); @@ -508,14 +521,20 @@ struct qdr_address_t { qdr_link_t *edge_outlink; ///< [ref] Out-link to connected Interior router (on edge router) qd_address_treatment_t treatment; qdr_forwarder_t *forwarder; - int ref_count; ///< Number of link-routes + auto-links referencing this address - bool block_deletion; + int ref_count; ///< Number of entities referencing this address bool local; bool router_control_only; ///< If set, address is only for deliveries arriving on a control link uint32_t tracked_deliveries; uint64_t cost_epoch; // + // State for mobile-address synchronization + // + DEQ_LINKS_N(SYNC_ADD, qdr_address_t); + DEQ_LINKS_N(SYNC_DEL, qdr_address_t); + uint32_t sync_mask; + + // // State for tracking 
fallback destinations for undeliverable deliveries // qdr_address_t *fallback; ///< Pointer to this address's fallback destination @@ -793,10 +812,10 @@ struct qdr_core_t { // // Route table section // - void *rt_context; - qdr_mobile_added_t rt_mobile_added; - qdr_mobile_removed_t rt_mobile_removed; - qdr_link_lost_t rt_link_lost; + void *rt_context; + qdr_set_mobile_seq_t rt_set_mobile_seq; + qdr_set_my_mobile_seq_t rt_set_my_mobile_seq; + qdr_link_lost_t rt_link_lost; // // Connection section @@ -821,6 +840,7 @@ struct qdr_core_t { qdrc_event_subscription_list_t conn_event_subscriptions; qdrc_event_subscription_list_t link_event_subscriptions; qdrc_event_subscription_list_t addr_event_subscriptions; + qdrc_event_subscription_list_t router_event_subscriptions; qd_router_mode_t router_mode; const char *router_area; @@ -918,8 +938,8 @@ void qdr_forward_on_message_CT(qdr_core_t *core, qdr_subscription_t *sub, qdr_li void qdr_in_process_send_to_CT(qdr_core_t *core, qd_iterator_t *address, qd_message_t *msg, bool exclude_inprocess, bool control); void qdr_agent_enqueue_response_CT(qdr_core_t *core, qdr_query_t *query); -void qdr_post_mobile_added_CT(qdr_core_t *core, const char *address_hash, qd_address_treatment_t treatment); -void qdr_post_mobile_removed_CT(qdr_core_t *core, const char *address_hash); +void qdr_post_set_mobile_seq_CT(qdr_core_t *core, int router_maskbit, uint64_t mobile_seq); +void qdr_post_set_my_mobile_seq_CT(qdr_core_t *core, uint64_t mobile_seq); void qdr_post_link_lost_CT(qdr_core_t *core, int link_maskbit); void qdr_post_general_work_CT(qdr_core_t *core, qdr_general_work_t *work); diff --git a/src/router_pynode.c b/src/router_pynode.c index 1d0c8d5..200e297 100644 --- a/src/router_pynode.c +++ b/src/router_pynode.c @@ -28,12 +28,12 @@ #include "entity_cache.h" #include "python_private.h" -static qd_log_source_t *log_source = 0; -static PyObject *pyRouter = 0; -static PyObject *pyTick = 0; -static PyObject *pyAdded = 0; -static PyObject 
*pyRemoved = 0; -static PyObject *pyLinkLost = 0; +static qd_log_source_t *log_source = 0; +static PyObject *pyRouter = 0; +static PyObject *pyTick = 0; +static PyObject *pySetMobileSeq = 0; +static PyObject *pySetMyMobileSeq = 0; +static PyObject *pyLinkLost = 0; typedef struct { PyObject_HEAD @@ -237,15 +237,13 @@ static PyObject* qd_set_radius(PyObject *self, PyObject *args) } -static PyObject* qd_map_destination(PyObject *self, PyObject *args) +static PyObject* qd_flush_destinations(PyObject *self, PyObject *args) { RouterAdapter *adapter = (RouterAdapter*) self; qd_router_t *router = adapter->router; - const char *addr_string; - int treatment; int maskbit; - if (!PyArg_ParseTuple(args, "sii", &addr_string, &treatment, &maskbit)) + if (!PyArg_ParseTuple(args, "i", &maskbit)) return 0; if (maskbit >= qd_bitmask_width() || maskbit < 0) { @@ -253,21 +251,20 @@ static PyObject* qd_map_destination(PyObject *self, PyObject *args) return 0; } - qdr_core_map_destination(router->router_core, maskbit, addr_string, treatment); + qdr_core_flush_destinations(router->router_core, maskbit); Py_INCREF(Py_None); return Py_None; } -static PyObject* qd_unmap_destination(PyObject *self, PyObject *args) +static PyObject* qd_mobile_seq_advanced(PyObject *self, PyObject *args) { RouterAdapter *adapter = (RouterAdapter*) self; qd_router_t *router = adapter->router; - const char *addr_string; int maskbit; - if (!PyArg_ParseTuple(args, "si", &addr_string, &maskbit)) + if (!PyArg_ParseTuple(args, "i", &maskbit)) return 0; if (maskbit >= qd_bitmask_width() || maskbit < 0) { @@ -275,12 +272,13 @@ static PyObject* qd_unmap_destination(PyObject *self, PyObject *args) return 0; } - qdr_core_unmap_destination(router->router_core, maskbit, addr_string); + qdr_core_mobile_seq_advanced(router->router_core, maskbit); Py_INCREF(Py_None); return Py_None; } + static PyObject* qd_get_agent(PyObject *self, PyObject *args) { RouterAdapter *adapter = (RouterAdapter*) self; PyObject *agent = 
adapter->router->qd->agent; @@ -292,18 +290,18 @@ static PyObject* qd_get_agent(PyObject *self, PyObject *args) { } static PyMethodDef RouterAdapter_methods[] = { - {"add_router", qd_add_router, METH_VARARGS, "A new remote/reachable router has been discovered"}, - {"del_router", qd_del_router, METH_VARARGS, "We've lost reachability to a remote router"}, - {"set_link", qd_set_link, METH_VARARGS, "Set the link for a neighbor router"}, - {"remove_link", qd_remove_link, METH_VARARGS, "Remove the link for a neighbor router"}, - {"set_next_hop", qd_set_next_hop, METH_VARARGS, "Set the next hop for a remote router"}, - {"remove_next_hop", qd_remove_next_hop, METH_VARARGS, "Remove the next hop for a remote router"}, - {"set_cost", qd_set_cost, METH_VARARGS, "Set the cost to reach a remote router"}, - {"set_valid_origins", qd_set_valid_origins, METH_VARARGS, "Set the valid origins for a remote router"}, - {"set_radius", qd_set_radius, METH_VARARGS, "Set the current topology radius"}, - {"map_destination", qd_map_destination, METH_VARARGS, "Add a newly discovered destination mapping"}, - {"unmap_destination", qd_unmap_destination, METH_VARARGS, "Delete a destination mapping"}, - {"get_agent", qd_get_agent, METH_VARARGS, "Get the management agent"}, + {"add_router", qd_add_router, METH_VARARGS, "A new remote/reachable router has been discovered"}, + {"del_router", qd_del_router, METH_VARARGS, "We've lost reachability to a remote router"}, + {"set_link", qd_set_link, METH_VARARGS, "Set the link for a neighbor router"}, + {"remove_link", qd_remove_link, METH_VARARGS, "Remove the link for a neighbor router"}, + {"set_next_hop", qd_set_next_hop, METH_VARARGS, "Set the next hop for a remote router"}, + {"remove_next_hop", qd_remove_next_hop, METH_VARARGS, "Remove the next hop for a remote router"}, + {"set_cost", qd_set_cost, METH_VARARGS, "Set the cost to reach a remote router"}, + {"set_valid_origins", qd_set_valid_origins, METH_VARARGS, "Set the valid origins for a remote 
router"}, + {"set_radius", qd_set_radius, METH_VARARGS, "Set the current topology radius"}, + {"flush_destinations", qd_flush_destinations, METH_VARARGS, "Remove all mapped destinations from a router"}, + {"mobile_seq_advanced", qd_mobile_seq_advanced, METH_VARARGS, "Mobile sequence for a router moved ahead of the local value"}, + {"get_agent", qd_get_agent, METH_VARARGS, "Get the management agent"}, {0, 0, 0, 0} }; @@ -317,18 +315,18 @@ static PyTypeObject RouterAdapterType = { }; -static void qd_router_mobile_added(void *context, const char *address_hash, qd_address_treatment_t treatment) +static void qd_router_set_mobile_seq(void *context, int router_mask_bit, uint64_t mobile_seq) { qd_router_t *router = (qd_router_t*) context; PyObject *pArgs; PyObject *pValue; - if (pyAdded && router->router_mode == QD_ROUTER_MODE_INTERIOR) { + if (pySetMobileSeq && router->router_mode == QD_ROUTER_MODE_INTERIOR) { qd_python_lock_state_t lock_state = qd_python_lock(); pArgs = PyTuple_New(2); - PyTuple_SetItem(pArgs, 0, PyUnicode_FromString(address_hash)); - PyTuple_SetItem(pArgs, 1, PyLong_FromLong((long) treatment)); - pValue = PyObject_CallObject(pyAdded, pArgs); + PyTuple_SetItem(pArgs, 0, PyLong_FromLong((long) router_mask_bit)); + PyTuple_SetItem(pArgs, 1, PyLong_FromLong((long) mobile_seq)); + pValue = PyObject_CallObject(pySetMobileSeq, pArgs); qd_error_py(); Py_DECREF(pArgs); Py_XDECREF(pValue); @@ -337,17 +335,17 @@ static void qd_router_mobile_added(void *context, const char *address_hash, qd_a } -static void qd_router_mobile_removed(void *context, const char *address_hash) +static void qd_router_set_my_mobile_seq(void *context, uint64_t mobile_seq) { qd_router_t *router = (qd_router_t*) context; PyObject *pArgs; PyObject *pValue; - if (pyRemoved && router->router_mode == QD_ROUTER_MODE_INTERIOR) { + if (pySetMobileSeq && router->router_mode == QD_ROUTER_MODE_INTERIOR) { qd_python_lock_state_t lock_state = qd_python_lock(); pArgs = PyTuple_New(1); - 
PyTuple_SetItem(pArgs, 0, PyUnicode_FromString(address_hash)); - pValue = PyObject_CallObject(pyRemoved, pArgs); + PyTuple_SetItem(pArgs, 0, PyLong_FromLong((long) mobile_seq)); + pValue = PyObject_CallObject(pySetMyMobileSeq, pArgs); qd_error_py(); Py_DECREF(pArgs); Py_XDECREF(pValue); @@ -362,7 +360,7 @@ static void qd_router_link_lost(void *context, int link_mask_bit) PyObject *pArgs; PyObject *pValue; - if (pyRemoved && router->router_mode == QD_ROUTER_MODE_INTERIOR) { + if (pyLinkLost && router->router_mode == QD_ROUTER_MODE_INTERIOR) { qd_python_lock_state_t lock_state = qd_python_lock(); pArgs = PyTuple_New(1); PyTuple_SetItem(pArgs, 0, PyLong_FromLong((long) link_mask_bit)); @@ -382,8 +380,8 @@ qd_error_t qd_router_python_setup(qd_router_t *router) qdr_core_route_table_handlers(router->router_core, router, - qd_router_mobile_added, - qd_router_mobile_removed, + qd_router_set_mobile_seq, + qd_router_set_my_mobile_seq, qd_router_link_lost); // @@ -450,10 +448,10 @@ qd_error_t qd_router_python_setup(qd_router_t *router) Py_DECREF(adapterType); QD_ERROR_PY_RET(); - pyTick = PyObject_GetAttrString(pyRouter, "handleTimerTick"); QD_ERROR_PY_RET(); - pyAdded = PyObject_GetAttrString(pyRouter, "addressAdded"); QD_ERROR_PY_RET(); - pyRemoved = PyObject_GetAttrString(pyRouter, "addressRemoved"); QD_ERROR_PY_RET(); - pyLinkLost = PyObject_GetAttrString(pyRouter, "linkLost"); QD_ERROR_PY_RET(); + pyTick = PyObject_GetAttrString(pyRouter, "handleTimerTick"); QD_ERROR_PY_RET(); + pySetMobileSeq = PyObject_GetAttrString(pyRouter, "setMobileSeq"); QD_ERROR_PY_RET(); + pySetMyMobileSeq = PyObject_GetAttrString(pyRouter, "setMyMobileSeq"); QD_ERROR_PY_RET(); + pyLinkLost = PyObject_GetAttrString(pyRouter, "linkLost"); QD_ERROR_PY_RET(); return qd_error_code(); } diff --git a/tests/system_tests_interior_sync_up.py b/tests/system_tests_interior_sync_up.py index 2b220f0..71e7f9d 100644 --- a/tests/system_tests_interior_sync_up.py +++ b/tests/system_tests_interior_sync_up.py @@ 
-129,7 +129,7 @@ class InteriorSyncUpTest(MessagingHandler): self.timer = None self.poll_timer = None self.delay_timer = None - self.count = 200 + self.count = 2000 self.delay_count = 12 # This should be larger than MAX_KEPT_DELTAS in mobile.py self.inter_router_port = inter_router_port diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py index 4d3f61b..5730340 100644 --- a/tests/system_tests_link_routes.py +++ b/tests/system_tests_link_routes.py @@ -352,12 +352,12 @@ class LinkRouteTest(TestCase): blocking_connection = BlockingConnection(addr) # Receive on org.apache.dev - blocking_receiver = blocking_connection.create_receiver(address="org.apache.dev") + blocking_receiver = blocking_connection.create_receiver(address="org.apache.dev.1") apply_options = AtMostOnce() # Sender to to org.apache.dev - blocking_sender = blocking_connection.create_sender(address="org.apache.dev", options=apply_options) + blocking_sender = blocking_connection.create_sender(address="org.apache.dev.1", options=apply_options) msg = Message(body=hello_world_2) # Send a message blocking_sender.send(msg) @@ -371,10 +371,10 @@ class LinkRouteTest(TestCase): # Make sure that the router node acting as the broker (QDR.A) had one message routed through it. 
This confirms # that the message was link routed self.assertEqual(1, local_node.read(type='org.apache.qpid.dispatch.router.address', - name='M0org.apache.dev').deliveriesEgress) + name='M0org.apache.dev.1').deliveriesEgress) self.assertEqual(1, local_node.read(type='org.apache.qpid.dispatch.router.address', - name='M0org.apache.dev').deliveriesIngress) + name='M0org.apache.dev.1').deliveriesIngress) blocking_connection.close() @@ -563,12 +563,12 @@ class LinkRouteTest(TestCase): blocking_connection = BlockingConnection(addr) # Receive on org.apache - blocking_receiver = blocking_connection.create_receiver(address="org.apache") + blocking_receiver = blocking_connection.create_receiver(address="org.apache.1") apply_options = AtMostOnce() # Sender to to org.apache - blocking_sender = blocking_connection.create_sender(address="org.apache", options=apply_options) + blocking_sender = blocking_connection.create_sender(address="org.apache.1", options=apply_options) msg = Message(body=hello_world_4) # Send a message @@ -583,10 +583,10 @@ class LinkRouteTest(TestCase): # Make sure that the router node acting as the broker (QDR.A) had one message routed through it. 
This confirms # that the message was link routed self.assertEqual(1, local_node.read(type='org.apache.qpid.dispatch.router.address', - name='M0org.apache').deliveriesEgress) + name='M0org.apache.1').deliveriesEgress) self.assertEqual(1, local_node.read(type='org.apache.qpid.dispatch.router.address', - name='M0org.apache').deliveriesIngress) + name='M0org.apache.1').deliveriesIngress) blocking_connection.close() diff --git a/tests/system_tests_multicast.py b/tests/system_tests_multicast.py index 1a34cd3..486d75b 100644 --- a/tests/system_tests_multicast.py +++ b/tests/system_tests_multicast.py @@ -558,6 +558,7 @@ class MulticastBase(MessagingHandler): self.r_conns[name] = conn self.create_receiver(event.container, conn, self.topic, name) self.n_receivers += 1 + self.c_received[name] = 0 def on_link_opened(self, event): if event.receiver: @@ -720,7 +721,7 @@ class MulticastPresettled(MulticastBase): class MulticastPresettledRxFail(MulticastPresettled): """ - Spontaineously close a receiver or connection on message received + Spontaneously close a receiver or connection on message received """ def __init__(self, config, count, drop_clients, detach, body): super(MulticastPresettledRxFail, self).__init__(config, count, body, SendPresettled()) @@ -871,7 +872,7 @@ class MulticastUnsettled1Ack(MulticastUnsettled3Ack): class MulticastUnsettledRxFail(MulticastUnsettled3Ack): """ - Spontaineously close a receiver or connection on message received + Spontaneously close a receiver or connection on message received """ def __init__(self, config, count, drop_clients, detach, body): super(MulticastUnsettledRxFail, self).__init__(config, count, body) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org