qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gmur...@apache.org
Subject [qpid-dispatch] branch master updated: DISPATCH-1541: Sets the presettled flag on large streaming discarded messages. This will help set the correct presettled related counters on the link. This closes #663
Date Thu, 16 Jan 2020 18:24:31 GMT
This is an automated email from the ASF dual-hosted git repository.

gmurthy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/master by this push:
     new 3234b9d  DISPATCH-1541: Sets the presettled flag on large streaming discarded messages.
This will help set the correct presettled related counters on the link. This closes #663
3234b9d is described below

commit 3234b9d61040759da760a05eb8102b7897a99ac9
Author: Ganesh Murthy <gmurthy@apache.org>
AuthorDate: Wed Jan 15 12:16:40 2020 -0500

    DISPATCH-1541: Sets the presettled flag on large streaming discarded messages. This will
help set the correct presettled related counters on the link. This closes #663
---
 src/router_core/delivery.c            |   5 ++
 src/router_core/delivery.h            |   7 ++
 src/router_node.c                     |  10 +++
 tests/system_tests_delivery_counts.py | 153 ++++++++++++++++++++++++++++++++++
 4 files changed, 175 insertions(+)

diff --git a/src/router_core/delivery.c b/src/router_core/delivery.c
index 4d62902..df31f80 100644
--- a/src/router_core/delivery.c
+++ b/src/router_core/delivery.c
@@ -124,6 +124,11 @@ bool qdr_delivery_is_aborted(const qdr_delivery_t *delivery)
     return qd_message_aborted(delivery->msg);
 }
 
+void qdr_delivery_set_presettled(qdr_delivery_t *delivery)
+{
+    if (delivery)
+        delivery->presettled = true;
+}
 
 void qdr_delivery_decref(qdr_core_t *core, qdr_delivery_t *delivery, const char *label)
 {
diff --git a/src/router_core/delivery.h b/src/router_core/delivery.h
index 3553b74..9a7edfb 100644
--- a/src/router_core/delivery.h
+++ b/src/router_core/delivery.h
@@ -110,6 +110,13 @@ void qdr_delivery_write_extension_state(qdr_delivery_t *dlv, pn_delivery_t*
pdlv
 /* release dlv and possibly schedule its deletion on the core thread */
 void qdr_delivery_decref(qdr_core_t *core, qdr_delivery_t *delivery, const char *label);
 
+/** Set the presettled flag on the delivery to true if it is not already true.
+ * The presettled flag can only go from false to true and not vice versa.
+ * This function should only be called when the delivery has been discarded and receive_complete
flag is true in which case there
+ * will be no thread contention.
+**/
+void qdr_delivery_set_presettled(qdr_delivery_t *delivery);
+
 /* handles delivery disposition and settlement changes from the remote end of
  * the link, and schedules Core thread */
 void qdr_delivery_remote_state_updated(qdr_core_t *core, qdr_delivery_t *delivery, uint64_t
disp,
diff --git a/src/router_node.c b/src/router_node.c
index 8f4da6e..b923b7d 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -355,6 +355,16 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link)
         // Message has been marked for discard, no further processing necessary
         //
         if (receive_complete) {
+            // If this discarded delivery has already been settled by proton,
+            // set the presettled flag on the delivery to true if it is not already true.
+            // Since the entire message has already been received, we directly call the
+            // function to set the pre-settled flag since we cannot go thru the core-thread
+            // to do this since the delivery has been discarded.
+            // Discarded streaming deliveries are not put thru the core thread via the continue
action.
+            if (pn_delivery_settled(pnd))
+                qdr_delivery_set_presettled(delivery);
+
+
             // note: expected that the code that set discard has handled
             // setting disposition and updating flow!
             pn_delivery_settle(pnd);
diff --git a/tests/system_tests_delivery_counts.py b/tests/system_tests_delivery_counts.py
index 61c976c..3c407b9 100644
--- a/tests/system_tests_delivery_counts.py
+++ b/tests/system_tests_delivery_counts.py
@@ -251,6 +251,14 @@ class AddressCheckerTimeout ( object ):
     def on_timer_task(self, event):
         self.parent.address_check_timeout()
 
+
+class CounterCheckerTimeout ( object ):
+    def __init__(self, parent):
+        self.parent = parent
+
+    def on_timer_task(self, event):
+        self.parent.count_check_timeout()
+
 class LargePresettledLinkCounterTest(MessagingHandler):
     def __init__(self, sender_addr, receiver_addr):
         super(LargePresettledLinkCounterTest, self).__init__()
@@ -348,6 +356,110 @@ class LargePresettledLinkCounterTest(MessagingHandler):
         Container(self).run()
 
 
+class LargePresettledReleasedLinkCounterTest(MessagingHandler):
+    def __init__(self, sender_addr, receiver_addr):
+        super(LargePresettledReleasedLinkCounterTest, self).__init__(prefetch=0)
+        self.sender_addr = sender_addr
+        self.receiver_addr = receiver_addr
+        self.dest = "LargePresettledReleasedLinkCounterTest"
+        self.receiver_dropoff_count = 50
+        self.num_messages = 200
+        self.num_attempts = 0
+        self.n_sent= 0
+        self.done = False
+        self.n_received = 0
+        self.count_check_timer = None
+        self.success = False
+        self.links = None
+        self.receiver_conn_closed = False
+
+    def check_if_done(self):
+        # Step 6:
+        # Check the counts on the inter-router link of
+        # Router B (where the receiver is attached). There
+        # should be no released or modified messages.
+        self.links = get_inter_router_links(self.receiver_addr)
+        for link in self.links:
+            # We don't know how many deliveries got from one side of the
+            # inter-router link to the other but there should at least be as
+            # many as was sent to the receiver
+            if link.get("linkDir") == "in" \
+                    and link.get("presettledCount") > self.receiver_dropoff_count \
+                    and link.get("deliveryCount") > self.receiver_dropoff_count \
+                    and link.get("releasedCount") == 0\
+                    and link.get("modifiedCount") == 0:
+                self.success = True
+                break
+        self.sender_conn.close()
+        self.timer.cancel()
+
+    def count_check_timeout(self):
+        self.check_if_done()
+
+    def address_check_timeout(self):
+        if has_mobile_dest_in_address_table(self.sender_addr, self.dest):
+            # Step 3: The address has propagated to Router A. Now attach a sender
+            # to router A.
+            self.sender_conn = self.container.connect(self.sender_addr)
+            self.sender = self.container.create_sender(self.sender_conn,
+                                                       self.dest,
+                                                       name='SenderA')
+        else:
+            if self.num_attempts < 2:
+                self.address_check_timer = self.reactor.schedule(2, AddressCheckerTimeout(self))
+                self.num_attempts += 1
+
+    def timeout(self):
+        self.error = "Timeout Expired: self.n_sent=%d, self.self.n_received=%d  " % (self.n_sent,
self.n_received)
+        self.sender_conn.close()
+        if not self.receiver_conn_closed:
+            self.receiver_conn.close()
+
+    def on_start(self, event):
+        self.container = event.container
+        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
+        # Step 1: Create a receiver with name ReceiverA to address LargePresettledReleasedLinkCounterTest
+        # This receiver is attached to router B. Later a sender will be
+        # created which will be connected to Router A. The sender will send
+        # on the same address that the receiver is receiving on.
+        self.receiver_conn = event.container.connect(self.receiver_addr)
+        self.receiver = event.container.create_receiver(self.receiver_conn,
+                                                        self.dest,
+                                                        name='ReceiverA')
+        self.receiver.flow(self.receiver_dropoff_count)
+
+    def on_link_opened(self, event):
+        self.reactor = event.reactor
+        if event.receiver:
+            # Step 2: The receiver link has been opened.
+            # Give 2 seconds for the address to propagate to the other router (Router A)
+            self.address_check_timer = event.reactor.schedule(2, AddressCheckerTimeout(self))
+            self.num_attempts += 1
+
+    def on_sendable(self, event):
+        # Step 4: Send self.num_messages multi-frame large pre-settled messages.
+        # These messages will travel over inter-router link to Router B.
+        while self.n_sent < self.num_messages:
+            msg = Message(body=LARGE_PAYLOAD)
+            dlv = self.sender.send(msg)
+            # We are sending a pre-settled large multi frame message.
+            dlv.settle()
+            self.n_sent += 1
+
+    def on_message(self, event):
+        if self.receiver == event.receiver and not self.done:
+            self.n_received += 1
+            # Step 5: The receiver receives only 50 messages out of the 200
+            # messages and drops out.
+            if self.n_received == self.receiver_dropoff_count:
+                self.done = True
+                self.receiver_conn.close()
+                self.receiver_conn_closed = True
+                self.count_check_timer = event.reactor.schedule(3, CounterCheckerTimeout(self))
+
+    def run(self):
+        Container(self).run()
+
 
 class TwoRouterLargeMessagePresettledCountTest(TestCase):
     @classmethod
@@ -390,6 +502,47 @@ class TwoRouterLargeMessagePresettledCountTest(TestCase):
         self.assertTrue(test.success)
 
 
+class TwoRouterLargeMessagePresettledReleasedCountTest(TestCase):
+    @classmethod
+    def setUpClass(cls):
+        super(TwoRouterLargeMessagePresettledReleasedCountTest, cls).setUpClass()
+
+        listen_port_1 = cls.tester.get_port()
+        listen_port_2 = cls.tester.get_port()
+        listen_port_inter_router = cls.tester.get_port()
+
+        config_1 = Qdrouterd.Config([
+            ('router', {'mode': 'interior', 'id': 'A'}),
+            ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
+            ('listener', {'port': listen_port_1, 'authenticatePeer': False, 'saslMechanisms':
'ANONYMOUS'}),
+            ('listener', {'role': 'inter-router', 'port': listen_port_inter_router, 'authenticatePeer':
False, 'saslMechanisms': 'ANONYMOUS'}),
+           ])
+
+        config_2 = Qdrouterd.Config([
+            ('router', {'mode': 'interior', 'id': 'B'}),
+            ('listener', {'port': listen_port_2, 'authenticatePeer': False, 'saslMechanisms':
'ANONYMOUS'}),
+            ('connector', {'name': 'connectorToA', 'role': 'inter-router', 'port': listen_port_inter_router,
+                           'verifyHostname': 'no'}),
+            ])
+
+        cls.routers = []
+        cls.routers.append(cls.tester.qdrouterd("A", config_1, wait=True))
+        cls.routers.append(cls.tester.qdrouterd("B", config_2, wait=True))
+        cls.routers[1].wait_router_connected('A')
+
+    def test_verify_inter_router_presettled_released_count_DISPATCH_1541(self):
+        # This test sends presettled large messages across routers. A sender is on
+        # router A and a receiver on B. The sender sends 200 messages, the receiver
+        # receives 50 messages and goes away by closing its connection. There should be no
released or
+        # modified messages on the incoming inter-router link on Router B
+        # This test will fail without the patch to DISPATCH-1541
+        sender_address = self.routers[0].addresses[0]
+        receiver_address = self.routers[1].addresses[0]
+        test = LargePresettledReleasedLinkCounterTest(sender_address, receiver_address)
+        test.run()
+        self.assertTrue(test.success)
+
+
 
 class LinkRouteIngressEgressTransitTest(TestCase):
     @classmethod


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message