From qpid-commits-return-3656-apmail-incubator-qpid-commits-archive=incubator.apache.org@incubator.apache.org Tue Oct 02 11:09:21 2007 Return-Path: Delivered-To: apmail-incubator-qpid-commits-archive@locus.apache.org Received: (qmail 8429 invoked from network); 2 Oct 2007 11:09:21 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 2 Oct 2007 11:09:21 -0000 Received: (qmail 91364 invoked by uid 500); 2 Oct 2007 11:09:10 -0000 Delivered-To: apmail-incubator-qpid-commits-archive@incubator.apache.org Received: (qmail 91354 invoked by uid 500); 2 Oct 2007 11:09:10 -0000 Mailing-List: contact qpid-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: qpid-dev@incubator.apache.org Delivered-To: mailing list qpid-commits@incubator.apache.org Received: (qmail 91345 invoked by uid 99); 2 Oct 2007 11:09:10 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 02 Oct 2007 04:09:10 -0700 X-ASF-Spam-Status: No, hits=-100.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 02 Oct 2007 11:09:20 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 4F11F1A9832; Tue, 2 Oct 2007 04:08:30 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r581189 - /incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java Date: Tue, 02 Oct 2007 11:08:30 -0000 To: qpid-commits@incubator.apache.org From: rgodfrey@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20071002110830.4F11F1A9832@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: rgodfrey Date: Tue Oct 2 04:08:29 2007 New Revision: 581189 URL: http://svn.apache.org/viewvc?rev=581189&view=rev Log: QPID-614 : Applied patch supplied by Aidan Skinner Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java?rev=581189&r1=581188&r2=581189&view=diff ============================================================================== --- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java (original) +++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java Tue Oct 2 04:08:29 2007 @@ -20,6 +20,10 @@ */ package org.apache.qpid.client.protocol; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + import org.apache.qpid.AMQException; import org.apache.qpid.AMQTimeoutException; import org.apache.qpid.client.failover.FailoverException; @@ -75,7 +79,12 @@ private volatile boolean _ready = false; /** Used to protect the shared event and ready flag between the producer and consumer. */ - private final Object _lock = new Object(); + private final ReentrantLock _lock = new ReentrantLock(); + + /** + * Used to signal that a method has been received + */ + private final Condition _receivedCondition = _lock.newCondition(); /** Used to hold the most recent exception that is passed to the {@link #error(Exception)} method. */ private volatile Exception _error; @@ -126,11 +135,16 @@ // we only update the flag from inside the synchronized block // so that the blockForFrame method cannot "miss" an update - it // will only ever read the flag from within the synchronized block - synchronized (_lock) + _lock.lock(); + try { _doneEvt = evt; _ready = ready; - _lock.notify(); + _receivedCondition.signal(); + } + finally + { + _lock.unlock(); } } @@ -159,38 +173,42 @@ */ public AMQMethodEvent blockForFrame(long timeout) throws AMQException, FailoverException { - synchronized (_lock) + long nanoTimeout = TimeUnit.MILLISECONDS.toNanos(timeout); + + _lock.lock(); + try { while (!_ready) { - try + if (timeout == -1) { - if (timeout == -1) - { - _lock.wait(); - } - else - { - - _lock.wait(timeout); - if (!_ready) - { - _error = new AMQTimeoutException("Server did not respond in a timely fashion"); - _ready = true; - } - } + _receivedCondition.await(); } - catch (InterruptedException e) + else { - // IGNORE -- //fixme this isn't ideal as being interrupted isn't equivellant to sucess - // if (!_ready && timeout != -1) - // { - // _error = new AMQException("Server did not respond timely"); - // _ready = true; - // } + nanoTimeout = _receivedCondition.awaitNanos(nanoTimeout); + + if (nanoTimeout <= 0 && !_ready && _error == null) + { + _error = new AMQTimeoutException("Server did not respond in a timely fashion"); + _ready = true; + } } } } + catch (InterruptedException e) + { + // IGNORE -- //fixme this isn't ideal as being interrupted isn't equivellant to sucess + // if (!_ready && timeout != -1) + // { + // _error = new AMQException("Server did not respond timely"); + // _ready = true; + // } + } + finally + { + _lock.unlock(); + } if (_error != null) { @@ -224,10 +242,15 @@ // can pick up the exception and rethrow to the caller _error = e; - synchronized (_lock) + _lock.lock(); + try { _ready = true; - _lock.notify(); + _receivedCondition.signal(); + } + finally + { + _lock.unlock(); } } }