Return-Path: X-Original-To: apmail-qpid-commits-archive@www.apache.org Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 47F92D8EC for ; Tue, 4 Dec 2012 14:03:08 +0000 (UTC) Received: (qmail 92978 invoked by uid 500); 4 Dec 2012 14:03:08 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 92847 invoked by uid 500); 4 Dec 2012 14:03:04 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 92820 invoked by uid 99); 4 Dec 2012 14:03:03 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 04 Dec 2012 14:03:03 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 04 Dec 2012 14:03:02 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 38DC423888CD; Tue, 4 Dec 2012 14:02:42 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1416944 - in /qpid/proton/trunk: proton-c/src/engine/engine.c tests/proton_tests/engine.py Date: Tue, 04 Dec 2012 14:02:41 -0000 To: commits@qpid.apache.org From: gsim@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20121204140242.38DC423888CD@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: gsim Date: Tue Dec 4 14:02:39 2012 New Revision: 1416944 URL: http://svn.apache.org/viewvc?rev=1416944&view=rev Log: PROTON-157: Ensure deliveries on tpwork queue are treated in correct order Modified: qpid/proton/trunk/proton-c/src/engine/engine.c qpid/proton/trunk/tests/proton_tests/engine.py Modified: qpid/proton/trunk/proton-c/src/engine/engine.c URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1416944&r1=1416943&r2=1416944&view=diff ============================================================================== --- qpid/proton/trunk/proton-c/src/engine/engine.c (original) +++ qpid/proton/trunk/proton-c/src/engine/engine.c Tue Dec 4 14:02:39 2012 @@ -2119,16 +2119,18 @@ int pn_post_disp(pn_transport_t *transpo return 0; } -int pn_process_tpwork_sender(pn_transport_t *transport, pn_delivery_t *delivery) +int pn_process_tpwork_sender(pn_transport_t *transport, pn_delivery_t *delivery, bool* allocation_blocked) { pn_link_t *link = delivery->link; pn_session_state_t *ssn_state = pn_session_get_state(transport, link->session); pn_link_state_t *link_state = pn_link_get_state(ssn_state, link); if ((int16_t) ssn_state->local_channel >= 0 && (int32_t) link_state->local_handle >= 0) { pn_delivery_state_t *state = (pn_delivery_state_t *) delivery->transport_context; - if (!state && pn_delivery_buffer_available(&ssn_state->outgoing)) { + if (!(*allocation_blocked) && !state && pn_delivery_buffer_available(&ssn_state->outgoing)) { state = pn_delivery_buffer_push(&ssn_state->outgoing, delivery); delivery->transport_context = state; + } else { + *allocation_blocked = true; } if (state && !state->sent && (delivery->done || pn_buffer_size(delivery->bytes) > 0) && @@ -2200,6 +2202,7 @@ int pn_process_tpwork(pn_transport_t *tr { pn_connection_t *conn = (pn_connection_t *) endpoint; pn_delivery_t *delivery = conn->tpwork_head; + bool allocation_blocked = false; while (delivery) { if (!delivery->transport_context && transport->disp->available > 0) { @@ -2208,7 +2211,7 @@ int pn_process_tpwork(pn_transport_t *tr pn_link_t *link = delivery->link; if (pn_link_is_sender(link)) { - int err = pn_process_tpwork_sender(transport, delivery); + int err = pn_process_tpwork_sender(transport, delivery, &allocation_blocked); if (err) return err; } else { int err = pn_process_tpwork_receiver(transport, delivery); Modified: qpid/proton/trunk/tests/proton_tests/engine.py URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/proton_tests/engine.py?rev=1416944&r1=1416943&r2=1416944&view=diff ============================================================================== --- qpid/proton/trunk/tests/proton_tests/engine.py (original) +++ qpid/proton/trunk/tests/proton_tests/engine.py Tue Dec 4 14:02:39 2012 @@ -29,10 +29,10 @@ from proton import * OUTPUT_SIZE = 10*1024 -def pump(t1, t2): +def pump(t1, t2, buffer_size=OUTPUT_SIZE): while True: - out1 = t1.output(OUTPUT_SIZE) - out2 = t2.output(OUTPUT_SIZE) + out1 = t1.output(buffer_size) + out2 = t2.output(buffer_size) if out1 or out2: if out1: @@ -98,9 +98,9 @@ class Test(common.Test): def cleanup(self): pass - def pump(self): + def pump(self, buffer_size=OUTPUT_SIZE): for c1, t1, c2, t2 in self._wires: - pump(t1, t2) + pump(t1, t2, buffer_size) class ConnectionTest(Test): @@ -575,6 +575,71 @@ class TransferTest(Test): assert sd.local_state == rd.remote_state == Delivery.ACCEPTED + def test_delivery_id_ordering(self): + self.rcv.flow(1024) + self.pump(buffer_size=64*1024) + + #fill up delivery buffer on sender + for m in range(1024): + sd = self.snd.delivery("tag%s" % m) + msg = "message %s" % m + n = self.snd.send(msg) + assert n == len(msg) + assert self.snd.advance() + + self.pump(buffer_size=64*1024) + + #receive a session-windows worth of messages and accept them + for m in range(1024): + rd = self.rcv.current + assert rd is not None, m + assert rd.tag == ("tag%s" % m), (rd.tag, m) + msg = self.rcv.recv(1024) + assert msg == ("message %s" % m), (msg, m) + rd.update(Delivery.ACCEPTED) + rd.settle() + + self.pump(buffer_size=64*1024) + + #add some new deliveries + for m in range(1024, 1450): + sd = self.snd.delivery("tag%s" % m) + msg = "message %s" % m + n = self.snd.send(msg) + assert n == len(msg) + assert self.snd.advance() + + #handle all disposition changes to sent messages + d = self.c1.work_head + while d: + if d.updated: + d.update(Delivery.ACCEPTED) + d.settle() + d = d.work_next + + #submit some more deliveries + for m in range(1450, 1500): + sd = self.snd.delivery("tag%s" % m) + msg = "message %s" % m + n = self.snd.send(msg) + assert n == len(msg) + assert self.snd.advance() + + self.pump(buffer_size=64*1024) + self.rcv.flow(1024) + self.pump(buffer_size=64*1024) + + #verify remaining messages can be received and accepted + for m in range(1024, 1500): + rd = self.rcv.current + assert rd is not None, m + assert rd.tag == ("tag%s" % m), (rd.tag, m) + msg = self.rcv.recv(1024) + assert msg == ("message %s" % m), (msg, m) + rd.update(Delivery.ACCEPTED) + rd.settle() + + class MaxFrameTransferTest(Test): def setup(self): --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org