Return-Path: Delivered-To: apmail-incubator-qpid-commits-archive@locus.apache.org Received: (qmail 29226 invoked from network); 24 Oct 2008 16:20:38 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 24 Oct 2008 16:20:38 -0000 Received: (qmail 51766 invoked by uid 500); 24 Oct 2008 16:20:41 -0000 Delivered-To: apmail-incubator-qpid-commits-archive@incubator.apache.org Received: (qmail 51750 invoked by uid 500); 24 Oct 2008 16:20:41 -0000 Mailing-List: contact qpid-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: qpid-dev@incubator.apache.org Delivered-To: mailing list qpid-commits@incubator.apache.org Received: (qmail 51741 invoked by uid 99); 24 Oct 2008 16:20:41 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 24 Oct 2008 09:20:41 -0700 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 24 Oct 2008 16:19:38 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id D53C7238885D; Fri, 24 Oct 2008 09:20:17 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r707672 - in /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server: AMQChannel.java queue/SimpleAMQQueue.java Date: Fri, 24 Oct 2008 16:20:17 -0000 To: qpid-commits@incubator.apache.org From: aidan@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20081024162017.D53C7238885D@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: aidan Date: Fri Oct 24 09:20:16 2008 New Revision: 707672 URL: http://svn.apache.org/viewvc?rev=707672&view=rev Log: QPID-1315: Fix style issue, iterator control usage as per review comments from rgodfrey. Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=707672&r1=707671&r2=707672&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original) +++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Fri Oct 24 09:20:16 2008 @@ -333,7 +333,8 @@ Subscription sub = _tag2SubscriptionMap.remove(consumerTag); if (sub != null) { - try { + try + { sub.getSendLock(); sub.getQueue().unregisterSubscription(sub); } Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=707672&r1=707671&r2=707672&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original) +++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Fri Oct 24 09:20:16 2008 @@ -1204,16 +1204,16 @@ flushSubscription(sub, Long.MAX_VALUE); } - public boolean flushSubscription(Subscription sub, Long deliveries) throws AMQException + public boolean flushSubscription(Subscription sub, Long iterations) throws AMQException { boolean atTail = false; - while (!sub.isSuspended() && !atTail && deliveries != 0) + while (!sub.isSuspended() && !atTail && iterations != 0) { try { sub.getSendLock(); - atTail = attemptDelivery(sub, deliveries); + atTail = attemptDelivery(sub); if (atTail && sub.isAutoClose()) { unregisterSubscription(sub); @@ -1221,6 +1221,10 @@ ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter(); converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag()); } + else if (!atTail) + { + iterations--; + } } finally { @@ -1239,7 +1243,7 @@ return atTail; } - private boolean attemptDelivery(Subscription sub, Long deliveries) throws AMQException + private boolean attemptDelivery(Subscription sub) throws AMQException { boolean atTail = false; boolean advanced = false; @@ -1258,11 +1262,9 @@ if (!sub.isBrowser() && !node.acquire(sub)) { sub.restoreCredit(node); - } else { - deliveries--; deliverMessage(sub, node); if (sub.isBrowser()) @@ -1352,11 +1354,11 @@ boolean deliveryIncomplete = true; int extraLoops = 1; - Long deliveries = new Long(MAX_ASYNC_DELIVERIES); + Long iterations = new Long(MAX_ASYNC_DELIVERIES); _asynchronousRunner.compareAndSet(runner, null); - while (deliveries != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete) && _asynchronousRunner.compareAndSet(null, runner)) + while (iterations != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete) && _asynchronousRunner.compareAndSet(null, runner)) { // we want to have one extra loop after every subscription has reached the point where it cannot move // further, just in case the advance of one subscription in the last loop allows a different subscription to @@ -1386,7 +1388,7 @@ QueueEntry node = moveSubscriptionToNextNode(sub); if (node != null) { - done = attemptDelivery(sub, deliveries); + done = attemptDelivery(sub); } } if (done) @@ -1409,6 +1411,7 @@ } else { + iterations--; extraLoops = 1; } } @@ -1422,7 +1425,7 @@ // If deliveries == 0 then the limitting factor was the time-slicing rather than available messages or credit // therefore we should schedule this runner again (unless someone beats us to it :-) ). - if (deliveries == 0 && _asynchronousRunner.compareAndSet(null, runner)) + if (iterations == 0 && _asynchronousRunner.compareAndSet(null, runner)) { _asyncDelivery.execute(runner); }