activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1395411 - /activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.cpp
Date Sun, 07 Oct 2012 22:46:05 GMT
Author: tabish
Date: Sun Oct  7 22:46:05 2012
New Revision: 1395411

URL: http://svn.apache.org/viewvc?rev=1395411&view=rev
Log:
refactoring to attempt a fix for: https://issues.apache.org/jira/browse/AMQCPP-405

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.cpp?rev=1395411&r1=1395410&r2=1395411&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.cpp Sun Oct  7 22:46:05 2012
@@ -29,6 +29,8 @@
 #include <decaf/internal/util/concurrent/Atomics.h>
 #include <decaf/internal/util/concurrent/PlatformThread.h>
 
+#include <deque>
+
 using namespace decaf;
 using namespace decaf::lang;
 using namespace decaf::lang::exceptions;
@@ -187,6 +189,12 @@ namespace {
          */
         Node* nextWaiter;
 
+        /**
+         * Linked nodes in the free pool using this special next trail to avoid
+         * live nodes stepping into the pool and getting stuck.
+         */
+        Node* nextFree;
+
     private:
 
         Node(const Node&);
@@ -194,11 +202,13 @@ namespace {
 
     public:
 
-        Node() : waitStatus(0), prev(NULL), next(NULL), thread(NULL), nextWaiter(NULL) {
+        Node() : waitStatus(0), prev(NULL), next(NULL), thread(NULL), nextWaiter(NULL), nextFree(NULL) {
+        }
+        Node(Thread* thread, Node* node) : waitStatus(0), prev(NULL), next(NULL), thread(thread), nextWaiter(node), nextFree(NULL) {
         }
-        Node(Thread* thread, Node* node) : waitStatus(0), prev(NULL), next(NULL), thread(thread), nextWaiter(node) {
+        Node(Thread* thread, int waitStatus) : waitStatus(waitStatus), prev(NULL), next(NULL), thread(thread), nextWaiter(NULL), nextFree(NULL) {
         }
-        Node(Thread* thread, int waitStatus) : waitStatus(waitStatus), prev(NULL), next(NULL), thread(thread), nextWaiter(NULL) {
+        Node(Thread* thread, int waitStatus, Node* node) : waitStatus(waitStatus), prev(NULL), next(NULL), thread(thread), nextWaiter(node), nextFree(NULL) {
         }
 
         ~Node() {}
@@ -223,6 +233,122 @@ namespace {
     // For very short timeouts its usually more efficient to just spin instead of
     // parking a thread.
     const long long spinForTimeoutLimit = 1000LL;
+
+    /**
+     * When a thread no longer needs its Node in the AQS it is moved to the NodePool.
+     *
+     * The Pool will create Node's on demand or access them from its internal pool of
+     * old Nodes.  The Pool can at times shrink the list of old nodes if it needs to.
+     */
+    class NodePool {
+    private:
+
+        Node head;
+        Node* tail;
+        decaf_mutex_t lock;
+
+    public:
+
+        NodePool() : head(), tail(NULL), lock() {
+            PlatformThread::createMutex(&lock);
+
+            for (int i = 0; i < 100; ++i) {
+                Node* node = new Node();
+                Node* t = tail;
+
+                if (t != NULL) {
+                    t->nextFree = node;
+                    tail = node;
+                } else {
+                    tail = node;
+                    head.nextFree = tail;
+                }
+            }
+        }
+
+        ~NodePool() {
+
+            PlatformThread::lockMutex(lock);
+
+            while (head.nextFree != NULL) {
+                Node* node = head.nextFree;
+                head.nextFree = node->nextFree;
+                delete node;
+            }
+
+            PlatformThread::unlockMutex(lock);
+
+            PlatformThread::destroyMutex(lock);
+        }
+
+        Node* takeNode() {
+            return takeNode(NULL, 0, NULL);
+        }
+
+        Node* takeNode(Thread* thread, int waitStatus) {
+            return takeNode(thread, waitStatus, NULL);
+        }
+
+        Node* takeNode(Thread* thread, Node* nextWaiter) {
+            return takeNode(thread, 0, nextWaiter);
+        }
+
+        Node* takeNode(Thread* thread, int waitStatus, Node* nextWaiter) {
+
+            Node* result = NULL;
+
+            if (head.nextFree != NULL) {
+                PlatformThread::lockMutex(lock);
+
+                if (head.nextFree != NULL) {
+                    result = head.nextFree;
+                    head.nextFree = result->nextFree;
+
+                    if (result == tail) {
+                        tail = NULL;
+                    }
+                }
+
+                PlatformThread::unlockMutex(lock);
+            }
+
+            if (result == NULL) {
+                result = new Node(thread, waitStatus, nextWaiter);
+            } else {
+                // Reset to the new state.
+                result->thread = thread;
+                result->waitStatus = waitStatus;
+                result->nextWaiter = nextWaiter;
+                result->prev = NULL;
+                result->next = NULL;
+                result->nextFree = NULL;
+            }
+
+            return result;
+        }
+
+        void returnNode(Node* node) {
+
+            if (node == NULL) {
+                return;
+            }
+
+            PlatformThread::lockMutex(lock);
+
+            Node* t = tail;
+
+            if (t != NULL) {
+                t->nextFree = node;
+                tail = node;
+                tail->nextFree = NULL;
+            } else {
+                tail = node;
+                head.nextFree = tail;
+            }
+
+            PlatformThread::unlockMutex(lock);
+        }
+    };
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -251,16 +377,6 @@ namespace locks {
         volatile int state;
 
         /**
-         * Platform level R/W lock. Because we don't implement a garbage collected Node
-         * scheme we can't just use atomic operations on the Node pointers so in cases where
-         * we operate on the list of Nodes to remove canceled items we must write lock the
-         * list.  Likewise in cases where we are iterating through the list to collect
-         * statistics we must ensure that a Node won't suddenly become invalid so we must
-         * hold a read lock.
-         */
-        decaf_rwmutex_t rwLock;
-
-        /**
          * Head of the wait queue, lazily initialized.  Except for initialization, it is
          * modified only via method setHead.  Note: If head exists, its waitStatus is
          * guaranteed not to be CANCELLED.
@@ -273,25 +389,22 @@ namespace locks {
          */
         AtomicReference<Node> tail;
 
+        /**
+         * Pool used to store discarded Nodes for reuse by another thread.
+         */
+        NodePool nodePool;
+
     public:
 
         SynchronizerState(AbstractQueuedSynchronizer* parent) :
-            parent(parent), state(0), rwLock(), head(), tail() {
-            PlatformThread::createRWMutex(&rwLock);
+            parent(parent), state(0), head(), tail() {
         }
 
         virtual ~SynchronizerState() {
 
-            // Ensure that the destructor waits for other operations to complete.
-            PlatformThread::writerLockMutex(rwLock);
-
             while (tail.get() != NULL) {
-                delete tail.getAndSet(tail.get()->prev);
+                nodePool.returnNode(tail.getAndSet(tail.get()->prev));
             }
-
-            PlatformThread::unlockRWMutex(rwLock);
-
-            PlatformThread::destroyRWMutex(rwLock);
         }
 
         bool isHeldExclusively() const {
@@ -310,24 +423,22 @@ namespace locks {
          */
         Node* enqueue(Node* node) {
 
-            Node* pred = NULL;
-
-            PlatformThread::writerLockMutex(rwLock);
-
-            pred = tail.get();
-            if (pred == NULL) { // Must initialize
-                pred = new Node();
-                head.set(pred);
-                tail.set(pred);
+            for (;;) {
+                Node* t = tail.get();
+                if (t == NULL) { // Must initialize
+                    if (compareAndSetHead(nodePool.takeNode())) {
+                        tail.set(head.get());
+                    }
+                } else {
+                    node->prev = t;
+                    if (compareAndSetTail(t, node)) {
+                        t->next = node;
+                        return t;
+                    }
+                }
             }
 
-            node->prev = pred;
-            pred->next = node;
-            tail.set(node);
-
-            PlatformThread::unlockRWMutex(rwLock);
-
-            return pred;
+            return NULL;
         }
 
         /**
@@ -341,27 +452,39 @@ namespace locks {
          * @return the newly added Node
          */
         Node* addWaiter(Node* mode) {
-            Node* node = new Node(Thread::currentThread(), mode);
+            Node* node = nodePool.takeNode(Thread::currentThread(), mode);
+
+            // Try the fast add method first, then fall-back to the slower one.
+            Node* pred = tail.get();
+            if (pred != NULL) {
+                node->prev = pred;
+                if (compareAndSetTail(pred, node)) {
+                    pred->next = node;
+                    return node;
+                }
+            }
+
             enqueue(node);
+
             return node;
         }
 
         /**
          * The only place head is altered, we NULL out prev since that Node will be
          * Destroyed or pooled after this, but leave next alone since it should still
-         * be valid.
+         * be valid.  The method calling this needs to be the lock holder which ensures
+         * that no other thread can alter head.
          *
          * @param node
          *      The Node that is to become the new Head of the queue.
          */
         Node* setHead(Node* node) {
-            Node* oldHead = NULL;
-            PlatformThread::writerLockMutex(rwLock);
-            oldHead = head.get();
+            Node* oldHead = head.get();
+
             head.set(node);
             node->thread = NULL;
             node->prev = NULL;
-            PlatformThread::unlockRWMutex(this->rwLock);
+
             return oldHead;
         }
 
@@ -374,34 +497,29 @@ namespace locks {
         void unparkSuccessor(Node* node) {
 
             // If status is negative (i.e., possibly needing signal) try to clear
-            // in anticipation of signalling.  It is OK if this fails or if status
+            // in anticipation of signaling.  It is OK if this fails or if status
             // is changed by waiting thread.
             int ws = node->waitStatus;
             if (ws < 0) {
                 compareAndSetWaitStatus(node, ws, 0);
             }
 
-            // We need to lock to prevent cancellation of a Node from altering
-            // the list as we iterate and check Node status fields.
-            PlatformThread::readerLockMutex(this->rwLock);
-
             // Thread to un-park is held in successor, which is normally just the
             // next node.  But if canceled or apparently NULL, traverse backwards
             // from tail to find the actual non-canceled successor.
             Node* successor = node->next;
             if (successor == NULL || successor->waitStatus > 0) {
                 successor = NULL;
-                for (Node* t = tail.get(); t != NULL && t != node; t = t->prev)
+                for (Node* t = tail.get(); t != NULL && t != node; t = t->prev) {
                     if (t->waitStatus <= 0) {
                         successor = t;
                     }
+                }
             }
 
             if (successor != NULL) {
                 LockSupport::unpark((Thread*)successor->thread);
             }
-
-            PlatformThread::unlockRWMutex(this->rwLock);
         }
 
         /**
@@ -411,10 +529,6 @@ namespace locks {
          */
         void doReleaseShared() {
 
-            // Here we have to read lock because head could change when a
-            // different thread does its release shared.
-            PlatformThread::readerLockMutex(this->rwLock);
-
             // Ensure that a release propagates, even if there are other in-progress
             // acquires/releases.  This proceeds in the usual way of trying to
             // unparkSuccessor of head if it needs signal. But if it does not,
@@ -431,20 +545,15 @@ namespace locks {
                             continue;            // loop to recheck cases
                         }
 
-                        // Platform level lock may not be reentrant.
-                        PlatformThread::unlockRWMutex(this->rwLock);
                         unparkSuccessor(h);
-                        PlatformThread::readerLockMutex(this->rwLock);
                     } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node::PROPAGATE)) {
                         continue;                // loop on failed CAS
                     }
                 }
-                if (h == head.get()) {                  // loop if head changed
+                if (h == head.get()) {           // loop if head changed
                     break;
                 }
             }
-
-            PlatformThread::unlockRWMutex(this->rwLock);
         }
 
         /**
@@ -462,10 +571,6 @@ namespace locks {
 
             Node* head = setHead(node); // Record old head for check below
 
-            // Here we have to read lock because head could change when a
-            // different thread does its release shared.
-            PlatformThread::readerLockMutex(this->rwLock);
-
             // Try to signal next queued node if:
             //   Propagation was indicated by caller, or was recorded (as head->waitStatus)
             //   by a previous operation (note: this uses sign-check of waitStatus because
@@ -478,65 +583,67 @@ namespace locks {
             if (propagate > 0 || head == NULL || head->waitStatus < 0) {
                 Node* successor = node->next;
                 if (successor == NULL || successor->isShared()) {
-                    PlatformThread::unlockRWMutex(this->rwLock);
                     doReleaseShared();
-                } else {
-                    PlatformThread::unlockRWMutex(this->rwLock);
                 }
-            } else {
-                PlatformThread::unlockRWMutex(this->rwLock);
             }
 
             return head;
         }
 
         /**
-         * Cancels an ongoing attempt to acquire.  The cancelled Node needs to be
-         * deleted but we need to unlink it from the list before we can do so.
+         * Cancels an ongoing attempt to acquire.  A canceled node can be returned to
+         * the pool once its status has been updated and its links are updated.
          *
          * @param node
-         *      The node that was attempting to acquire, will be delted here.
+         *      The node that was attempting to acquire, will be returned to the pool.
          */
         void cancelAcquire(Node* node) {
 
-            // Ignore if node doesn't exist
             if (node == NULL) {
                 return;
             }
 
             node->thread = NULL;
 
-            // Can use unconditional write instead of CAS here.  After this atomic
-            // step, other Nodes can skip past us. Before, we are free of interference
-            // from other threads.
-            node->waitStatus = Node::CANCELLED;
-
-            // If successor needs signal, try to set pred's next-link so it will
-            // get one. Otherwise wake it up to propagate.
-            int ws = 0;
-
-            PlatformThread::writerLockMutex(this->rwLock);
-
-            if (node == tail.get()) {
-                tail.set(node->prev);
-                node->prev->next = NULL;
-            } else {
-                node->prev->next = node->next;
-                node->next->prev = node->prev;
+            // Skip canceled predecessors
+            Node* pred = node->prev;
+            while (pred->waitStatus > 0) {
+                node->prev = pred = pred->prev;
             }
 
-            if (node->prev != head.get() &&
-                ((ws = node->prev->waitStatus) == Node::SIGNAL ||
-                 (ws <= 0 && compareAndSetWaitStatus(node->prev, ws, Node::SIGNAL))) &&
-                 node->prev->thread != NULL) {
+            // predNext is the apparent node to unsplice. CASes below will
+            // fail if not, in which case, we lost race vs another cancel
+            // or signal, so no further action is necessary.
+            Node* predNext = pred->next;
+
+            // Can use unconditional write instead of CAS here.
+            // After this atomic step, other Nodes can skip past us.
+            // Before, we are free of interference from other threads.
+            node->waitStatus = Node::CANCELLED;
 
-                PlatformThread::unlockRWMutex(this->rwLock);
+            // If we are the tail, remove ourselves.
+            if (node == tail.get() && compareAndSetTail(node, pred)) {
+                compareAndSetNext(pred, predNext, NULL);
             } else {
-                PlatformThread::unlockRWMutex(this->rwLock);
-                unparkSuccessor(node);
+                // If successor needs signal, try to set pred's next-link
+                // so it will get one. Otherwise wake it up to propagate.
+                int ws;
+                if (pred != head.get() &&
+                    ((ws = pred->waitStatus) == Node::SIGNAL ||
+                     (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node::SIGNAL))) &&
+                    pred->thread != NULL) {
+
+                    Node* next = node->next;
+                    if (next != NULL && next->waitStatus <= 0) {
+                        compareAndSetNext(pred, predNext, next);
+                    }
+                } else {
+                    unparkSuccessor(node);
+                }
             }
 
-            delete node;
+            node->next = NULL;   // Help any lingering threads that land on this node
+            nodePool.returnNode(node);
         }
 
         /**
@@ -551,21 +658,22 @@ namespace locks {
          *
          * @return true if thread should block.
          */
-        bool shouldParkAfterFailedAcquire(Node* node) {
-
-            bool result = false;
-
-            PlatformThread::readerLockMutex(this->rwLock);
+        bool shouldParkAfterFailedAcquire(Node* pred, Node* node) {
 
-            int ws = node->prev->waitStatus;
+            int ws = pred->waitStatus;
 
-            if (ws == Node::SIGNAL)
+            if (ws == Node::SIGNAL) {
                 // This node has already set status asking a release to signal
                 // it, so it can safely park.
-                result = true;
+                return true;
+            }
+
             if (ws > 0) {
                  // Predecessor was canceled. Skip over predecessors and indicate retry.
-                result = false;
+                do {
+                    node->prev = pred = pred->prev;
+                } while (pred->waitStatus > 0);
+                pred->next = node;
             } else {
                 // waitStatus must be 0 or PROPAGATE.  Indicate that we need a
                 // signal, but don't park yet.  Caller will need to retry to
@@ -573,9 +681,7 @@ namespace locks {
                 compareAndSetWaitStatus(node->prev, ws, Node::SIGNAL);
             }
 
-            PlatformThread::unlockRWMutex(this->rwLock);
-
-            return result;
+            return false;
         }
 
         /**
@@ -598,7 +704,7 @@ namespace locks {
         /**
          * Acquires in exclusive uninterruptible mode for thread already in queue
          * Used by condition wait methods as well as acquire.  We can access the
-         * Node's data here as it can't be deleted on us because it is guaranteed
+         * Node's data here as it can't be altered on us because it is guaranteed
          * to exist until this method returns.
          *
          * @param node
@@ -609,33 +715,27 @@ namespace locks {
          * @return true if interrupted while waiting
          */
         bool acquireQueued(Node* node, int arg) {
-            bool failed = true;
             try {
-
                 bool interrupted = false;
                 for (;;) {
                     Node* p = node->predecessor();
                     if (p == head.get() && parent->tryAcquire(arg)) {
-                        delete setHead(node);
-                        failed = false;
+                        setHead(node);
+                        p->waitStatus = Node::CANCELLED;
+                        p->next = NULL;
+                        nodePool.returnNode(p);
                         return interrupted;
                     }
 
-                    if (shouldParkAfterFailedAcquire(node) && parkAndCheckInterrupt()) {
+                    if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
                         interrupted = true;
                     }
                 }
 
-            } catch(Exception& ex) {
-                if (failed) {
-                    cancelAcquire(node);
-                }
-
-                throw ex;
+            } catch (Exception& ex) {
+                cancelAcquire(node);
+                throw;
             }
-
-            cancelAcquire(node);
-            return true;
         }
 
         /**
@@ -644,26 +744,24 @@ namespace locks {
          */
         void doAcquireInterruptibly(int arg) {
             Node* node = addWaiter(Node::EXCLUSIVE);
-            bool failed = true;
             try {
                 for (;;) {
                     Node* p = node->predecessor();
                     if (p == head.get() && parent->tryAcquire(arg)) {
-                        delete setHead(node);
-                        failed = false;
+                        setHead(node);
+                        p->waitStatus = Node::CANCELLED;
+                        p->next = NULL;
+                        nodePool.returnNode(p);
                         return;
                     }
 
-                    if (shouldParkAfterFailedAcquire(node) && parkAndCheckInterrupt()) {
-                        throw InterruptedException();
+                    if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
+                        throw InterruptedException(__FILE__, __LINE__, "Interrupted while waiting for lock.");
                     }
                 }
 
-            } catch(InterruptedException& ex) {
-                if (failed) {
-                    cancelAcquire(node);
-                }
-
+            } catch (Exception& ex) {
+                cancelAcquire(node);
                 throw InterruptedException(ex);
             }
         }
@@ -678,13 +776,14 @@ namespace locks {
         bool doAcquireNanos(int arg, long long nanosTimeout) {
             long long lastTime = System::nanoTime();
             Node* node = addWaiter(Node::EXCLUSIVE);
-            bool failed = true;
             try {
                 for (;;) {
                     Node* p = node->predecessor();
                     if (p == head.get() && parent->tryAcquire(arg)) {
-                        delete setHead(node);
-                        failed = false;
+                        setHead(node);
+                        p->waitStatus = Node::CANCELLED;
+                        p->next = NULL;
+                        nodePool.returnNode(p);
                         return true;
                     }
 
@@ -692,7 +791,7 @@ namespace locks {
                         break;
                     }
 
-                    if (shouldParkAfterFailedAcquire(node) && nanosTimeout > spinForTimeoutLimit) {
+                    if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutLimit) {
                         LockSupport::parkNanos(nanosTimeout);
                     }
 
@@ -701,16 +800,13 @@ namespace locks {
                     lastTime = now;
 
                     if (Thread::interrupted()) {
-                        throw InterruptedException();
+                        throw InterruptedException(__FILE__, __LINE__, "Interrupted while waiting for lock.");
                     }
                 }
 
-            } catch(InterruptedException& ex) {
-                if (failed) {
-                    cancelAcquire(node);
-                }
-
-                throw ex;
+            } catch (Exception& ex) {
+                cancelAcquire(node);
+                throw;
             }
 
             cancelAcquire(node);
@@ -723,7 +819,6 @@ namespace locks {
          */
         void doAcquireShared(int arg) {
             Node* node = addWaiter(&Node::SHARED);
-            bool failed = true;
             try {
                 bool interrupted = false;
                 for (;;) {
@@ -731,25 +826,25 @@ namespace locks {
                     if (p == head.get()) {
                         int r = parent->tryAcquireShared(arg);
                         if (r >= 0) {
-                            delete setHeadAndPropagate(node, r);
+                            setHeadAndPropagate(node, r);
+                            p->waitStatus = Node::CANCELLED;
+                            p->next = NULL;
+                            nodePool.returnNode(p);
+
                             if (interrupted) {
                                 selfInterrupt();
                             }
-                            failed = false;
                             return;
                         }
                     }
 
-                    if (shouldParkAfterFailedAcquire(node) && parkAndCheckInterrupt()) {
+                    if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
                         interrupted = true;
                     }
                 }
-            } catch(Exception& ex) {
-                if (failed) {
-                    cancelAcquire(node);
-                }
-
-                throw ex;
+            } catch (Exception& ex) {
+                cancelAcquire(node);
+                throw;
             }
 
             cancelAcquire(node);
@@ -761,28 +856,27 @@ namespace locks {
          */
         void doAcquireSharedInterruptibly(int arg) {
             Node* node = addWaiter(&Node::SHARED);
-            bool failed = true;
             try {
                 for (;;) {
                     Node* p = node->predecessor();
                     if (p == head.get()) {
                         int r = parent->tryAcquireShared(arg);
                         if (r >= 0) {
-                            delete setHeadAndPropagate(node, r);
-                            failed = false;
+                            setHeadAndPropagate(node, r);
+                            p->waitStatus = Node::CANCELLED;
+                            p->next = NULL;
+                            nodePool.returnNode(p);
                             return;
                         }
                     }
-                    if (shouldParkAfterFailedAcquire(node) && parkAndCheckInterrupt()) {
-                        throw InterruptedException();
+
+                    if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
+                        throw InterruptedException(__FILE__, __LINE__, "Interrupted while waiting for lock.");
                     }
                 }
-            } catch(InterruptedException& ex) {
-                if (failed) {
-                    cancelAcquire(node);
-                }
-
-                throw ex;
+            } catch (Exception& ex) {
+                cancelAcquire(node);
+                throw;
             }
         }
 
@@ -800,15 +894,16 @@ namespace locks {
 
             long long lastTime = System::nanoTime();
             Node* node = addWaiter(&Node::SHARED);
-            bool failed = true;
             try {
                 for (;;) {
                     Node* p = node->predecessor();
                     if (p == head.get()) {
                         int r = parent->tryAcquireShared(arg);
                         if (r >= 0) {
-                            delete setHeadAndPropagate(node, r);
-                            failed = false;
+                            setHeadAndPropagate(node, r);
+                            p->waitStatus = Node::CANCELLED;
+                            p->next = NULL;
+                            nodePool.returnNode(p);
                             return true;
                         }
                     }
@@ -816,7 +911,7 @@ namespace locks {
                         break;
                     }
 
-                    if (shouldParkAfterFailedAcquire(node) && nanosTimeout > spinForTimeoutLimit) {
+                    if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutLimit) {
                         LockSupport::parkNanos(nanosTimeout);
                     }
 
@@ -825,15 +920,12 @@ namespace locks {
                     lastTime = now;
 
                     if (Thread::interrupted()) {
-                        throw InterruptedException();
+                        throw InterruptedException(__FILE__, __LINE__, "Interrupted while waiting for lock.");
                     }
                 }
-            } catch(InterruptedException& ex) {
-                if (failed) {
-                    cancelAcquire(node);
-                }
-
-                throw ex;
+            } catch (Exception& ex) {
+                cancelAcquire(node);
+                throw;
             }
 
             cancelAcquire(node);
@@ -847,8 +939,6 @@ namespace locks {
             // first node. If not, we continue on, safely traversing from tail
             // back to head to find first, guaranteeing termination.
 
-            PlatformThread::readerLockMutex(this->rwLock);
-
             Node* t = tail.get();
             Thread* firstThread = NULL;
             while (t != NULL && t != head.get()) {
@@ -859,8 +949,6 @@ namespace locks {
                 t = t->prev;
             }
 
-            PlatformThread::unlockRWMutex(this->rwLock);
-
             return firstThread;
         }
 
@@ -898,7 +986,6 @@ namespace locks {
         bool findNodeFromTail(Node* node) {
             bool found = false;
 
-            PlatformThread::readerLockMutex(this->rwLock);
             Node* t = tail.get();
             for (;;) {
 
@@ -914,7 +1001,6 @@ namespace locks {
                 t = t->prev;
             }
 
-            PlatformThread::unlockRWMutex(this->rwLock);
             return found;
         }
 
@@ -940,10 +1026,6 @@ namespace locks {
             // woken up either by cancel or by acquiring the lock.
             enqueue(node);
 
-            // Since we need to write to our predecessor we must lock so that
-            // it doesn't leave the Queue before we are done.
-            PlatformThread::readerLockMutex(this->rwLock);
-
             // Splice onto queue and try to set waitStatus of predecessor to
             // indicate that thread is (probably) waiting. If canceled or
             // attempt to set waitStatus fails, wake up to resync (in which
@@ -954,8 +1036,6 @@ namespace locks {
                 LockSupport::unpark((Thread*)node->thread);
             }
 
-            PlatformThread::unlockRWMutex(this->rwLock);
-
             return true;
         }
 
@@ -1001,23 +1081,16 @@ namespace locks {
          * @return previous sync state
          */
         int fullyRelease(Node* node) {
-            bool failed = true;
             try {
                 int savedState = parent->getState();
                 if (parent->release(savedState)) {
-                    failed = false;
                     return savedState;
                 } else {
                     throw IllegalMonitorStateException();
                 }
             } catch(IllegalMonitorStateException& ex) {
-                if (failed) {
-                    node->waitStatus = Node::CANCELLED;
-                    // Enqueue it even though canceled so that it gets deleted
-                    enqueue(node);
-                }
-
-                throw ex;
+                node->waitStatus = Node::CANCELLED;
+                throw;
             }
         }
 
@@ -1030,6 +1103,9 @@ namespace locks {
         static bool compareAndSetWaitStatus(Node* node, int expect, int update) {
             return Atomics::compareAndSet32(&node->waitStatus, expect, update);
         }
+        static bool compareAndSetNext(Node* node, Node* expect, Node* update) {
+            return Atomics::compareAndSet((volatile void **)(&node->next), (void*)expect, (void*)update);
+        }
     };
 
     /**
@@ -1065,6 +1141,10 @@ namespace locks {
         DefaultConditionObject(SynchronizerState* impl) :
             ConditionObject(), impl(impl), head(NULL), tail(NULL) {}
         virtual ~DefaultConditionObject() {
+            try {
+                unlinkCancelledWaiters();
+            }
+            DECAF_CATCHALL_NOTHROW()
         }
 
         virtual void await() {
@@ -1088,9 +1168,7 @@ namespace locks {
             if (impl->acquireQueued(node, savedState) && interruptMode != THROW_IE) {
                 interruptMode = REINTERRUPT;
             }
-            if (node->nextWaiter != NULL && interruptMode == 0) {
-                // clean up if canceled but only if we own the lock otherwise another
-                // thread could already be changing the list.
+            if (node->nextWaiter != NULL) {
                 unlinkCancelledWaiters();
             }
             if (interruptMode != 0) {
@@ -1115,7 +1193,7 @@ namespace locks {
             }
         }
 
-        virtual long long awaitNanos( long long nanosTimeout ) {
+        virtual long long awaitNanos(long long nanosTimeout) {
             if (Thread::interrupted()) {
                 throw InterruptedException(__FILE__, __LINE__, "Thread was interrupted");
             }
@@ -1142,9 +1220,7 @@ namespace locks {
             if (impl->acquireQueued(node, savedState) && interruptMode != THROW_IE) {
                 interruptMode = REINTERRUPT;
             }
-            if (node->nextWaiter != NULL && interruptMode == 0) {
-                // clean up if canceled but only if we own the lock otherwise another
-                // thread could already be changing the list.
+            if (node->nextWaiter != NULL) {
                 unlinkCancelledWaiters();
             }
             if (interruptMode != 0) {
@@ -1154,7 +1230,7 @@ namespace locks {
             return nanosTimeout - (System::nanoTime() - lastTime);
         }
 
-        virtual bool await( long long time, const TimeUnit& unit ) {
+        virtual bool await(long long time, const TimeUnit& unit) {
             long long nanosTimeout = unit.toNanos(time);
             if (Thread::interrupted()) {
                 throw InterruptedException(__FILE__, __LINE__, "Thread was interrupted");
@@ -1184,9 +1260,7 @@ namespace locks {
             if (impl->acquireQueued(node, savedState) && interruptMode != THROW_IE) {
                 interruptMode = REINTERRUPT;
             }
-            if (node->nextWaiter != NULL && interruptMode == 0) {
-                // clean up if canceled but only if we own the lock otherwise another
-                // thread could already be changing the list.
+            if (node->nextWaiter != NULL) {
                 unlinkCancelledWaiters();
             }
             if (interruptMode != 0) {
@@ -1220,9 +1294,7 @@ namespace locks {
             if (impl->acquireQueued(node, savedState) && interruptMode != THROW_IE) {
                 interruptMode = REINTERRUPT;
             }
-            if (node->nextWaiter != NULL && interruptMode == 0) {
-                // clean up if canceled but only if we own the lock otherwise another
-                // thread could already be changing the list.
+            if (node->nextWaiter != NULL) {
                 unlinkCancelledWaiters();
             }
             if (interruptMode != 0) {
@@ -1265,17 +1337,12 @@ namespace locks {
                 throw IllegalMonitorStateException();
             }
 
-            PlatformThread::readerLockMutex(this->impl->rwLock);
-
             for (Node* w = head; w != NULL; w = w->nextWaiter) {
                 if (w->waitStatus == Node::CONDITION) {
-                    PlatformThread::unlockRWMutex(this->impl->rwLock);
                     return true;
                 }
             }
 
-            PlatformThread::unlockRWMutex(this->impl->rwLock);
-
             return false;
         }
 
@@ -1285,16 +1352,12 @@ namespace locks {
             }
             int n = 0;
 
-            PlatformThread::readerLockMutex(this->impl->rwLock);
-
             for (Node* w = head; w != NULL; w = w->nextWaiter) {
                 if (w->waitStatus == Node::CONDITION) {
                     ++n;
                 }
             }
 
-            PlatformThread::unlockRWMutex(this->impl->rwLock);
-
             return n;
         }
 
@@ -1304,8 +1367,6 @@ namespace locks {
             }
             ArrayList<Thread*>* list = new ArrayList<Thread*>();
 
-            PlatformThread::readerLockMutex(this->impl->rwLock);
-
             for (Node* w = head; w != NULL; w = w->nextWaiter) {
                 if (w->waitStatus == Node::CONDITION) {
                     Thread* t = (Thread*)w->thread;
@@ -1315,8 +1376,6 @@ namespace locks {
                 }
             }
 
-            PlatformThread::unlockRWMutex(this->impl->rwLock);
-
             return list;
         }
 
@@ -1338,7 +1397,7 @@ namespace locks {
                 unlinkCancelledWaiters();
                 t = tail;
             }
-            Node* node = new Node(Thread::currentThread(), Node::CONDITION);
+            Node* node = impl->nodePool.takeNode(Thread::currentThread(), Node::CONDITION);
             if (t == NULL) {
                 head = node;
             } else {
@@ -1398,9 +1457,6 @@ namespace locks {
          */
         void unlinkCancelledWaiters() {
 
-            // Prevent the parent from deleting nodes while we clean up.
-            PlatformThread::readerLockMutex(this->impl->rwLock);
-
             Node* t = head;
             Node* trail = NULL;
             while (t != NULL) {
@@ -1418,13 +1474,13 @@ namespace locks {
                         tail = trail;
                     }
 
+                    impl->nodePool.returnNode(t);
+
                 } else {
                     trail = t;
                 }
                 t = next;
             }
-
-            PlatformThread::unlockRWMutex(this->impl->rwLock);
         }
 
         /**
@@ -1469,7 +1525,7 @@ AbstractQueuedSynchronizer::AbstractQueu
 
 ////////////////////////////////////////////////////////////////////////////////
 AbstractQueuedSynchronizer::~AbstractQueuedSynchronizer() {
-    try{
+    try {
         delete this->impl;
     }
     DECAF_CATCHALL_NOTHROW()
@@ -1629,17 +1685,12 @@ bool AbstractQueuedSynchronizer::isQueue
         throw NullPointerException(__FILE__, __LINE__, "Passed in thread was NULL");
     }
 
-    PlatformThread::readerLockMutex(this->impl->rwLock);
-
     for (Node* p = this->impl->tail.get(); p != NULL; p = p->prev) {
         if (p->thread == thread) {
-            PlatformThread::unlockRWMutex(this->impl->rwLock);
             return true;
         }
     }
 
-    PlatformThread::unlockRWMutex(this->impl->rwLock);
-
     return false;
 }
 
@@ -1647,16 +1698,12 @@ bool AbstractQueuedSynchronizer::isQueue
 int AbstractQueuedSynchronizer::getQueueLength() const {
     int n = 0;
 
-    PlatformThread::readerLockMutex(this->impl->rwLock);
-
     for (Node* p = this->impl->tail.get(); p != NULL; p = p->prev) {
         if (p->thread != NULL) {
             ++n;
         }
     }
 
-    PlatformThread::unlockRWMutex(this->impl->rwLock);
-
     return n;
 }
 
@@ -1664,8 +1711,6 @@ int AbstractQueuedSynchronizer::getQueue
 Collection<Thread*>* AbstractQueuedSynchronizer::getQueuedThreads() const {
     ArrayList<Thread*>* list = new ArrayList<Thread*>();
 
-    PlatformThread::readerLockMutex(this->impl->rwLock);
-
     for (Node* p = this->impl->tail.get(); p != NULL; p = p->prev) {
         Thread* t = (Thread*)p->thread;
         if (t != NULL) {
@@ -1673,8 +1718,6 @@ Collection<Thread*>* AbstractQueuedSynch
         }
     }
 
-    PlatformThread::unlockRWMutex(this->impl->rwLock);
-
     return list;
 }
 
@@ -1682,8 +1725,6 @@ Collection<Thread*>* AbstractQueuedSynch
 Collection<Thread*>* AbstractQueuedSynchronizer::getExclusiveQueuedThreads() const {
     ArrayList<Thread*>* list = new ArrayList<Thread*>();
 
-    PlatformThread::readerLockMutex(this->impl->rwLock);
-
     for (Node* p = this->impl->tail.get(); p != NULL; p = p->prev) {
         if (!p->isShared()) {
             Thread* t = (Thread*)p->thread;
@@ -1693,8 +1734,6 @@ Collection<Thread*>* AbstractQueuedSynch
         }
     }
 
-    PlatformThread::unlockRWMutex(this->impl->rwLock);
-
     return list;
 }
 
@@ -1702,8 +1741,6 @@ Collection<Thread*>* AbstractQueuedSynch
 Collection<Thread*>* AbstractQueuedSynchronizer::getSharedQueuedThreads() const {
     ArrayList<Thread*>* list = new ArrayList<Thread*>();
 
-    PlatformThread::readerLockMutex(this->impl->rwLock);
-
     for (Node* p = this->impl->tail.get(); p != NULL; p = p->prev) {
         if (p->isShared()) {
             Thread* t = (Thread*)p->thread;
@@ -1713,8 +1750,6 @@ Collection<Thread*>* AbstractQueuedSynch
         }
     }
 
-    PlatformThread::unlockRWMutex(this->impl->rwLock);
-
     return list;
 }
 
@@ -1760,8 +1795,6 @@ bool AbstractQueuedSynchronizer::hasQueu
 
     bool result = false;
 
-    PlatformThread::readerLockMutex(this->impl->rwLock);
-
     // The correctness of this depends on head being initialized
     // before tail and on head->next being accurate if the current
     // thread is first in queue.
@@ -1771,7 +1804,5 @@ bool AbstractQueuedSynchronizer::hasQueu
 
     result = h != t && ((s = h->next) == NULL || s->thread != Thread::currentThread());
 
-    PlatformThread::unlockRWMutex(this->impl->rwLock);
-
     return result;
 }



Mime
View raw message