From commits-return-46643-archive-asf-public=cust-asf.ponee.io@qpid.apache.org Fri Aug 24 21:41:34 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id A7C8D180629 for ; Fri, 24 Aug 2018 21:41:33 +0200 (CEST) Received: (qmail 51581 invoked by uid 500); 24 Aug 2018 19:41:32 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 51572 invoked by uid 99); 24 Aug 2018 19:41:32 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 24 Aug 2018 19:41:32 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 59035E0181; Fri, 24 Aug 2018 19:41:32 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: gmurthy@apache.org To: commits@qpid.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: qpid-dispatch git commit: DISPATCH-1103 - Added code to retry failed auto links using the core timer API. This closes #366. Date: Fri, 24 Aug 2018 19:41:32 +0000 (UTC) Repository: qpid-dispatch Updated Branches: refs/heads/master b652fd408 -> c65fbc9d8 DISPATCH-1103 - Added code to retry failed auto links using the core timer API. This closes #366. 
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/c65fbc9d Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/c65fbc9d Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/c65fbc9d Branch: refs/heads/master Commit: c65fbc9d82495348b5d5d0464852655847ebb5db Parents: b652fd4 Author: Ganesh Murthy Authored: Tue Aug 14 11:21:30 2018 -0400 Committer: Ganesh Murthy Committed: Fri Aug 24 15:39:32 2018 -0400 ---------------------------------------------------------------------- src/router_core/connections.c | 8 ++ src/router_core/route_control.c | 64 +++++++++ src/router_core/route_control.h | 17 +++ src/router_core/router_core_private.h | 31 +++-- tests/system_tests_autolinks.py | 211 +++++++++++++++++++++++++++++ 5 files changed, 317 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c65fbc9d/src/router_core/connections.c ---------------------------------------------------------------------- diff --git a/src/router_core/connections.c b/src/router_core/connections.c index 482f600..30166b1 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -1332,6 +1332,8 @@ static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, boo while (link_ref) { qdr_link_t *link = link_ref->link; + qdr_route_auto_link_closed_CT(core, link); + // // Clean up the link and all its associated state. // @@ -1842,6 +1844,12 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, b link->auto_link->state = QDR_AUTO_LINK_STATE_FAILED; free(link->auto_link->last_error); link->auto_link->last_error = qdr_error_description(error); + + // + // The auto link has failed. Periodically retry setting up the auto link until + // it succeeds. 
+ // + qdr_route_auto_link_detached_CT(core, link); } if (link->link_direction == QD_INCOMING) { http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c65fbc9d/src/router_core/route_control.c ---------------------------------------------------------------------- diff --git a/src/router_core/route_control.c b/src/router_core/route_control.c index 9c08d91..6b5e62e 100644 --- a/src/router_core/route_control.c +++ b/src/router_core/route_control.c @@ -34,6 +34,9 @@ ALLOC_DEFINE(qdr_conn_identifier_t); const char CONTAINER_PREFIX = 'C'; const char CONNECTION_PREFIX = 'L'; +const int AUTO_LINK_FIRST_RETRY_INTERVAL = 2; +const int AUTO_LINK_RETRY_INTERVAL = 5; + static qdr_conn_identifier_t *qdr_route_declare_id_CT(qdr_core_t *core, qd_iterator_t *container, @@ -263,6 +266,22 @@ static void qdr_auto_link_activate_CT(qdr_core_t *core, qdr_auto_link_t *al, qdr } +/** + * Timer callback: re-attempts activating the auto link (context) on every connection in its conn_id + */ +static void qdr_route_attempt_auto_link_CT(qdr_core_t *core, + void *context) +{ + qdr_auto_link_t *al = (qdr_auto_link_t *)context; + qdr_connection_ref_t * cref = DEQ_HEAD(al->conn_id->connection_refs); + while (cref) { + qdr_auto_link_activate_CT(core, al, cref->conn); + cref = DEQ_NEXT(cref); + } + +} + + static void qdr_auto_link_deactivate_CT(qdr_core_t *core, qdr_auto_link_t *al, qdr_connection_t *conn) { qdr_route_log_CT(core, "Auto Link Deactivated", al->name, al->identity, conn); @@ -386,6 +405,49 @@ qdr_link_route_t *qdr_route_add_link_route_CT(qdr_core_t *core, } +void qdr_route_auto_link_detached_CT(qdr_core_t *core, qdr_link_t *link) +{ + if (!link->auto_link) + return; + + if (!link->auto_link->retry_timer) + link->auto_link->retry_timer = qdr_core_timer_CT(core, qdr_route_attempt_auto_link_CT, (void *)link->auto_link); + + const char *activation_failed = "Auto Link Activation Failed. 
"; + int error_length = 0; + if (link->auto_link->last_error) + error_length = strlen(link->auto_link->last_error); + int total_length = strlen(activation_failed) + 1; /* +1 for the NUL terminator */ + if (error_length) + total_length += error_length; + + char error_msg[total_length]; + memset(error_msg, 0, sizeof(error_msg)); + strcat(error_msg, activation_failed); + if (error_length) + strcat(error_msg, link->auto_link->last_error); + + if (link->auto_link->retry_attempts == 0) { + // First retry in 2 seconds + qdr_core_timer_schedule_CT(core, link->auto_link->retry_timer, AUTO_LINK_FIRST_RETRY_INTERVAL); + link->auto_link->retry_attempts += 1; + } + else { + // Successive retries every 5 seconds + qdr_core_timer_schedule_CT(core, link->auto_link->retry_timer, AUTO_LINK_RETRY_INTERVAL); + } + + qdr_route_log_CT(core, error_msg, link->auto_link->name, link->auto_link->identity, link->conn); +} + + +void qdr_route_auto_link_closed_CT(qdr_core_t *core, qdr_link_t *link) +{ + if (link->auto_link && link->auto_link->retry_timer) + qdr_core_timer_cancel_CT(core, link->auto_link->retry_timer); +} + + void qdr_route_del_link_route_CT(qdr_core_t *core, qdr_link_route_t *lr) { // @@ -469,6 +531,7 @@ qdr_auto_link_t *qdr_route_add_auto_link_CT(qdr_core_t *core, if (container_field || connection_field) { al->conn_id = qdr_route_declare_id_CT(core, qd_parse_raw(container_field), qd_parse_raw(connection_field)); DEQ_INSERT_TAIL_N(REF, al->conn_id->auto_link_refs, al); + qdr_connection_ref_t * cref = DEQ_HEAD(al->conn_id->connection_refs); while (cref) { qdr_auto_link_activate_CT(core, al, cref->conn); @@ -516,6 +579,7 @@ void qdr_route_del_auto_link_CT(qdr_core_t *core, qdr_auto_link_t *al) DEQ_REMOVE(core->auto_links, al); free(al->name); free(al->external_addr); + qdr_core_timer_free_CT(core, al->retry_timer); free_qdr_auto_link_t(al); } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c65fbc9d/src/router_core/route_control.h ---------------------------------------------------------------------- diff --git 
a/src/router_core/route_control.h b/src/router_core/route_control.h index 291766c..a927948 100644 --- a/src/router_core/route_control.h +++ b/src/router_core/route_control.h @@ -55,4 +55,21 @@ void qdr_route_connection_closed_CT(qdr_core_t *core, qdr_connection_t *conn); void qdr_link_route_map_pattern_CT(qdr_core_t *core, qd_iterator_t *address, qdr_address_t *addr); void qdr_link_route_unmap_pattern_CT(qdr_core_t *core, qd_iterator_t *address); +/** + * Actions to be performed when an auto link detaches. + * Retries to establish the auto link that is associated with the passed in link. + * Uses the core thread timer API to schedule an auto link retry. + * + * @param core Pointer to the core object returned by qd_core() + * @param link qdr_link_t reference. The attach on this link for an auto link was rejected. + */ +void qdr_route_auto_link_detached_CT(qdr_core_t *core, qdr_link_t *link); + +/** + * Performs actions that need to be taken when an auto link's connection is closed. + * For example, if a timer was set up to reconnect the auto link, it needs to be canceled. + * @param link qdr_link_t reference. 
+ */ +void qdr_route_auto_link_closed_CT(qdr_core_t *core, qdr_link_t *link); + #endif http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c65fbc9d/src/router_core/router_core_private.h ---------------------------------------------------------------------- diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index 28f6b76..e9d7af4 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -603,6 +603,21 @@ ALLOC_DECLARE(qdr_link_route_t); DEQ_DECLARE(qdr_link_route_t, qdr_link_route_list_t); void qdr_core_delete_link_route(qdr_core_t *core, qdr_link_route_t *lr); +// Core timer related field/data structures +typedef void (*qdr_timer_cb_t)(qdr_core_t *core, void* context); + +typedef struct qdr_core_timer_t { + DEQ_LINKS(struct qdr_core_timer_t); + qdr_timer_cb_t handler; + void *context; + uint32_t delta_time_seconds; + bool scheduled; +} qdr_core_timer_t; + +ALLOC_DECLARE(qdr_core_timer_t); +DEQ_DECLARE(qdr_core_timer_t, qdr_core_timer_list_t); + + typedef enum { QDR_AUTO_LINK_STATE_INACTIVE, QDR_AUTO_LINK_STATE_ATTACHING, @@ -621,10 +636,12 @@ struct qdr_auto_link_t { char *external_addr; const char *internal_addr; int phase; + int retry_attempts; qd_direction_t dir; qdr_conn_identifier_t *conn_id; qdr_link_t *link; qdr_auto_link_state_t state; + qdr_core_timer_t *retry_timer; // If the auto link attach fails or gets disconnected, this timer retries the attach. 
char *last_error; }; @@ -643,20 +660,6 @@ struct qdr_conn_identifier_t { ALLOC_DECLARE(qdr_conn_identifier_t); DEQ_DECLARE(qdr_exchange_t, qdr_exchange_list_t); -// Core timer related field/data structures -typedef void (*qdr_timer_cb_t)(qdr_core_t *core, void* context); - -typedef struct qdr_core_timer_t { - DEQ_LINKS(struct qdr_core_timer_t); - qdr_timer_cb_t handler; - void *context; - uint32_t delta_time_seconds; - bool scheduled; -} qdr_core_timer_t; - -ALLOC_DECLARE(qdr_core_timer_t); -DEQ_DECLARE(qdr_core_timer_t, qdr_core_timer_list_t); - struct qdr_core_t { qd_dispatch_t *qd; http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c65fbc9d/tests/system_tests_autolinks.py ---------------------------------------------------------------------- diff --git a/tests/system_tests_autolinks.py b/tests/system_tests_autolinks.py index 2a7244f..0dd9c10 100644 --- a/tests/system_tests_autolinks.py +++ b/tests/system_tests_autolinks.py @@ -25,6 +25,7 @@ from __future__ import print_function import unittest2 as unittest import json +from threading import Timer from proton import Message from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, Process from proton.handlers import MessagingHandler @@ -35,6 +36,216 @@ from qpid_dispatch.management.client import Node CONNECTION_PROPERTIES = {u'connection': u'properties', u'int_property': 6451} +class AutoLinkDetachAfterAttachTest(MessagingHandler): + def __init__(self, address, node_addr): + super(AutoLinkDetachAfterAttachTest, self).__init__(prefetch=0) + self.timer = None + self.error = None + self.conn = None + self.address = address + self.n_rx_attach = 0 + self.n_tx_attach = 0 + self.node_addr = node_addr + self.sender = None + self.receiver = None + + def timeout(self): + self.error = "Timeout Expired: n_rx_attach=%d n_tx_attach=%d" % (self.n_rx_attach, self.n_tx_attach) + self.conn.close() + + def on_start(self, event): + self.timer = event.reactor.schedule(TIMEOUT, Timeout(self)) + self.conn = 
event.container.connect(self.address) + + def on_link_opened(self, event): + if event.sender: + self.sender = event.sender + self.n_tx_attach += 1 + if event.sender.remote_source.address != self.node_addr: + self.error = "Expected sender address '%s', got '%s'" % (self.node_addr, event.sender.remote_source.address) + self.timer.cancel() + self.conn.close() + elif event.receiver: + self.receiver = event.receiver + self.n_rx_attach += 1 + if event.receiver.remote_target.address != self.node_addr: + self.error = "Expected receiver address '%s', got '%s'" % (self.node_addr, event.receiver.remote_target.address) + self.timer.cancel() + self.conn.close() + + if self.n_tx_attach == 1 and self.n_rx_attach == 1: + # We have received 2 attaches from the router on the + # autolink address. Now close the sender and the receiver; + # the router will retry establishing the autolinks. + self.sender.close() + self.receiver.close() + + # The router will retry the auto link and the n_tx_attach and + # n_rx_attach will be 2 + if self.n_tx_attach == 2 and self.n_rx_attach == 2: + # This condition is never met (and the test times out) if you comment out the call to + # qdr_route_auto_link_detached_CT(core, link) in + # qdr_link_inbound_detach_CT() (connections.c) + self.conn.close() + self.timer.cancel() + + def run(self): + Container(self).run() + + +class DetachAfterAttachTest(TestCase): + @classmethod + def setUpClass(cls): + super(DetachAfterAttachTest, cls).setUpClass() + name = "test-router" + + config = Qdrouterd.Config([ + + ('router', {'mode': 'standalone', 'id': 'A'}), + ('listener', {'host': '127.0.0.1', 'role': 'normal', + 'port': cls.tester.get_port()}), + + ('listener', {'role': 'route-container', 'name': 'myListener', + 'port': cls.tester.get_port()}), + + ('autoLink', {'addr': 'myListener.1', 'connection': 'myListener', + 'direction': 'in'}), + ('autoLink', {'addr': 'myListener.1', 'connection': 'myListener', + 'direction': 'out'}), + ]) + + cls.router = cls.tester.qdrouterd(name, config) + 
cls.router.wait_ready() + cls.route_address = cls.router.addresses[1] + + def test_auto_link_attach_detach_reattch(self): + test = AutoLinkDetachAfterAttachTest(self.route_address, 'myListener.1') + test.run() + self.assertEqual(None, test.error) + + +class AutoLinkRetryTest(TestCase): + inter_router_port = None + + @classmethod + def router(cls, name, config): + config = Qdrouterd.Config(config) + cls.routers.append(cls.tester.qdrouterd(name, config, wait=True)) + + @classmethod + def setUpClass(cls): + super(AutoLinkRetryTest, cls).setUpClass() + cls.routers = [] + + cls.inter_router_port = cls.tester.get_port() + + cls.router('B', + [ + ('router', {'mode': 'standalone', 'id': 'B'}), + ('listener', {'role': 'normal', + 'port': cls.tester.get_port()}), + ('listener', {'host': '127.0.0.1', + 'role': 'normal', + 'port': cls.inter_router_port}), + # Note here that the distribution of the address + # 'examples' is set to 'unavailable'. + # This will ensure that any attach coming in for + # this address will be rejected. 
+ ('address', + {'prefix': 'examples', + 'name': 'unavailable-address', + 'distribution': 'unavailable'}), + ]) + + cls.router('A', [ + ('router', {'mode': 'standalone', 'id': 'A'}), + ('listener', {'host': '127.0.0.1', 'role': 'normal', + 'port': cls.tester.get_port()}), + + ('connector', {'host': '127.0.0.1', 'name': 'connectorToB', + 'role': 'route-container', + 'port': cls.inter_router_port}), + + ('autoLink', {'connection': 'connectorToB', + 'addr': 'examples', 'direction': 'in'}), + ('autoLink', {'connection': 'connectorToB', + 'addr': 'examples', 'direction': 'out'}), + ]) + + def __init__(self, test_method): + TestCase.__init__(self, test_method) + self.success = False + self.timer_delay = 6 + self.max_attempts = 2 + self.attempts = 0 + + def address(self): + return self.routers[1].addresses[0] + + def check_auto_link(self): + long_type = 'org.apache.qpid.dispatch.router.config.autoLink' + query_command = 'QUERY --type=' + long_type + output = json.loads(self.run_qdmanage(query_command)) + + if output[0].get('operStatus') == "active": + self.success = True + else: + self.schedule_auto_link_reconnect_test() + + self.attempts += 1 + + def run_qdmanage(self, cmd, input=None, expect=Process.EXIT_OK, address=None): + p = self.popen( + ['qdmanage'] + cmd.split(' ') + ['--bus', address or self.address(), '--indent=-1', '--timeout', str(TIMEOUT)], + stdin=PIPE, stdout=PIPE, stderr=STDOUT, expect=expect, + universal_newlines=True) + out = p.communicate(input)[0] + try: + p.teardown() + except Exception as e: + raise Exception("%s\n%s" % (e, out)) + return out + + def can_terminate(self): + if self.attempts == self.max_attempts: + return True + + if self.success: + return True + + return False + + def schedule_auto_link_reconnect_test(self): + if self.attempts < self.max_attempts: + if not self.success: + Timer(self.timer_delay, self.check_auto_link).start() + + def test_auto_link_reattch(self): + long_type = 'org.apache.qpid.dispatch.router.config.autoLink' 
+ query_command = 'QUERY --type=' + long_type + output = json.loads(self.run_qdmanage(query_command)) + + # Since the distribution of the autoLinked address 'examples' + # is set to unavailable, the auto link will initially be in the + # 'failed' state. + self.assertEqual(output[0]['operStatus'], 'failed') + self.assertEqual(output[0]['lastError'], 'Node not found') + + # Now we delete the 'unavailable-address' config so 'examples' + # becomes available. Router A must now + # re-attempt to establish the autoLink and once the autoLink + # is up, it should return to the 'active' state. + delete_command = 'DELETE --type=address --name=unavailable-address' + self.run_qdmanage(delete_command, address=self.routers[0].addresses[0]) + + self.schedule_auto_link_reconnect_test() + + while not self.can_terminate(): # NOTE(review): busy-wait; never exits if check_auto_link raises in its Timer thread + pass + + self.assertTrue(self.success) + + class WaypointReceiverPhaseTest(TestCase): inter_router_port = None --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org