qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tr...@apache.org
Subject qpid-dispatch git commit: DISPATCH-458 - Added flush for credits that can, in some cases, be stuck on an incoming link. Added a test that causes the credit-hang.
Date Mon, 25 Jul 2016 18:50:29 GMT
Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 7968e159e -> 4c12a62b9


DISPATCH-458 - Added flush for credits that can, in some cases, be stuck on an incoming link.
Added a test that causes the credit-hang.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/4c12a62b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/4c12a62b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/4c12a62b

Branch: refs/heads/master
Commit: 4c12a62b964ab27b14ef41dc917d53a43e8df0fb
Parents: 7968e15
Author: Ted Ross <tross@redhat.com>
Authored: Mon Jul 25 14:49:00 2016 -0400
Committer: Ted Ross <tross@redhat.com>
Committed: Mon Jul 25 14:49:00 2016 -0400

----------------------------------------------------------------------
 src/router_core/connections.c         | 10 +++++
 src/router_core/router_core_private.h |  7 ++++
 src/router_core/transfer.c            | 19 ++++++++++
 tests/system_tests_one_router.py      | 60 ++++++++++++++++++++++++++++++
 4 files changed, 96 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/4c12a62b/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 336b90f..2b95b41 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -191,6 +191,16 @@ int qdr_connection_process(qdr_connection_t *conn)
             if (link->incremental_credit > 0) {
                 core->flow_handler(core->user_context, link, link->incremental_credit);
                 link->incremental_credit = 0;
+
+                //
+                // Note:  This unprotected read of a CT-only value is safe in this case.
+                // If there is pending credit on the link that needs to be pushed down to
+                // Proton, we need to give the core a kick to make sure it is sent.  It is
+                // possible that no more credit will be issued to cause the movement of CT
+                // credit to Proton credit (see DISPATCH-458).
+                //
+                if (link->incremental_credit_CT > 0)
+                    qdr_link_check_credit(core, link);
             }
             if (link->drain_mode_changed) {
                 core->drain_handler(core->user_context, link, link->drain_mode);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/4c12a62b/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 3113165..a578810 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -627,4 +627,11 @@ qdr_query_t *qdr_query(qdr_core_t              *core,
                        void                    *context,
                        qd_router_entity_type_t  type,
                        qd_composed_field_t     *body);
+
+//
+// Cause the core to check credit on an incoming link that might have CT credit but
+// no IO/Proton credit.
+//
+void qdr_link_check_credit(qdr_core_t *core, qdr_link_t *link);
+
 #endif

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/4c12a62b/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 40ba82e..b6b4f21 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -24,6 +24,7 @@
 
 static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
 static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_link_check_credit_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
 static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
 static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
 
@@ -184,6 +185,14 @@ void qdr_link_flow(qdr_core_t *core, qdr_link_t *link, int credit, bool drain_mo
 }
 
 
+void qdr_link_check_credit(qdr_core_t *core, qdr_link_t *link)
+{
+    qdr_action_t *action = qdr_action(qdr_link_check_credit_CT, "link_check_credit");
+    action->args.connection.link = link;
+    qdr_action_enqueue(core, action);
+}
+
+
 void qdr_send_to1(qdr_core_t *core, qd_message_t *msg, qd_field_iterator_t *addr, bool exclude_inprocess, bool control)
 {
     qdr_action_t *action = qdr_action(qdr_send_to_CT, "send_to");
@@ -430,6 +439,16 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discar
 }
 
 
+static void qdr_link_check_credit_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+{
+    if (discard)
+        return;
+
+    qdr_link_t *link = action->args.connection.link;
+    qdr_link_issue_credit_CT(core, link, 0, false);
+}
+
+
 static int qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv, qdr_address_t *addr)
 {
     int fanout = 0;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/4c12a62b/tests/system_tests_one_router.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py
index fbd5672..026e076 100644
--- a/tests/system_tests_one_router.py
+++ b/tests/system_tests_one_router.py
@@ -1076,6 +1076,11 @@ class RouterTest(TestCase):
         test.run()
         self.assertEqual(None, test.error)
 
+    def test_20_batched_settlement(self):
+        test = BatchedSettlementTest(self.address)
+        test.run()
+        self.assertEqual(None, test.error)
+
     def test_connection_properties(self):
         connection = BlockingConnection(self.router.addresses[0],
                                         timeout=60,
@@ -1388,5 +1393,60 @@ class AppearanceOfBalanceTest(MessagingHandler):
         Container(self).run()
 
 
+class BatchedSettlementTest(MessagingHandler):
+    def __init__(self, address):
+        super(BatchedSettlementTest, self).__init__(auto_accept=False)
+        self.address = address
+        self.dest = "balanced.BatchedSettlement"
+        self.error = None
+        self.count       = 20000
+        self.batch_count = 200
+        self.n_sent      = 0
+        self.n_received  = 0
+        self.n_settled   = 0
+        self.batch       = []
+
+    def check_if_done(self):
+        if self.n_settled == self.count:
+            self.timer.cancel()
+            self.conn.close()
+
+    def timeout(self):
+        self.error = "Timeout Expired: sent=%d rcvd=%d settled=%d" % \
+                     (self.n_sent, self.n_received, self.n_settled)
+        self.conn.close()
+
+    def on_start(self, event):
+        self.timer    = event.reactor.schedule(20, Timeout(self))
+        self.conn     = event.container.connect(self.address)
+        self.sender   = event.container.create_sender(self.conn, self.dest)
+        self.receiver = event.container.create_receiver(self.conn, self.dest)
+
+    def send(self):
+        if self.n_sent < self.count:
+            while self.sender.credit > 0:
+                msg = Message(body="Batch-Test")
+                self.sender.send(msg)
+                self.n_sent += 1
+
+    def on_sendable(self, event):
+        if self.n_sent < self.count:
+            self.send()
+
+    def on_message(self, event):
+        self.n_received += 1
+        self.batch.insert(0, event.delivery)
+        if len(self.batch) == self.batch_count:
+            while len(self.batch) > 0:
+                self.accept(self.batch.pop())
+
+    def on_accepted(self, event):
+        self.n_settled += 1
+        self.check_if_done()
+
+    def run(self):
+        Container(self).run()
+
+
 if __name__ == '__main__':
     unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message