qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gmur...@apache.org
Subject [qpid-dispatch] branch master updated: DISPATCH-1540 - Set the pre-settled flag appropriately on a delivery and its peers on large streaming messages. This will allow the presettled counters to be updated correctly. This closes #658.
Date Wed, 15 Jan 2020 14:40:26 GMT
This is an automated email from the ASF dual-hosted git repository.

gmurthy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d3bf33  DISPATCH-1540 - Set the pre-settled flag appropriately on a delivery and
its peers on large streaming messages. This will allow the presettled counters to be updated
correctly. This closes #658.
9d3bf33 is described below

commit 9d3bf33f840b6fcce53db6db50365a3ee0bbb3de
Author: Ganesh Murthy <gmurthy@apache.org>
AuthorDate: Wed Jan 8 09:30:50 2020 -0500

    DISPATCH-1540 - Set the pre-settled flag appropriately on a delivery and its peers on
large streaming messages. This will allow the presettled counters to be updated correctly.
This closes #658.
---
 src/router_core/delivery.c            |  17 +++-
 src/router_core/delivery.h            |   4 +-
 src/router_core/router_core_private.h |   1 +
 src/router_node.c                     |   6 +-
 tests/system_test.py                  |  26 ++++++
 tests/system_tests_delivery_counts.py | 151 +++++++++++++++++++++++++++++++++-
 6 files changed, 198 insertions(+), 7 deletions(-)

diff --git a/src/router_core/delivery.c b/src/router_core/delivery.c
index 7471716..3451ee8 100644
--- a/src/router_core/delivery.c
+++ b/src/router_core/delivery.c
@@ -201,13 +201,15 @@ void qdr_delivery_remote_state_updated(qdr_core_t *core, qdr_delivery_t
*deliver
 }
 
 
-qdr_delivery_t *qdr_deliver_continue(qdr_core_t *core,qdr_delivery_t *in_dlv)
+qdr_delivery_t *qdr_deliver_continue(qdr_core_t *core,qdr_delivery_t *in_dlv, bool settled)
 {
+
     qdr_action_t   *action = qdr_action(qdr_deliver_continue_CT, "deliver_continue");
     action->args.connection.delivery = in_dlv;
 
     qd_message_t *msg = qdr_delivery_message(in_dlv);
     action->args.connection.more = !qd_message_receive_complete(msg);
+    action->args.delivery.presettled = settled;
 
     // This incref is for the action reference
     qdr_delivery_incref(in_dlv, "qdr_deliver_continue - add to action list");
@@ -1022,6 +1024,9 @@ void qdr_deliver_continue_peers_CT(qdr_core_t *core, qdr_delivery_t
*in_dlv)
     qdr_delivery_t *peer = qdr_delivery_first_peer_CT(in_dlv);
 
     while (peer) {
+        if (! peer->presettled && in_dlv->presettled) {
+            peer->presettled       = in_dlv->presettled;
+        }
         qdr_link_work_t *work      = peer->link_work;
         qdr_link_t      *peer_link = qdr_delivery_link(peer);
 
@@ -1057,6 +1062,16 @@ static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t
*action, bool
 
     qdr_delivery_t *in_dlv  = action->args.connection.delivery;
     bool more = action->args.connection.more;
+    bool presettled = action->args.delivery.presettled;
+
+    //
+    // If the delivery is already pre-settled, don't do anything with the pre-settled flag.
+    //
+    // If the in_delivery was not pre-settled, you can go to pre-settled.
+    if (! in_dlv->presettled && presettled) {
+        in_dlv->presettled = presettled;
+    }
+
     qdr_link_t *link = qdr_delivery_link(in_dlv);
 
     //
diff --git a/src/router_core/delivery.h b/src/router_core/delivery.h
index ac29739..3553b74 100644
--- a/src/router_core/delivery.h
+++ b/src/router_core/delivery.h
@@ -50,7 +50,7 @@ struct qdr_delivery_t {
     pn_data_t              *extension_state;     ///< delivery-state in disposition performative
     qdr_error_t            *error;
     bool                    settled;
-    bool                    presettled;
+    bool                    presettled; /// Proton does not have a notion of pre-settled.
This flag is introduced in Dispatch and should exclusively be used only to update management
counters like presettled delivery counts on links etc. This flag DOES NOT represent the remote
settlement state of the delivery.
     qdr_delivery_where_t    where;
     uint8_t                 tag[QDR_DELIVERY_TAG_MAX];
     int                     tag_length;
@@ -116,7 +116,7 @@ void qdr_delivery_remote_state_updated(qdr_core_t *core, qdr_delivery_t
*deliver
                                        bool settled, qdr_error_t *error, pn_data_t *ext_state,
bool ref_given);
 
 /* invoked when incoming message data arrives - schedule core thread */
-qdr_delivery_t *qdr_deliver_continue(qdr_core_t *core, qdr_delivery_t *delivery);
+qdr_delivery_t *qdr_deliver_continue(qdr_core_t *core, qdr_delivery_t *delivery, bool settled);
 
 
 //
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 614b8d5..77abf9e 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -149,6 +149,7 @@ struct qdr_action_t {
             qdr_delivery_t *delivery;
             uint64_t        disposition;
             bool            settled;
+            bool            presettled;
             qdr_error_t    *error;
         } delivery;
 
diff --git a/src/router_node.c b/src/router_node.c
index 5568c5c..8f4da6e 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -376,7 +376,7 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link)
     //
 
     if (delivery) {
-        qdr_deliver_continue(router->router_core, delivery);
+        qdr_deliver_continue(router->router_core, delivery, pn_delivery_settled(pnd));
         return next_delivery;
     }
 
@@ -1729,9 +1729,9 @@ static void CORE_delivery_update(void *context, qdr_delivery_t *dlv,
uint64_t di
         return;
 
     //
-    // If the disposition has changed, update the proton delivery.
+    // If the disposition has changed and the proton delivery has not already been settled,
update the proton delivery.
     //
-    if (disp != pn_delivery_remote_state(pnd) && !qdr_delivery_presettled(dlv)) {
+    if (disp != pn_delivery_remote_state(pnd) && !pn_delivery_settled(pnd)) {
         qd_message_t *msg = qdr_delivery_message(dlv);
 
         if (disp == PN_MODIFIED)
diff --git a/tests/system_test.py b/tests/system_test.py
index 8944cea..3857ae8 100755
--- a/tests/system_test.py
+++ b/tests/system_test.py
@@ -1171,3 +1171,29 @@ def get_link_info(name, address):
         if item.get('name') == name:
             return item
     return None
+
+def has_mobile_dest_in_address_table(address, dest):
+    qdm = QdManager(address=address)
+    rc = qdm.query('org.apache.qpid.dispatch.router.address')
+    has_dest = False
+    for item in rc:
+        if dest in item.get("name"):
+            has_dest = True
+            break
+    return has_dest
+
+
+def get_inter_router_links(address):
+    """
+    Return a list of all links with type="inter-router
+    :param address:
+    """
+    inter_router_links = []
+    qdm = QdManager(address=address)
+    rc = qdm.query('org.apache.qpid.dispatch.router.link')
+    for item in rc:
+        if item.get("linkType") == "inter-router":
+            inter_router_links.append(item)
+
+    return inter_router_links
+
diff --git a/tests/system_tests_delivery_counts.py b/tests/system_tests_delivery_counts.py
index fd99092..61c976c 100644
--- a/tests/system_tests_delivery_counts.py
+++ b/tests/system_tests_delivery_counts.py
@@ -21,12 +21,14 @@ from time import sleep
 
 from proton import Condition, Message, Delivery,  Timeout
 from system_test import TestCase, Qdrouterd, TIMEOUT
-from system_test import get_link_info
+from system_test import get_link_info, get_inter_router_links, has_mobile_dest_in_address_table
 from system_test import PollTimeout
 from proton.handlers import MessagingHandler
 from proton.reactor import Container
 from qpid_dispatch.management.client import Node
 
+LARGE_PAYLOAD = ("X" * 1024) * 30
+
 
 _LINK_STATISTIC_KEYS = set(['unsettledCount',
                             'undeliveredCount',
@@ -242,6 +244,153 @@ class TwoRouterReleasedDroppedPresettledTest(TestCase):
                                                                          'droppedPresettledCount'])))
 
 
+class AddressCheckerTimeout ( object ):
+    def __init__(self, parent):
+        self.parent = parent
+
+    def on_timer_task(self, event):
+        self.parent.address_check_timeout()
+
+class LargePresettledLinkCounterTest(MessagingHandler):
+    def __init__(self, sender_addr, receiver_addr):
+        super(LargePresettledLinkCounterTest, self).__init__()
+        self.timer = None
+        self.sender_conn = None
+        self.receiver_conn = None
+        self.receiver = None
+        self.error = None
+        self.n_sent = 0
+        self.n_received = 0
+        self.num_messages = 25
+        self.sender_addr = sender_addr
+        self.receiver_addr = receiver_addr
+        self.dest = "LargePresettledLinkCounterTest"
+        self.links = None
+        self.success = False
+        self.address_check_timer = None
+        self.container = None
+        self.num_attempts = 0
+        self.reactor = None
+        self.done = False
+
+    def check_if_done(self):
+        if self.done:
+            # Step 5: All messages have been received by receiver.
+            # Check the presettled count on the inter-router link of
+            # Router B (where the receiver is attached).
+            self.links = get_inter_router_links(self.receiver_addr)
+            for link in self.links:
+                # The self.num_messages + 1 is because before this test started the presettledCount
was 1
+                if link.get("linkDir") == "in" and link.get("presettledCount") == self.num_messages
+ 1:
+                    self.success = True
+                    break
+            self.sender_conn.close()
+            self.receiver_conn.close()
+            self.timer.cancel()
+
+    def address_check_timeout(self):
+        if has_mobile_dest_in_address_table(self.sender_addr, self.dest):
+            # Step 3: The address has propagated to Router A. Now attach a sender
+            # to router A.
+            self.sender_conn = self.container.connect(self.sender_addr)
+            self.sender = self.container.create_sender(self.sender_conn,
+                                                        self.dest,
+                                                        name='SenderA')
+        else:
+            if self.num_attempts < 2:
+                self.address_check_timer = self.reactor.schedule(2,
+                                                                 AddressCheckerTimeout(self))
+                self.num_attempts += 1
+
+    def timeout(self):
+        self.error = "Timeout Expired: self.n_sent=%d, self.self.n_received=%d  " % (self.n_sent,
self.n_received)
+        self.sender_conn.close()
+        self.receiver_conn.close()
+
+    def on_start(self, event):
+        self.container = event.container
+        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
+        # Step 1: Create a receiver with name ReceiverA to address LargePresettledLinkCounterTest
+        # This receiver is attached to router B. Later a sender will be
+        # created which will be connected to Router A. The sender will send
+        # on the same address that the receiver is receiving on.
+        self.receiver_conn = event.container.connect(self.receiver_addr)
+        self.receiver = event.container.create_receiver(self.receiver_conn,
+                                                        self.dest,
+                                                        name='ReceiverA')
+
+    def on_link_opened(self, event):
+        self.reactor = event.reactor
+        if event.receiver:
+            # Step 2: The receiver link has been opened.
+            # Give 2 seconds for the address to propagate to the other router (Router A)
+            self.address_check_timer = event.reactor.schedule(2, AddressCheckerTimeout(self))
+            self.num_attempts += 1
+
+    def on_sendable(self, event):
+        # Step 4: Send self.num_messages multi-frame large pre-settled messages.
+        # These messages will travel over inter-router link to Router B.
+        if self.n_sent < self.num_messages:
+            msg = Message(body=LARGE_PAYLOAD)
+            dlv = self.sender.send(msg)
+            # We are sending a pre-settled large multi frame message.
+            dlv.settle()
+            self.n_sent += 1
+
+    def on_message(self, event):
+        if self.receiver == event.receiver:
+            self.n_received += 1
+            if self.n_received == self.num_messages:
+                self.done = True
+            self.check_if_done()
+
+    def run(self):
+        Container(self).run()
+
+
+
+class TwoRouterLargeMessagePresettledCountTest(TestCase):
+    @classmethod
+    def setUpClass(cls):
+        super(TwoRouterLargeMessagePresettledCountTest, cls).setUpClass()
+
+        listen_port_1 = cls.tester.get_port()
+        listen_port_2 = cls.tester.get_port()
+        listen_port_inter_router = cls.tester.get_port()
+
+        config_1 = Qdrouterd.Config([
+            ('router', {'mode': 'interior', 'id': 'A'}),
+            ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
+            ('listener', {'port': listen_port_1, 'authenticatePeer': False, 'saslMechanisms':
'ANONYMOUS'}),
+            ('listener', {'role': 'inter-router', 'port': listen_port_inter_router, 'authenticatePeer':
False, 'saslMechanisms': 'ANONYMOUS'}),
+           ])
+
+        config_2 = Qdrouterd.Config([
+            ('router', {'mode': 'interior', 'id': 'B'}),
+            ('listener', {'port': listen_port_2, 'authenticatePeer': False, 'saslMechanisms':
'ANONYMOUS'}),
+            ('connector', {'name': 'connectorToA', 'role': 'inter-router', 'port': listen_port_inter_router,
+                           'verifyHostname': 'no'}),
+            ])
+
+        cls.routers = []
+        cls.routers.append(cls.tester.qdrouterd("A", config_1, wait=True))
+        cls.routers.append(cls.tester.qdrouterd("B", config_2, wait=True))
+        cls.routers[1].wait_router_connected('A')
+
+    def test_verify_inter_router_presettled_count_DISPATCH_1540(self):
+        sender_address = self.routers[0].addresses[0]
+        receiver_address = self.routers[1].addresses[0]
+        # Sends presettled large messages across routers and checks
+        # the pre-settled count on the inter-router link of the downstream
+        # router (i.e. that to which receiver is attached)
+        # This test will fail if DISPATCH-1540 is not fixed since the
+        # pre-settled count will show zero
+        test = LargePresettledLinkCounterTest(sender_address, receiver_address)
+        test.run()
+        self.assertTrue(test.success)
+
+
+
 class LinkRouteIngressEgressTransitTest(TestCase):
     @classmethod
     def setUpClass(cls):


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message