From commits-return-49384-archive-asf-public=cust-asf.ponee.io@qpid.apache.org Wed Jan 15 14:40:34 2020 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id DBC0A18065E for ; Wed, 15 Jan 2020 15:40:33 +0100 (CET) Received: (qmail 63831 invoked by uid 500); 15 Jan 2020 14:40:26 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 63758 invoked by uid 99); 15 Jan 2020 14:40:26 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 15 Jan 2020 14:40:26 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 8AE358194F; Wed, 15 Jan 2020 14:40:26 +0000 (UTC) Date: Wed, 15 Jan 2020 14:40:26 +0000 To: "commits@qpid.apache.org" Subject: [qpid-dispatch] branch master updated: DISPATCH-1540 - Set the pre-settled flag appropriately on a delivery and its peers on large streaming messages. This will allow the presettled counters to be updated correctly. This closes #658. MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <157909922646.4909.16664878860197060992@gitbox.apache.org> From: gmurthy@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: qpid-dispatch X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: 588f38ec9203f95dffa5dec248388a068a6cfdd3 X-Git-Newrev: 9d3bf33f840b6fcce53db6db50365a3ee0bbb3de X-Git-Rev: 9d3bf33f840b6fcce53db6db50365a3ee0bbb3de X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. gmurthy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/master by this push: new 9d3bf33 DISPATCH-1540 - Set the pre-settled flag appropriately on a delivery and its peers on large streaming messages. This will allow the presettled counters to be updated correctly. This closes #658. 9d3bf33 is described below commit 9d3bf33f840b6fcce53db6db50365a3ee0bbb3de Author: Ganesh Murthy AuthorDate: Wed Jan 8 09:30:50 2020 -0500 DISPATCH-1540 - Set the pre-settled flag appropriately on a delivery and its peers on large streaming messages. This will allow the presettled counters to be updated correctly. This closes #658. --- src/router_core/delivery.c | 17 +++- src/router_core/delivery.h | 4 +- src/router_core/router_core_private.h | 1 + src/router_node.c | 6 +- tests/system_test.py | 26 ++++++ tests/system_tests_delivery_counts.py | 151 +++++++++++++++++++++++++++++++++- 6 files changed, 198 insertions(+), 7 deletions(-) diff --git a/src/router_core/delivery.c b/src/router_core/delivery.c index 7471716..3451ee8 100644 --- a/src/router_core/delivery.c +++ b/src/router_core/delivery.c @@ -201,13 +201,15 @@ void qdr_delivery_remote_state_updated(qdr_core_t *core, qdr_delivery_t *deliver } -qdr_delivery_t *qdr_deliver_continue(qdr_core_t *core,qdr_delivery_t *in_dlv) +qdr_delivery_t *qdr_deliver_continue(qdr_core_t *core,qdr_delivery_t *in_dlv, bool settled) { + qdr_action_t *action = qdr_action(qdr_deliver_continue_CT, "deliver_continue"); action->args.connection.delivery = in_dlv; qd_message_t *msg = qdr_delivery_message(in_dlv); action->args.connection.more = !qd_message_receive_complete(msg); + action->args.delivery.presettled = settled; // This incref is for the action reference qdr_delivery_incref(in_dlv, "qdr_deliver_continue - add to action list"); @@ -1022,6 +1024,9 @@ void qdr_deliver_continue_peers_CT(qdr_core_t *core, qdr_delivery_t *in_dlv) qdr_delivery_t *peer = qdr_delivery_first_peer_CT(in_dlv); while (peer) { + if (! peer->presettled && in_dlv->presettled) { + peer->presettled = in_dlv->presettled; + } qdr_link_work_t *work = peer->link_work; qdr_link_t *peer_link = qdr_delivery_link(peer); @@ -1057,6 +1062,16 @@ static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, bool qdr_delivery_t *in_dlv = action->args.connection.delivery; bool more = action->args.connection.more; + bool presettled = action->args.delivery.presettled; + + // + // If the delivery is already pre-settled, don't do anything with the pre-settled flag. + // + // If the in_delivery was not pre-settled, you can go to pre-settled. + if (! in_dlv->presettled && presettled) { + in_dlv->presettled = presettled; + } + qdr_link_t *link = qdr_delivery_link(in_dlv); // diff --git a/src/router_core/delivery.h b/src/router_core/delivery.h index ac29739..3553b74 100644 --- a/src/router_core/delivery.h +++ b/src/router_core/delivery.h @@ -50,7 +50,7 @@ struct qdr_delivery_t { pn_data_t *extension_state; ///< delivery-state in disposition performative qdr_error_t *error; bool settled; - bool presettled; + bool presettled; /// Proton does not have a notion of pre-settled. This flag is introduced in Dispatch and should exclusively be used only to update management counters like presettled delivery counts on links etc. This flag DOES NOT represent the remote settlement state of the delivery. qdr_delivery_where_t where; uint8_t tag[QDR_DELIVERY_TAG_MAX]; int tag_length; @@ -116,7 +116,7 @@ void qdr_delivery_remote_state_updated(qdr_core_t *core, qdr_delivery_t *deliver bool settled, qdr_error_t *error, pn_data_t *ext_state, bool ref_given); /* invoked when incoming message data arrives - schedule core thread */ -qdr_delivery_t *qdr_deliver_continue(qdr_core_t *core, qdr_delivery_t *delivery); +qdr_delivery_t *qdr_deliver_continue(qdr_core_t *core, qdr_delivery_t *delivery, bool settled); // diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index 614b8d5..77abf9e 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -149,6 +149,7 @@ struct qdr_action_t { qdr_delivery_t *delivery; uint64_t disposition; bool settled; + bool presettled; qdr_error_t *error; } delivery; diff --git a/src/router_node.c b/src/router_node.c index 5568c5c..8f4da6e 100644 --- a/src/router_node.c +++ b/src/router_node.c @@ -376,7 +376,7 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link) // if (delivery) { - qdr_deliver_continue(router->router_core, delivery); + qdr_deliver_continue(router->router_core, delivery, pn_delivery_settled(pnd)); return next_delivery; } @@ -1729,9 +1729,9 @@ static void CORE_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t di return; // - // If the disposition has changed, update the proton delivery. + // If the disposition has changed and the proton delivery has not already been settled, update the proton delivery. // - if (disp != pn_delivery_remote_state(pnd) && !qdr_delivery_presettled(dlv)) { + if (disp != pn_delivery_remote_state(pnd) && !pn_delivery_settled(pnd)) { qd_message_t *msg = qdr_delivery_message(dlv); if (disp == PN_MODIFIED) diff --git a/tests/system_test.py b/tests/system_test.py index 8944cea..3857ae8 100755 --- a/tests/system_test.py +++ b/tests/system_test.py @@ -1171,3 +1171,29 @@ def get_link_info(name, address): if item.get('name') == name: return item return None + +def has_mobile_dest_in_address_table(address, dest): + qdm = QdManager(address=address) + rc = qdm.query('org.apache.qpid.dispatch.router.address') + has_dest = False + for item in rc: + if dest in item.get("name"): + has_dest = True + break + return has_dest + + +def get_inter_router_links(address): + """ + Return a list of all links with type="inter-router + :param address: + """ + inter_router_links = [] + qdm = QdManager(address=address) + rc = qdm.query('org.apache.qpid.dispatch.router.link') + for item in rc: + if item.get("linkType") == "inter-router": + inter_router_links.append(item) + + return inter_router_links + diff --git a/tests/system_tests_delivery_counts.py b/tests/system_tests_delivery_counts.py index fd99092..61c976c 100644 --- a/tests/system_tests_delivery_counts.py +++ b/tests/system_tests_delivery_counts.py @@ -21,12 +21,14 @@ from time import sleep from proton import Condition, Message, Delivery, Timeout from system_test import TestCase, Qdrouterd, TIMEOUT -from system_test import get_link_info +from system_test import get_link_info, get_inter_router_links, has_mobile_dest_in_address_table from system_test import PollTimeout from proton.handlers import MessagingHandler from proton.reactor import Container from qpid_dispatch.management.client import Node +LARGE_PAYLOAD = ("X" * 1024) * 30 + _LINK_STATISTIC_KEYS = set(['unsettledCount', 'undeliveredCount', @@ -242,6 +244,153 @@ class TwoRouterReleasedDroppedPresettledTest(TestCase): 'droppedPresettledCount']))) +class AddressCheckerTimeout ( object ): + def __init__(self, parent): + self.parent = parent + + def on_timer_task(self, event): + self.parent.address_check_timeout() + +class LargePresettledLinkCounterTest(MessagingHandler): + def __init__(self, sender_addr, receiver_addr): + super(LargePresettledLinkCounterTest, self).__init__() + self.timer = None + self.sender_conn = None + self.receiver_conn = None + self.receiver = None + self.error = None + self.n_sent = 0 + self.n_received = 0 + self.num_messages = 25 + self.sender_addr = sender_addr + self.receiver_addr = receiver_addr + self.dest = "LargePresettledLinkCounterTest" + self.links = None + self.success = False + self.address_check_timer = None + self.container = None + self.num_attempts = 0 + self.reactor = None + self.done = False + + def check_if_done(self): + if self.done: + # Step 5: All messages have been received by receiver. + # Check the presettled count on the inter-router link of + # Router B (where the receiver is attached). + self.links = get_inter_router_links(self.receiver_addr) + for link in self.links: + # The self.num_messages + 1 is because before this test started the presettledCount was 1 + if link.get("linkDir") == "in" and link.get("presettledCount") == self.num_messages + 1: + self.success = True + break + self.sender_conn.close() + self.receiver_conn.close() + self.timer.cancel() + + def address_check_timeout(self): + if has_mobile_dest_in_address_table(self.sender_addr, self.dest): + # Step 3: The address has propagated to Router A. Now attach a sender + # to router A. + self.sender_conn = self.container.connect(self.sender_addr) + self.sender = self.container.create_sender(self.sender_conn, + self.dest, + name='SenderA') + else: + if self.num_attempts < 2: + self.address_check_timer = self.reactor.schedule(2, + AddressCheckerTimeout(self)) + self.num_attempts += 1 + + def timeout(self): + self.error = "Timeout Expired: self.n_sent=%d, self.self.n_received=%d " % (self.n_sent, self.n_received) + self.sender_conn.close() + self.receiver_conn.close() + + def on_start(self, event): + self.container = event.container + self.timer = event.reactor.schedule(TIMEOUT, Timeout(self)) + # Step 1: Create a receiver with name ReceiverA to address LargePresettledLinkCounterTest + # This receiver is attached to router B. Later a sender will be + # created which will be connected to Router A. The sender will send + # on the same address that the receiver is receiving on. + self.receiver_conn = event.container.connect(self.receiver_addr) + self.receiver = event.container.create_receiver(self.receiver_conn, + self.dest, + name='ReceiverA') + + def on_link_opened(self, event): + self.reactor = event.reactor + if event.receiver: + # Step 2: The receiver link has been opened. + # Give 2 seconds for the address to propagate to the other router (Router A) + self.address_check_timer = event.reactor.schedule(2, AddressCheckerTimeout(self)) + self.num_attempts += 1 + + def on_sendable(self, event): + # Step 4: Send self.num_messages multi-frame large pre-settled messages. + # These messages will travel over inter-router link to Router B. + if self.n_sent < self.num_messages: + msg = Message(body=LARGE_PAYLOAD) + dlv = self.sender.send(msg) + # We are sending a pre-settled large multi frame message. + dlv.settle() + self.n_sent += 1 + + def on_message(self, event): + if self.receiver == event.receiver: + self.n_received += 1 + if self.n_received == self.num_messages: + self.done = True + self.check_if_done() + + def run(self): + Container(self).run() + + + +class TwoRouterLargeMessagePresettledCountTest(TestCase): + @classmethod + def setUpClass(cls): + super(TwoRouterLargeMessagePresettledCountTest, cls).setUpClass() + + listen_port_1 = cls.tester.get_port() + listen_port_2 = cls.tester.get_port() + listen_port_inter_router = cls.tester.get_port() + + config_1 = Qdrouterd.Config([ + ('router', {'mode': 'interior', 'id': 'A'}), + ('address', {'prefix': 'multicast', 'distribution': 'multicast'}), + ('listener', {'port': listen_port_1, 'authenticatePeer': False, 'saslMechanisms': 'ANONYMOUS'}), + ('listener', {'role': 'inter-router', 'port': listen_port_inter_router, 'authenticatePeer': False, 'saslMechanisms': 'ANONYMOUS'}), + ]) + + config_2 = Qdrouterd.Config([ + ('router', {'mode': 'interior', 'id': 'B'}), + ('listener', {'port': listen_port_2, 'authenticatePeer': False, 'saslMechanisms': 'ANONYMOUS'}), + ('connector', {'name': 'connectorToA', 'role': 'inter-router', 'port': listen_port_inter_router, + 'verifyHostname': 'no'}), + ]) + + cls.routers = [] + cls.routers.append(cls.tester.qdrouterd("A", config_1, wait=True)) + cls.routers.append(cls.tester.qdrouterd("B", config_2, wait=True)) + cls.routers[1].wait_router_connected('A') + + def test_verify_inter_router_presettled_count_DISPATCH_1540(self): + sender_address = self.routers[0].addresses[0] + receiver_address = self.routers[1].addresses[0] + # Sends presettled large messages across routers and checks + # the pre-settled count on the inter-router link of the downstream + # router (i.e. that to which receiver is attached) + # This test will fail if DISPATCH-1540 is not fixed since the + # pre-settled count will show zero + test = LargePresettledLinkCounterTest(sender_address, receiver_address) + test.run() + self.assertTrue(test.success) + + + class LinkRouteIngressEgressTransitTest(TestCase): @classmethod def setUpClass(cls): --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org