From commits-return-46413-archive-asf-public=cust-asf.ponee.io@qpid.apache.org Mon Aug 6 17:42:52 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 0FCC3180627 for ; Mon, 6 Aug 2018 17:42:51 +0200 (CEST) Received: (qmail 63380 invoked by uid 500); 6 Aug 2018 15:42:51 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 63371 invoked by uid 99); 6 Aug 2018 15:42:51 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 06 Aug 2018 15:42:51 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0F54FDFC43; Mon, 6 Aug 2018 15:42:51 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: gmurthy@apache.org To: commits@qpid.apache.org Message-Id: <4a934142adcc4139a5416d62a0b0846c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: qpid-dispatch git commit: DISPATCH-1085 - Modified AMQP_link_detach_handler to flush out the remaining bytes in the message buffers before responding to detaches. This closes #345 Date: Mon, 6 Aug 2018 15:42:51 +0000 (UTC) Repository: qpid-dispatch Updated Branches: refs/heads/master a53179572 -> 22ef3d167 DISPATCH-1085 - Modified AMQP_link_detach_handler to flush out the remaining bytes in the message buffers before responding to detaches. This closes #345 Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/22ef3d16 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/22ef3d16 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/22ef3d16 Branch: refs/heads/master Commit: 22ef3d167b47fe243344908ef48fe44910e476ee Parents: a531795 Author: Ganesh Murthy Authored: Mon Jul 23 15:21:44 2018 -0400 Committer: Ganesh Murthy Committed: Mon Aug 6 11:40:56 2018 -0400 ---------------------------------------------------------------------- src/message.c | 14 ++++---- src/router_node.c | 19 +++++++++++ tests/system_tests_one_router.py | 62 +++++++++++++++++++++++++++++++++++ 3 files changed, 89 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/22ef3d16/src/message.c ---------------------------------------------------------------------- diff --git a/src/message.c b/src/message.c index 9c71b79..fce3394 100644 --- a/src/message.c +++ b/src/message.c @@ -1128,13 +1128,15 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery) return discard_receive(delivery, link, (qd_message_t *)msg); } + // if q2 holdoff has been disabled (disable_q2_holdoff=true), we keep receiving. + // if q2 holdoff has been enabled (disable_q2_holdoff=false), if input is in holdoff then just exit. + // When enough buffers + // have been processed and freed by outbound processing then + // message holdoff is cleared and receiving may continue. // - // If input is in holdoff then just exit. When enough buffers - // have been processed and freed by outbound processing then - // message holdoff is cleared and receiving may continue. - // - if (msg->content->q2_input_holdoff) { - return (qd_message_t*)msg; + if (!msg->content->disable_q2_holdoff) { + if (msg->content->q2_input_holdoff) + return (qd_message_t*)msg; } // Loop until msg is complete, error seen, or incoming bytes are consumed http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/22ef3d16/src/router_node.c ---------------------------------------------------------------------- diff --git a/src/router_node.c b/src/router_node.c index 585dd6f..dbc7b59 100644 --- a/src/router_node.c +++ b/src/router_node.c @@ -752,6 +752,25 @@ static int AMQP_link_flow_handler(void* context, qd_link_t *link) */ static int AMQP_link_detach_handler(void* context, qd_link_t *link, qd_detach_type_t dt) { + if (!link) + return 0; + + pn_link_t *pn_link = qd_link_pn(link); + + if (!pn_link) + return 0; + + pn_delivery_t *pnd = pn_link_current(pn_link); + + if (pnd) { + qd_message_t *msg = qd_message_receive(pnd); + + if (!qd_message_receive_complete(msg)) { + qd_message_Q2_holdoff_disable(msg); + deferred_AMQP_rx_handler((void *)link, false); + } + } + qd_router_t *router = (qd_router_t*) context; qdr_link_t *rlink = (qdr_link_t*) qd_link_get_context(link); pn_condition_t *cond = qd_link_pn(link) ? pn_link_remote_condition(qd_link_pn(link)) : 0; http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/22ef3d16/tests/system_tests_one_router.py ---------------------------------------------------------------------- diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py index afadf81..b40a95b 100644 --- a/tests/system_tests_one_router.py +++ b/tests/system_tests_one_router.py @@ -417,6 +417,11 @@ class OneRouterTest(TestCase): test.run() self.assertEqual(None, test.error) + def test_41_large_streaming_close_conn_test(self): + test = LargeMessageStreamCloseConnTest(self.address) + test.run() + self.assertEqual(None, test.error) + class Entity(object): def __init__(self, status_code, status_description, attrs): @@ -2232,6 +2237,63 @@ class MulticastUnsettledTest(MessagingHandler): Container(self).run() +class LargeMessageStreamCloseConnTest(MessagingHandler): + def __init__(self, address): + super(LargeMessageStreamCloseConnTest, self).__init__() + self.address = address + self.dest = "LargeMessageStreamCloseConnTest" + self.error = None + self.timer = None + self.sender_conn = None + self.receiver_conn = None + self.sender = None + self.receiver = None + self.body = "" + self.aborted = False + for i in range(20000): + self.body += "0123456789101112131415" + + def timeout(self): + if self.aborted: + self.error = "Message has been aborted. Test failed" + else: + self.error = "Message not received. test failed" + self.receiver_conn.close() + + def on_start(self, event): + self.timer = event.reactor.schedule(TIMEOUT, Timeout(self)) + self.sender_conn = event.container.connect(self.address) + self.receiver_conn = event.container.connect(self.address) + self.sender = event.container.create_sender(self.sender_conn, self.dest) + self.receiver = event.container.create_receiver(self.receiver_conn, + self.dest, name="A") + + def on_sendable(self, event): + msg = Message(body=self.body) + # send(msg) calls the stream function which streams data + # from sender to the router + event.sender.send(msg) + + # Close the connection immediately after sending the message + # Without the fix for DISPATCH-1085, this test will fail + # one in five times with an abort + # With the fix in place, this test will never fail (the + # on_aborted will never be called). + self.sender_conn.close() + + def on_message(self, event): + self.timer.cancel() + self.receiver_conn.close() + + def on_aborted(self, event): + self.aborted = True + self.timer.cancel() + self.timeout() + + def run(self): + Container(self).run() + + class LargeMessageStreamTest(MessagingHandler): def __init__(self, address): super(LargeMessageStreamTest, self).__init__() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org