Author: rgodfrey Date: Tue Oct 2 04:08:29 2007 New Revision: 581189 URL: http://svn.apache.org/viewvc?rev=581189&view=rev Log: QPID-614 : Applied patch supplied by Aidan Skinner Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java?rev=581189&r1=581188&r2=581189&view=diff ============================================================================== --- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java (original) +++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java Tue Oct 2 04:08:29 2007 @@ -20,6 +20,10 @@ */ package org.apache.qpid.client.protocol; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + import org.apache.qpid.AMQException; import org.apache.qpid.AMQTimeoutException; import org.apache.qpid.client.failover.FailoverException; @@ -75,7 +79,12 @@ private volatile boolean _ready = false; /** Used to protect the shared event and ready flag between the producer and consumer. */ - private final Object _lock = new Object(); + private final ReentrantLock _lock = new ReentrantLock(); + + /** + * Used to signal that a method has been received + */ + private final Condition _receivedCondition = _lock.newCondition(); /** Used to hold the most recent exception that is passed to the {@link #error(Exception)} method. */ private volatile Exception _error; @@ -126,11 +135,16 @@ // we only update the flag from inside the synchronized block // so that the blockForFrame method cannot "miss" an update - it // will only ever read the flag from within the synchronized block - synchronized (_lock) + _lock.lock(); + try { _doneEvt = evt; _ready = ready; - _lock.notify(); + _receivedCondition.signal(); + } + finally + { + _lock.unlock(); } } @@ -159,38 +173,42 @@ */ public AMQMethodEvent blockForFrame(long timeout) throws AMQException, FailoverException { - synchronized (_lock) + long nanoTimeout = TimeUnit.MILLISECONDS.toNanos(timeout); + + _lock.lock(); + try { while (!_ready) { - try + if (timeout == -1) { - if (timeout == -1) - { - _lock.wait(); - } - else - { - - _lock.wait(timeout); - if (!_ready) - { - _error = new AMQTimeoutException("Server did not respond in a timely fashion"); - _ready = true; - } - } + _receivedCondition.await(); } - catch (InterruptedException e) + else { - // IGNORE -- //fixme this isn't ideal as being interrupted isn't equivellant to sucess - // if (!_ready && timeout != -1) - // { - // _error = new AMQException("Server did not respond timely"); - // _ready = true; - // } + nanoTimeout = _receivedCondition.awaitNanos(nanoTimeout); + + if (nanoTimeout <= 0 && !_ready && _error == null) + { + _error = new AMQTimeoutException("Server did not respond in a timely fashion"); + _ready = true; + } } } } + catch (InterruptedException e) + { + // IGNORE -- //fixme this isn't ideal as being interrupted isn't equivellant to sucess + // if (!_ready && timeout != -1) + // { + // _error = new AMQException("Server did not respond timely"); + // _ready = true; + // } + } + finally + { + _lock.unlock(); + } if (_error != null) { @@ -224,10 +242,15 @@ // can pick up the exception and rethrow to the caller _error = e; - synchronized (_lock) + _lock.lock(); + try { _ready = true; - _lock.notify(); + _receivedCondition.signal(); + } + finally + { + _lock.unlock(); } } }