activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1369852 - /activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.cpp
Date Mon, 06 Aug 2012 14:09:47 GMT
Author: tabish
Date: Mon Aug  6 14:09:47 2012
New Revision: 1369852

URL: http://svn.apache.org/viewvc?rev=1369852&view=rev
Log:
prevent races on node cancellation from overlapping node cleanup. 

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.cpp?rev=1369852&r1=1369851&r2=1369852&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.cpp Mon Aug  6 14:09:47 2012
@@ -299,32 +299,55 @@ namespace locks {
         }
 
         /**
-         * Enqueue of a Node is Atomic with respect to the end of the list, so no
-         * locking needs to occur here.  If the head and tail have not been allocated
-         * we just need to account for contention of two or more enqueues on the
-         * addition of the new head.
+         * Enqueue of a Node is Atomic with respect to the end of the list, if the list
+         * is empty so no lock needs to occur.  If the head and tail were previously set
+         * then we have to account for contention on the list by two or more enqueues.
          *
          * @param node
          *      The new node to add.
+         *
+         * @returns the predecessor of the newly inserted node.
          */
         Node* enqueue(Node* node) {
-            for (;;) {
-                Node* t = tail.get();
-                if (t == NULL) { // Must initialize
-                    Node* newHead = new Node();
-                    if (compareAndSetHead(newHead)) {
-                        tail.set(head.get());
-                    } else {
-                        delete newHead;
-                    }
-                } else {
-                    node->prev = t;
-                    if (compareAndSetTail(t, node)) {
-                        t->next = node;
-                        return t;
-                    }
-                }
-            }
+
+            Node* pred = NULL;
+
+            PlatformThread::writerLockMutex(rwLock);
+
+            pred = tail.get();
+            if (pred == NULL) { // Must initialize
+                pred = new Node();
+                head.set(pred);
+                tail.set(pred);
+            }
+
+            node->prev = pred;
+            pred->next = node;
+            tail.set(node);
+
+            PlatformThread::unlockRWMutex(rwLock);
+
+            return pred;
+
+//            for (;;) {
+//                Node* t = tail.get();
+//                if (t == NULL) { // Must initialize
+//                    Node* newHead = new Node();
+//                    if (compareAndSetHead(newHead)) {
+//                        tail.set(head.get());
+//                    } else {
+//                        delete newHead;
+//                    }
+//                } else {
+//                    node->prev = t;
+//                    if (compareAndSetTail(t, node)) {
+//                        t->next = node;
+//                        return t;
+//                    }
+//                }
+//            }
+
+            return NULL;
         }
 
         /**
@@ -332,22 +355,28 @@ namespace locks {
          * can't get the fast append done we will enter into the longer looping
          * enqueue method.
          *
-         * @param node
-         *      The new Node to add.
+         * @param mode
+         *      Node::EXCLUSIVE for exclusive, Node::SHARED for shared
+         *
+         * @return the newly added Node
          */
         Node* addWaiter(Node* mode) {
             Node* node = new Node(Thread::currentThread(), mode);
-            Node* pred = tail.get();
-            if (pred != NULL) {
-                node->prev = pred;
-                if (compareAndSetTail(pred, node)) {
-                    pred->next = node;
-                    return node;
-                }
-            }
-
             enqueue(node);
             return node;
+
+//            Node* node = new Node(Thread::currentThread(), mode);
+//            Node* pred = tail.get();
+//            if (pred != NULL) {
+//                node->prev = pred;
+//                if (compareAndSetTail(pred, node)) {
+//                    pred->next = node;
+//                    return node;
+//                }
+//            }
+//
+//            enqueue(node);
+//            return node;
         }
 
         /**
@@ -358,44 +387,55 @@ namespace locks {
          * @param node
          *      The Node that is to become the new Head of the queue.
          */
-        void setHead(Node* node) {
+        Node* setHead(Node* node) {
+            Node* oldHead = NULL;
+            PlatformThread::writerLockMutex(rwLock);
+            oldHead = head.get();
             head.set(node);
             node->thread = NULL;
             node->prev = NULL;
+            PlatformThread::unlockRWMutex(this->rwLock);
+            return oldHead;
+
+//            head.set(node);
+//            node->thread = NULL;
+//            node->prev = NULL;
         }
 
+        /**
+         * Wakes up node's successor, if one exists.
+         *
+         * @param node
+         *      the node whose successor will be unparked.
+         */
         void unparkSuccessor(Node* node) {
-            /*
-             * If status is negative (i.e., possibly needing signal) try
-             * to clear in anticipation of signalling.  It is OK if this
-             * fails or if status is changed by waiting thread.
-             */
+
+            // If status is negative (i.e., possibly needing signal) try to clear
+            // in anticipation of signalling.  It is OK if this fails or if status
+            // is changed by waiting thread.
             int ws = node->waitStatus;
             if (ws < 0) {
                 compareAndSetWaitStatus(node, ws, 0);
             }
 
-            // We need to lock to prevent cancellation of a Node from
-            // altering the list as we iterate and check Node status fields.
+            // We need to lock to prevent cancellation of a Node from altering
+            // the list as we iterate and check Node status fields.
             PlatformThread::readerLockMutex(this->rwLock);
 
-            /*
-             * Thread to unpark is held in successor, which is normally
-             * just the next node.  But if canceled or apparently NULL,
-             * traverse backwards from tail to find the actual
-             * non-canceled successor.
-             */
-            Node* s = node->next;
-            if (s == NULL || s->waitStatus > 0) {
-                s = NULL;
+            // Thread to un-park is held in successor, which is normally just the
+            // next node.  But if canceled or apparently NULL, traverse backwards
+            // from tail to find the actual non-canceled successor.
+            Node* successor = node->next;
+            if (successor == NULL || successor->waitStatus > 0) {
+                successor = NULL;
                 for (Node* t = tail.get(); t != NULL && t != node; t = t->prev)
                     if (t->waitStatus <= 0) {
-                        s = t;
+                        successor = t;
                     }
             }
 
-            if (s != NULL) {
-                LockSupport::unpark((Thread*)s->thread);
+            if (successor != NULL) {
+                LockSupport::unpark((Thread*)successor->thread);
             }
 
             PlatformThread::unlockRWMutex(this->rwLock);
@@ -412,17 +452,13 @@ namespace locks {
             // different thread does its release shared.
             PlatformThread::readerLockMutex(this->rwLock);
 
-            /*
-             * Ensure that a release propagates, even if there are other
-             * in-progress acquires/releases.  This proceeds in the usual
-             * way of trying to unparkSuccessor of head if it needs
-             * signal. But if it does not, status is set to PROPAGATE to
-             * ensure that upon release, propagation continues.
-             * Additionally, we must loop in case a new node is added
-             * while we are doing this. Also, unlike other uses of
-             * unparkSuccessor, we need to know if CAS to reset status
-             * fails, if so rechecking.
-             */
+            // Ensure that a release propagates, even if there are other in-progress
+            // acquires/releases.  This proceeds in the usual way of trying to
+            // unparkSuccessor of head if it needs signal. But if it does not,
+            // status is set to PROPAGATE to ensure that upon release, propagation
+            // continues. Additionally, we must loop in case a new node is added
+            // while we are doing this. Also, unlike other uses of unparkSuccessor,
+            // we need to know if CAS to reset status fails, if so rechecking.
             for (;;) {
                 Node* h = head.get();
                 if (h != NULL && h != tail.get()) {
@@ -445,47 +481,51 @@ namespace locks {
         }
 
         /**
-         * Sets head of queue, and checks if successor may be waiting
-         * in shared mode, if so propagating if either propagate > 0 or
-         * PROPAGATE status was set.
+         * Sets head of queue, and checks if successor may be waiting in shared mode,
+         * if so propagating if either propagate > 0 or PROPAGATE status was set.
          *
-         * @param node the node
-         * @param propagate the return value from a tryAcquireShared
+         * @param node
+         *      The node that will become head
+         * @param propagate
+         *      The return value from a tryAcquireShared
+         *
+         * @return the Node that was the head.
          */
-        void setHeadAndPropagate(Node* node, int propagate) {
+        Node* setHeadAndPropagate(Node* node, int propagate) {
+
+            Node* head = setHead(node); // Record old head for check below
 
             // Here we have to read lock because head could change when a
             // different thread does its release shared.
             PlatformThread::readerLockMutex(this->rwLock);
 
-            Node* h = head.get(); // Record old head for check below
-            setHead(node);
-
-            /*
-             * Try to signal next queued node if:
-             *   Propagation was indicated by caller, or was recorded (as h.waitStatus)
-             *   by a previous operation (note: this uses sign-check of waitStatus because
-             *   PROPAGATE status may transition to SIGNAL.) and the next node is waiting
-             *   in shared mode, or we don't know, because it appears NULL.
-             *
-             * The conservatism in both of these checks may cause unnecessary wake-ups,
-             * but only when there are multiple racing acquires/releases, so most need
-             * signals now or soon anyway.
-             */
-            if (propagate > 0 || h == NULL || h->waitStatus < 0) {
-                Node* s = node->next;
-                if (s == NULL || s->isShared()) {
+            // Try to signal next queued node if:
+            //   Propagation was indicated by caller, or was recorded (as head->waitStatus)
+            //   by a previous operation (note: this uses sign-check of waitStatus because
+            //   PROPAGATE status may transition to SIGNAL.) and the next node is waiting
+            //   in shared mode, or we don't know, because it appears NULL.
+            //
+            // The conservatism in both of these checks may cause unnecessary wake-ups,
+            // but only when there are multiple racing acquires/releases, so most need
+            // signals now or soon anyway.
+            if (propagate > 0 || head == NULL || head->waitStatus < 0) {
+                Node* successor = node->next;
+                if (successor == NULL || successor->isShared()) {
                     doReleaseShared();
                 }
             }
 
             PlatformThread::unlockRWMutex(this->rwLock);
+
+            return head;
         }
 
         /**
-         * Cancels an ongoing attempt to acquire.
+         * Cancels an ongoing attempt to acquire.  The cancelled Node needs to be
+         * deleted but we need to unlink it from the list before we can do so.
          *
-         * @param node the node
+         * @param node
+         *      The node that was attempting to acquire, will be deleted here.
          */
         void cancelAcquire(Node* node) {
 
@@ -496,56 +536,93 @@ namespace locks {
 
             node->thread = NULL;
 
-            // Can use unconditional write instead of CAS here.
-            // After this atomic step, other Nodes can skip past us.
-            // Before, we are free of interference from other threads.
+            // Can use unconditional write instead of CAS here.  After this atomic
+            // step, other Nodes can skip past us. Before, we are free of interference
+            // from other threads.
             node->waitStatus = Node::CANCELLED;
 
-            // If we are the tail, remove ourselves.
-            if (node == tail.get() && compareAndSetTail(node, node->prev)) {
-                // Attempt to set next on tail, this can fail if another thread can in
-                // and replaced the old tail but that's ok since that means next is up
-                // to date in that case.
-                Atomics::compareAndSwap<Node>(tail.get()->next, node, NULL);
-                delete node;
-            } else {
-                // If successor needs signal, try to set pred's next-link
-                // so it will get one. Otherwise wake it up to propagate.
-                int ws;
-
-                PlatformThread::writerLockMutex(this->rwLock);
-
-                // Did we become the tail.
-                if (node == tail.get() && compareAndSetTail(node, node->prev)) {
-                    Atomics::compareAndSwap<Node>(tail.get()->next, node, NULL);
-                } else {
-                    node->prev->next = node->next;
-                    node->next->prev = node->prev;
-                }
+            // If successor needs signal, try to set pred's next-link so it will
+            // get one. Otherwise wake it up to propagate.
+            int ws = 0;
 
-                if (node->prev != head.get() &&
-                    ((ws = node->prev->waitStatus) == Node::SIGNAL ||
-                     (ws <= 0 && compareAndSetWaitStatus(node->prev, ws, Node::SIGNAL))) &&
-                     node->prev->thread != NULL) {
+            PlatformThread::writerLockMutex(this->rwLock);
 
-                    PlatformThread::unlockRWMutex(this->rwLock);
-                } else {
-                    PlatformThread::unlockRWMutex(this->rwLock);
-                    unparkSuccessor(node);
-                }
+            if (node == tail.get()) {
+                tail.set(node->prev);
+                node->prev->next = NULL;
+            } else {
+                node->prev->next = node->next;
+                node->next->prev = node->prev;
+            }
+
+            if (node->prev != head.get() &&
+                ((ws = node->prev->waitStatus) == Node::SIGNAL ||
+                 (ws <= 0 && compareAndSetWaitStatus(node->prev, ws, Node::SIGNAL))) &&
+                 node->prev->thread != NULL) {
 
-                delete node;
+                PlatformThread::unlockRWMutex(this->rwLock);
+            } else {
+                PlatformThread::unlockRWMutex(this->rwLock);
+                unparkSuccessor(node);
             }
+
+            delete node;
+
+//            node->thread = NULL;
+//
+//            // Can use unconditional write instead of CAS here.  After this atomic
+//            // step, other Nodes can skip past us. Before, we are free of interference
+//            // from other threads.
+//            node->waitStatus = Node::CANCELLED;
+//
+//            // If we are the tail, remove ourselves.
+//            if (node == tail.get() && compareAndSetTail(node, node->prev)) {
+//                // Attempt to set next on tail, this can fail if another thread can in
+//                // and replaced the old tail but that's ok since that means next is up
+//                // to date in that case.
+//                Atomics::compareAndSwap<Node>(tail.get()->next, node, NULL);
+//                delete node;
+//            } else {
+//                // If successor needs signal, try to set pred's next-link
+//                // so it will get one. Otherwise wake it up to propagate.
+//                int ws;
+//
+//                PlatformThread::writerLockMutex(this->rwLock);
+//
+//                // Did we become the tail.
+//                if (node == tail.get() && compareAndSetTail(node, node->prev)) {
+//                    Atomics::compareAndSwap<Node>(tail.get()->next, node, NULL);
+//                } else {
+//                    node->prev->next = node->next;
+//                    node->next->prev = node->prev;
+//                }
+//
+//                if (node->prev != head.get() &&
+//                    ((ws = node->prev->waitStatus) == Node::SIGNAL ||
+//                     (ws <= 0 && compareAndSetWaitStatus(node->prev, ws, Node::SIGNAL))) &&
+//                     node->prev->thread != NULL) {
+//
+//                    PlatformThread::unlockRWMutex(this->rwLock);
+//                } else {
+//                    PlatformThread::unlockRWMutex(this->rwLock);
+//                    unparkSuccessor(node);
+//                }
+//
+//                delete node;
+//            }
         }
 
         /**
-         * Checks and updates status for a node that failed to acquire.
-         * Returns true if thread should block. This is the main signal
-         * control in all acquire loops.  Requires that pred == node.prev
+         * Checks and updates status for a node that failed to acquire. Returns true
+         * if thread should block. This is the main signal control in all acquire loops.
+         * Requires that pred == node->prev
          *
-         * @param pred node's predecessor holding status
-         * @param node the node
-         * @return {@code true} if thread should block
+         * @param pred
+         *      The node's predecessor holding status
+         * @param node
+         *      The node whose acquire attempt failed.
+         *
+         * @return true if thread should block.
          */
         bool shouldParkAfterFailedAcquire(Node* node) {
 
@@ -556,23 +633,16 @@ namespace locks {
             int ws = node->prev->waitStatus;
 
             if (ws == Node::SIGNAL)
-                /*
-                 * This node has already set status asking a release
-                 * to signal it, so it can safely park.
-                 */
+                // This node has already set status asking a release to signal
+                // it, so it can safely park.
                 result = true;
             if (ws > 0) {
-                /*
-                 * Predecessor was canceled. Skip over predecessors and
-                 * indicate retry.
-                 */
+                 // Predecessor was canceled. Skip over predecessors and indicate retry.
                 result = false;
             } else {
-                /*
-                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
-                 * need a signal, but don't park yet.  Caller will need to
-                 * retry to make sure it cannot acquire before parking.
-                 */
+                // waitStatus must be 0 or PROPAGATE.  Indicate that we need a
+                // signal, but don't park yet.  Caller will need to retry to
+                // make sure it cannot acquire before parking.
                 compareAndSetWaitStatus(node->prev, ws, Node::SIGNAL);
             }
 
@@ -591,7 +661,7 @@ namespace locks {
         /**
          * Convenience method to park and then check if interrupted
          *
-         * @return {@code true} if interrupted
+         * @return true if interrupted.
          */
         bool parkAndCheckInterrupt() const {
             LockSupport::park();
@@ -599,15 +669,17 @@ namespace locks {
         }
 
         /**
-         * Acquires in exclusive uninterruptible mode for thread already in
-         * queue. Used by condition wait methods as well as acquire.
+         * Acquires in exclusive uninterruptible mode for thread already in queue
+         * Used by condition wait methods as well as acquire.  We can access the
+         * Node's data here as it can't be deleted on us because it is guaranteed
+         * to exist until this method returns.
          *
          * @param node
          *      The node.
          * @param arg
          *      The value passed to acquire.
          *
-         * @return {@code true} if interrupted while waiting
+         * @return true if interrupted while waiting
          */
         bool acquireQueued(Node* node, int arg) {
             bool failed = true;
@@ -617,8 +689,7 @@ namespace locks {
                 for (;;) {
                     Node* p = node->predecessor();
                     if (p == head.get() && parent->tryAcquire(arg)) {
-                        setHead(node);
-                        delete p;
+                        delete setHead(node);
                         failed = false;
                         return interrupted;
                     }
@@ -651,8 +722,7 @@ namespace locks {
                 for (;;) {
                     Node* p = node->predecessor();
                     if (p == head.get() && parent->tryAcquire(arg)) {
-                        setHead(node);
-                        delete p;
+                        delete setHead(node);
                         failed = false;
                         return;
                     }
@@ -686,8 +756,7 @@ namespace locks {
                 for (;;) {
                     Node* p = node->predecessor();
                     if (p == head.get() && parent->tryAcquire(arg)) {
-                        setHead(node);
-                        delete p;
+                        delete setHead(node);
                         failed = false;
                         return true;
                     }
@@ -735,8 +804,7 @@ namespace locks {
                     if (p == head.get()) {
                         int r = parent->tryAcquireShared(arg);
                         if (r >= 0) {
-                            setHeadAndPropagate(node, r);
-                            delete p;
+                            delete setHeadAndPropagate(node, r);
                             if (interrupted) {
                                 selfInterrupt();
                             }
@@ -773,8 +841,7 @@ namespace locks {
                     if (p == head.get()) {
                         int r = parent->tryAcquireShared(arg);
                         if (r >= 0) {
-                            setHeadAndPropagate(node, r);
-                            delete p;
+                            delete setHeadAndPropagate(node, r);
                             failed = false;
                             return;
                         }
@@ -795,9 +862,12 @@ namespace locks {
         /**
          * Acquires in shared timed mode.
          *
-         * @param arg the acquire argument
-         * @param nanosTimeout max wait time
-         * @return {@code true} if acquired
+         * @param arg
+         *      The acquire argument (implementation specific).
+         * @param nanosTimeout
+         *      Max wait time for the Acquire in nanos
+         *
+         * @return true if acquired
          */
         bool doAcquireSharedNanos(int arg, long long nanosTimeout) {
 
@@ -810,8 +880,7 @@ namespace locks {
                     if (p == head.get()) {
                         int r = parent->tryAcquireShared(arg);
                         if (r >= 0) {
-                            setHeadAndPropagate(node, r);
-                            delete p;
+                            delete setHeadAndPropagate(node, r);
                             failed = false;
                             return true;
                         }
@@ -846,22 +915,25 @@ namespace locks {
 
         Thread* fullGetFirstQueuedThread() {
 
-            /*
-             * Head's next field might not have been set yet, or may have
-             * been unset after setHead. So we must check to see if tail
-             * is actually first node. If not, we continue on, safely
-             * traversing from tail back to head to find first,
-             * guaranteeing termination.
-             */
+            // Head's next field might not have been set yet, or may have been
+            // unset after setHead. So we must check to see if tail is actually
+            // first node. If not, we continue on, safely traversing from tail
+            // back to head to find first, guaranteeing termination.
+
+            PlatformThread::readerLockMutex(this->rwLock);
 
             Node* t = tail.get();
             Thread* firstThread = NULL;
             while (t != NULL && t != head.get()) {
                 Thread* tt = (Thread*)t->thread;
-                if (tt != NULL)
+                if (tt != NULL) {
                     firstThread = tt;
+                }
                 t = t->prev;
             }
+
+            PlatformThread::unlockRWMutex(this->rwLock);
+
             return firstThread;
         }
 
@@ -882,44 +954,46 @@ namespace locks {
                 return true;
             }
 
-            /*
-             * node->prev can be non-NULL, but not yet on queue because
-             * the CAS to place it on queue can fail. So we have to
-             * traverse from tail to make sure it actually made it.  It
-             * will always be near the tail in calls to this method, and
-             * unless the CAS failed (which is unlikely), it will be
-             * there, so we hardly ever traverse much.
-             */
+            // node->prev can be non-NULL, but not yet on queue because the CAS
+            // to place it on queue can fail. So we have to traverse from tail to
+            // make sure it actually made it.  It will always be near the tail in
+            // calls to this method, and unless the CAS failed (which is unlikely),
+            // it will be there, so we hardly ever traverse much.
             return findNodeFromTail(node);
         }
 
         /**
          * Returns true if node is on sync queue by searching backwards from tail.
          * Called only when needed by isOnSyncQueue.
+         *
          * @return true if present
          */
         bool findNodeFromTail(Node* node) {
+            bool found = false;
+
+            PlatformThread::readerLockMutex(this->rwLock);
             Node* t = tail.get();
             for (;;) {
 
                 if (t == node) {
-                    return true;
+                    found = true;
+                    break;
                 }
 
                 if (t == NULL) {
-                    return false;
+                    break;
                 }
 
                 t = t->prev;
             }
 
-            return false;
+            PlatformThread::unlockRWMutex(this->rwLock);
+            return found;
         }
 
         /**
          * Transfers a node from a condition queue onto sync queue.
-         * Returns true if successful.  If the node was canceled this
-         * method will delete it before returning false.
+         * Returns true if successful.
          *
          * @param node
          *      The node to transfer to the wait Queue
@@ -928,24 +1002,26 @@ namespace locks {
          *         canceled before signal and deleted).
          */
         bool transferForSignal(Node* node) {
-            /*
-             * If cannot change waitStatus, the node has been canceled.
-             */
+
+            // If we cannot change waitStatus, the node has been canceled.
             if (!compareAndSetWaitStatus(node, Node::CONDITION, 0)) {
                 return false;
             }
 
+            // Get the Node onto the list, once there we need to update its
+            // predecessor to indicate it should signal this Node once it is
+            // woken up either by cancel or by acquiring the lock.
+            enqueue(node);
+
             // Since we need to write to our predecessor we must lock so that
             // it doesn't leave the Queue before we are done.
-            PlatformThread::writerLockMutex(this->rwLock);
+            PlatformThread::readerLockMutex(this->rwLock);
 
-            /*
-             * Splice onto queue and try to set waitStatus of predecessor to
-             * indicate that thread is (probably) waiting. If canceled or
-             * attempt to set waitStatus fails, wake up to resync (in which
-             * case the waitStatus can be transiently and harmlessly wrong).
-             */
-            Node* p = enqueue(node);
+            // Splice onto queue and try to set waitStatus of predecessor to
+            // indicate that thread is (probably) waiting. If canceled or
+            // attempt to set waitStatus fails, wake up to resync (in which
+            // case the waitStatus can be transiently and harmlessly wrong).
+            Node* p = node->prev;
             int ws = p->waitStatus;
             if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node::SIGNAL)) {
                 LockSupport::unpark((Thread*)node->thread);
@@ -1027,7 +1103,6 @@ namespace locks {
         static bool compareAndSetWaitStatus(Node* node, int expect, int update) {
             return Atomics::compareAndSet32(&node->waitStatus, expect, update);
         }
-
     };
 
     /**
@@ -1262,11 +1337,18 @@ namespace locks {
             if (!impl->isHeldExclusively()) {
                 throw IllegalMonitorStateException();
             }
+
+            PlatformThread::readerLockMutex(this->impl->rwLock);
+
             for (Node* w = head; w != NULL; w = w->nextWaiter) {
                 if (w->waitStatus == Node::CONDITION) {
+                    PlatformThread::unlockRWMutex(this->impl->rwLock);
                     return true;
                 }
             }
+
+            PlatformThread::unlockRWMutex(this->impl->rwLock);
+
             return false;
         }
 
@@ -1275,11 +1357,17 @@ namespace locks {
                 throw IllegalMonitorStateException();
             }
             int n = 0;
+
+            PlatformThread::readerLockMutex(this->impl->rwLock);
+
             for (Node* w = head; w != NULL; w = w->nextWaiter) {
                 if (w->waitStatus == Node::CONDITION) {
                     ++n;
                 }
             }
+
+            PlatformThread::unlockRWMutex(this->impl->rwLock);
+
             return n;
         }
 
@@ -1288,6 +1376,9 @@ namespace locks {
                 throw IllegalMonitorStateException();
             }
             ArrayList<Thread*>* list = new ArrayList<Thread*>();
+
+            PlatformThread::readerLockMutex(this->impl->rwLock);
+
             for (Node* w = head; w != NULL; w = w->nextWaiter) {
                 if (w->waitStatus == Node::CONDITION) {
                     Thread* t = (Thread*)w->thread;
@@ -1296,6 +1387,9 @@ namespace locks {
                     }
                 }
             }
+
+            PlatformThread::unlockRWMutex(this->impl->rwLock);
+
             return list;
         }
 
@@ -1376,6 +1470,10 @@ namespace locks {
          * without requiring many re-traversals during cancellation storms.
          */
         void unlinkCancelledWaiters() {
+
+            // Prevent the parent from deleting nodes while we clean up.
+            PlatformThread::readerLockMutex(this->impl->rwLock);
+
             Node* t = head;
             Node* trail = NULL;
             while (t != NULL) {
@@ -1398,6 +1496,8 @@ namespace locks {
                 }
                 t = next;
             }
+
+            PlatformThread::unlockRWMutex(this->impl->rwLock);
         }
 
         /**



Mime
View raw message