qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgodf...@apache.org
Subject svn commit: r581189 - /incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
Date Tue, 02 Oct 2007 11:08:30 GMT
Author: rgodfrey
Date: Tue Oct  2 04:08:29 2007
New Revision: 581189

URL: http://svn.apache.org/viewvc?rev=581189&view=rev
Log:
QPID-614 : Applied patch supplied by Aidan Skinner

Modified:
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java

Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java?rev=581189&r1=581188&r2=581189&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java Tue Oct  2 04:08:29 2007
@@ -20,6 +20,10 @@
  */
 package org.apache.qpid.client.protocol;
 
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQTimeoutException;
 import org.apache.qpid.client.failover.FailoverException;
@@ -75,7 +79,12 @@
     private volatile boolean _ready = false;
 
     /** Used to protect the shared event and ready flag between the producer and consumer. */
-    private final Object _lock = new Object();
+    private final ReentrantLock _lock = new ReentrantLock();
+        
+    /**
+     * Used to signal that a method has been received
+     */
+    private final Condition _receivedCondition = _lock.newCondition();
 
     /** Used to hold the most recent exception that is passed to the {@link #error(Exception)} method. */
     private volatile Exception _error;
@@ -126,11 +135,16 @@
             // we only update the flag from inside the synchronized block
             // so that the blockForFrame method cannot "miss" an update - it
             // will only ever read the flag from within the synchronized block
-            synchronized (_lock)
+            _lock.lock();
+            try
             {
                 _doneEvt = evt;
                 _ready = ready;
-                _lock.notify();
+                _receivedCondition.signal();
+            }
+            finally 
+            {
+                _lock.unlock();
             }
         }
 
@@ -159,38 +173,42 @@
      */
     public AMQMethodEvent blockForFrame(long timeout) throws AMQException, FailoverException
     {
-        synchronized (_lock)
+        long nanoTimeout = TimeUnit.MILLISECONDS.toNanos(timeout);
+        
+        _lock.lock();
+        try
         {
             while (!_ready)
             {
-                try
+                if (timeout == -1)
                 {
-                    if (timeout == -1)
-                    {
-                        _lock.wait();
-                    }
-                    else
-                    {
-
-                        _lock.wait(timeout);
-                        if (!_ready)
-                        {
-                            _error = new AMQTimeoutException("Server did not respond in a timely fashion");
-                            _ready = true;
-                        }
-                    }
+                    _receivedCondition.await();
                 }
-                catch (InterruptedException e)
+                else
                 {
-                    // IGNORE    -- //fixme this isn't ideal as being interrupted isn't equivellant to sucess
-                    // if (!_ready && timeout != -1)
-                    // {
-                    // _error = new AMQException("Server did not respond timely");
-                    // _ready = true;
-                    // }
+                    nanoTimeout = _receivedCondition.awaitNanos(nanoTimeout);
+
+                    if (nanoTimeout <= 0 && !_ready && _error == null)
+                    {
+                        _error = new AMQTimeoutException("Server did not respond in a timely fashion");
+                        _ready = true;
+                    }
                 }
             }
         }
+        catch (InterruptedException e)
+        {
+            // IGNORE    -- //fixme this isn't ideal as being interrupted isn't equivellant to sucess
+            // if (!_ready && timeout != -1)
+            // {
+            // _error = new AMQException("Server did not respond timely");
+            // _ready = true;
+            // }
+        }
+        finally
+        {
+            _lock.unlock();
+        }
 
         if (_error != null)
         {
@@ -224,10 +242,15 @@
         // can pick up the exception and rethrow to the caller
         _error = e;
 
-        synchronized (_lock)
+        _lock.lock();
+        try
         {
             _ready = true;
-            _lock.notify();
+            _receivedCondition.signal();
+        }
+        finally
+        {
+            _lock.unlock();
         }
     }
 }



Mime
View raw message