Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 76F0F458A for ; Sat, 21 May 2011 20:47:14 +0000 (UTC) Received: (qmail 89805 invoked by uid 500); 21 May 2011 20:47:14 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 89759 invoked by uid 500); 21 May 2011 20:47:13 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 89752 invoked by uid 99); 21 May 2011 20:47:13 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 21 May 2011 20:47:13 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 21 May 2011 20:47:10 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id A7D01238890A; Sat, 21 May 2011 20:46:50 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1125811 - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/decaf/util/concurrent/locks/ test/decaf/util/concurrent/locks/ Date: Sat, 21 May 2011 20:46:50 -0000 To: commits@activemq.apache.org From: tabish@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110521204650.A7D01238890A@eris.apache.org> Author: tabish Date: Sat May 21 20:46:50 2011 New Revision: 1125811 URL: http://svn.apache.org/viewvc?rev=1125811&view=rev Log: Working AbstractQueuedSynchronizer implementation. Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.cpp activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/locks/AbstractQueuedSynchronizerTest.cpp activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/locks/AbstractQueuedSynchronizerTest.h Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.cpp?rev=1125811&r1=1125810&r2=1125811&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.cpp (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.cpp Sat May 21 20:46:50 2011 @@ -25,8 +25,9 @@ #include #include -#include #include +#include +#include using namespace decaf; using namespace decaf::lang; @@ -35,6 +36,7 @@ using namespace decaf::util; using namespace decaf::util::concurrent; using namespace decaf::util::concurrent::atomic; using namespace decaf::util::concurrent::locks; +using namespace decaf::internal::util::concurrent; //////////////////////////////////////////////////////////////////////////////// namespace { @@ -42,6 +44,32 @@ namespace { class Node { public: + /** + * Status field, taking on only the values: + * SIGNAL: The successor of this node is (or will soon be) blocked (via park), + * so the current node must unpark its successor when it releases or + * cancels. To avoid races, acquire methods must first indicate they + * need a signal, then retry the atomic acquire, and then, on failure, + * block. + * CANCELLED: This node is canceled due to timeout or interrupt. Nodes never leave + * this state. In particular, a thread with canceled node never again + * blocks. + * CONDITION: This node is currently on a condition queue. It will not be used as a + * sync queue node until transferred, at which time the status will be + * set to 0. (Use of this value here has nothing to do with the other + * uses of the field, but simplifies mechanics.) + * PROPAGATE: A releaseShared should be propagated to other nodes. This is set + * (for head node only) in doReleaseShared to ensure propagation + * continues, even if other operations have since intervened. + * 0: None of the above + * + * The values are arranged numerically to simplify use. Non-negative values mean that + * a node doesn't need to signal. So, most code doesn't need to check for particular + * values, just for sign. + * + * The field is initialized to 0 for normal sync nodes, and CONDITION for condition nodes. + * It is modified using CAS (or when possible, unconditional volatile writes). + */ enum WaitStatus { CANCELLED = 1, SIGNAL = -1, @@ -54,11 +82,46 @@ namespace { public: - AtomicInteger waitStatus; - AtomicReference prev; - AtomicReference next; + int waitStatus; + + /** + * Link to predecessor node that current node/thread relies on + * for checking waitStatus. Assigned during enqueing, and nulled + * out. Also, upon cancellation of a predecessor, we short-circuit + * while finding a non-canceled one, which will always exist because + * the head node is never canceled: A node becomes head only as a + * result of successful acquire. A canceled thread never succeeds + * in acquiring, and a thread only cancels itself, not any other node. + */ + Node* prev; + + /** + * Link to the successor node that the current node/thread + * unparks upon release. Assigned during enqueuing, adjusted + * when bypassing canceled predecessors, and nulled out when + * dequeued. The enq operation does not assign next field of + * a predecessor until after attachment, so seeing a NULL next + * field does not necessarily mean that node is at end of queue. + * However, if a next field appears to be NULL, we can scan + * prev's from the tail to double-check. + */ + Node* next; + + /** + * The thread that created this Node as is waiting to acquire the + * lock. + */ Thread* thread; - AtomicReference nextWaiter; + + /** + * Link to next node waiting on condition, or the special value SHARED. + * Because condition queues are accessed only when holding in exclusive + * mode, we just need a simple linked queue to hold nodes while they are + * waiting on conditions. They are then transferred to the queue to + * re-acquire. And because conditions can only be exclusive, we save a + * field by using special value to indicate shared mode. + */ + Node* nextWaiter; public: @@ -69,16 +132,14 @@ namespace { Node(Thread* thread, int waitStatus) : waitStatus(waitStatus), prev(NULL), next(NULL), thread(thread), nextWaiter(NULL) { } - ~Node() { - std::cout << "Deleted Node: " << std::hex << this << std::endl; - } + ~Node() {} bool isShared() const { - return this->nextWaiter.get() == &SHARED; + return this->nextWaiter == &SHARED; } Node* predecessor() { - Node* p = prev.get(); + Node* p = prev; if (p == NULL) { throw NullPointerException(); } else { @@ -104,65 +165,123 @@ namespace locks { class SynchronizerState { public: + /** + * The object that owns this one, allows this object to call back into its parent + * as well as checking for exclusive ownership of Conditions. + */ AbstractQueuedSynchronizer* parent; - AtomicInteger state; + + /** + * The Sync state, subclasses can get / set this value to indicate when a Thread + * can acquire the lock or must wait. + */ + volatile int state; + + /** + * Platform level R/W lock. Because we don't implement a garbage collected Node + * scheme we can't just use atomic operations on the Node pointers so in cases where + * we operate on the list of Nodes to remove canceled items we must write lock the + * list. Likewise in cases where we are iterating through the list to collect + * statistics we must ensure that a Node won't suddenly become invalid so we must + * hold a read lock. + */ + decaf_rwmutex_t rwLock; + + /** + * Head of the wait queue, lazily initialized. Except for initialization, + * it is modified only via method setHead. Note: If head exists, its + * waitStatus is guaranteed not to be CANCELLED. + */ AtomicReference head; + + /** + * Tail of the wait queue, lazily initialized. Modified only via method + * enq to add new wait node. + */ AtomicReference tail; public: - SynchronizerState(AbstractQueuedSynchronizer* parent) : parent(parent), state(0), head(), tail() {} + SynchronizerState(AbstractQueuedSynchronizer* parent) : parent(parent), state(0), rwLock(), head(), tail() { + PlatformThread::createRWMutex(&rwLock); + } virtual ~SynchronizerState() { - Node* oldHead = head.getAndSet(NULL); - if (oldHead != tail.get()) { - delete tail.get(); + + while (tail.get() != NULL) { + delete tail.getAndSet(tail.get()->prev); } - delete oldHead; + + PlatformThread::destroyRWMutex(rwLock); } bool isHeldExclusively() const { return this->parent->isHeldExclusively(); } + /** + * Enqueue of a Node is Atomic with respect to the end of the list, so no + * locking needs to occur here. If the head and tail have not been allocated + * we just need to account for contention of two or more enqueues on the + * addition of the new head. + * + * @param node + * The new node to add. + */ Node* enq(Node* node) { for (;;) { Node* t = tail.get(); if (t == NULL) { // Must initialize Node* newHead = new Node(); - std::cout << "Enq first call, allocated head = " << std::hex << newHead << std::endl; - if (head.compareAndSet(NULL, newHead)) { + if (compareAndSetHead(newHead)) { tail.set(head.get()); + } else { + delete newHead; } } else { - node->prev.set(t); - if (tail.compareAndSet(t, node)) { - t->next.set(node); + node->prev = t; + if (compareAndSetTail(t, node)) { + t->next = node; return t; } } } } + /** + * Since we can append itself in one atomic step we don't lock here. If we + * can't get the fast append done we will enter into the longer looping + * enqueue method. + * + * @param node + * The new Node to add. + */ Node* addWaiter(Node* mode) { Node* node = new Node(Thread::currentThread(), mode); - std::cout << "Add Waiter Allocated Node: " << std::hex << node << std::endl; - // Try the fast path of enq; backup to full enq on failure Node* pred = tail.get(); if (pred != NULL) { - node->prev.set(pred); - if (tail.compareAndSet(pred, node)) { - pred->next.set(node); + node->prev = pred; + if (compareAndSetTail(pred, node)) { + pred->next = node; return node; } } + enq(node); return node; } + /** + * The only place head is altered, we NULL out prev since that Node will be + * Destroyed or pooled after this, but leave next alone since it should still + * be valid. + * + * @param node + * The Node that is to become the new Head of the queue. + */ void setHead(Node* node) { head.set(node); node->thread = NULL; - node->prev.set(NULL); + node->prev = NULL; } void unparkSuccessor(Node* node) { @@ -171,29 +290,35 @@ namespace locks { * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ - int ws = node->waitStatus.get(); + int ws = node->waitStatus; if (ws < 0) { - node->waitStatus.compareAndSet(ws, 0); + compareAndSetWaitStatus(node, ws, 0); } + // We need to lock to prevent cancellation of a Node from + // altering the list as we iterate and check Node status fields. + PlatformThread::readerLockMutex(this->rwLock); + /* * Thread to unpark is held in successor, which is normally - * just the next node. But if cancelled or apparently NULL, + * just the next node. But if canceled or apparently NULL, * traverse backwards from tail to find the actual - * non-cancelled successor. + * non-canceled successor. */ - Node* s = node->next.get(); - if (s == NULL || s->waitStatus.get() > 0) { + Node* s = node->next; + if (s == NULL || s->waitStatus > 0) { s = NULL; - for (Node* t = tail.get(); t != NULL && t != node; t = t->prev.get()) - if (t->waitStatus.get() <= 0) { + for (Node* t = tail.get(); t != NULL && t != node; t = t->prev) + if (t->waitStatus <= 0) { s = t; } } if (s != NULL) { - LockSupport::unpark(s->thread); + LockSupport::unpark((Thread*)s->thread); } + + PlatformThread::unlockRWMutex(this->rwLock); } /** @@ -202,6 +327,11 @@ namespace locks { * to calling unparkSuccessor of head if it needs signal.) */ void doReleaseShared() { + + // Here we have to read lock because head could change when a + // different thread does its release shared. + PlatformThread::readerLockMutex(this->rwLock); + /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual @@ -216,13 +346,13 @@ namespace locks { for (;;) { Node* h = head.get(); if (h != NULL && h != tail.get()) { - int ws = h->waitStatus.get(); + int ws = h->waitStatus; if (ws == Node::SIGNAL) { - if (!h->waitStatus.compareAndSet(Node::SIGNAL, 0)) { + if (!compareAndSetWaitStatus(h, Node::SIGNAL, 0)) { continue; // loop to recheck cases } unparkSuccessor(h); - } else if (ws == 0 && !h->waitStatus.compareAndSet(0, Node::PROPAGATE)) { + } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node::PROPAGATE)) { continue; // loop on failed CAS } } @@ -230,6 +360,8 @@ namespace locks { break; } } + + PlatformThread::unlockRWMutex(this->rwLock); } /** @@ -241,29 +373,33 @@ namespace locks { * @param propagate the return value from a tryAcquireShared */ void setHeadAndPropagate(Node* node, int propagate) { + + // Here we have to read lock because head could change when a + // different thread does its release shared. + PlatformThread::readerLockMutex(this->rwLock); + Node* h = head.get(); // Record old head for check below setHead(node); + /* * Try to signal next queued node if: - * Propagation was indicated by caller, - * or was recorded (as h.waitStatus) by a previous operation - * (note: this uses sign-check of waitStatus because - * PROPAGATE status may transition to SIGNAL.) - * and - * The next node is waiting in shared mode, - * or we don't know, because it appears NULL + * Propagation was indicated by caller, or was recorded (as h.waitStatus) + * by a previous operation (note: this uses sign-check of waitStatus because + * PROPAGATE status may transition to SIGNAL.) and the next node is waiting + * in shared mode, or we don't know, because it appears NULL. * - * The conservatism in both of these checks may cause - * unnecessary wake-ups, but only when there are multiple - * racing acquires/releases, so most need signals now or soon - * anyway. + * The conservatism in both of these checks may cause unnecessary wake-ups, + * but only when there are multiple racing acquires/releases, so most need + * signals now or soon anyway. */ - if (propagate > 0 || h == NULL || h->waitStatus.get() < 0) { - Node* s = node->next.get(); + if (propagate > 0 || h == NULL || h->waitStatus < 0) { + Node* s = node->next; if (s == NULL || s->isShared()) { doReleaseShared(); } } + + PlatformThread::unlockRWMutex(this->rwLock); } /** @@ -272,6 +408,7 @@ namespace locks { * @param node the node */ void cancelAcquire(Node* node) { + // Ignore if node doesn't exist if (node == NULL) { return; @@ -279,44 +416,47 @@ namespace locks { node->thread = NULL; - // Skip cancelled predecessors - Node* pred = node->prev.get(); - while (pred->waitStatus.get() > 0) { - pred = pred->prev.get(); - node->prev.set(pred); - } - - // predNext is the apparent node to unsplice. CASes below will - // fail if not, in which case, we lost race vs another cancel - // or signal, so no further action is necessary. - Node* predNext = pred->next.get(); - // Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. - node->waitStatus.set(Node::CANCELLED); + node->waitStatus = Node::CANCELLED; // If we are the tail, remove ourselves. - if (node == tail.get() && tail.compareAndSet(node, pred)) { - pred->next.compareAndSet(predNext, NULL); + if (node == tail.get() && compareAndSetTail(node, node->prev)) { + // Attempt to set next on tail, this can fail if another thread can in + // and replaced the old tail but that's ok since that means next is up + // to date in that case. + tail.compareAndSet(node, NULL); delete node; } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; - if (pred != head.get() && - ((ws = pred->waitStatus.get()) == Node::SIGNAL || - (ws <= 0 && pred->waitStatus.compareAndSet(ws, Node::SIGNAL))) && pred->thread != NULL) { - Node* next = node->next.get(); - if (next != NULL && next->waitStatus.get() <= 0) { - pred->next.compareAndSet(predNext, next); - } + + PlatformThread::writerLockMutex(this->rwLock); + + // Did we become the tail. + if (node == tail.get() && compareAndSetTail(node, node->prev)) { + tail.compareAndSet(node, NULL); } else { + node->prev->next = node->next; + node->next->prev = node->prev; + } + + if (node->prev != head.get() && + ((ws = node->prev->waitStatus) == Node::SIGNAL || + (ws <= 0 && compareAndSetWaitStatus(node->prev, ws, Node::SIGNAL))) && + node->prev->thread != NULL) { + + PlatformThread::unlockRWMutex(this->rwLock); + } else { + PlatformThread::unlockRWMutex(this->rwLock); unparkSuccessor(node); } - } - delete node; + delete node; + + } } /** @@ -328,35 +468,38 @@ namespace locks { * @param node the node * @return {@code true} if thread should block */ - static bool shouldParkAfterFailedAcquire(Node* pred, Node* node) { - int ws = pred->waitStatus.get(); + bool shouldParkAfterFailedAcquire(Node* node) { + + bool result = false; + + PlatformThread::readerLockMutex(this->rwLock); + + int ws = node->prev->waitStatus; if (ws == Node::SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ - return true; + result = true; if (ws > 0) { /* - * Predecessor was cancelled. Skip over predecessors and + * Predecessor was canceled. Skip over predecessors and * indicate retry. */ - do { - pred = pred->prev.get(); - node->prev.set(pred); - } while (pred->waitStatus.get() > 0); - pred->next.set(node); + result = false; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ - pred->waitStatus.compareAndSet(ws, Node::SIGNAL); + compareAndSetWaitStatus(node->prev, ws, Node::SIGNAL); } - return false; + PlatformThread::unlockRWMutex(this->rwLock); + + return result; } /** @@ -401,7 +544,7 @@ namespace locks { return interrupted; } - if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) { + if (shouldParkAfterFailedAcquire(node) && parkAndCheckInterrupt()) { interrupted = true; } } @@ -413,6 +556,9 @@ namespace locks { throw ex; } + + cancelAcquire(node); + return true; } /** @@ -427,14 +573,12 @@ namespace locks { Node* p = node->predecessor(); if (p == head.get() && parent->tryAcquire(arg)) { setHead(node); - std::cout << "doAcquireInterruptibly waiting thread acquired the sync" << std::endl; delete p; failed = false; return; } - if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) { - std::cout << "doAcquireInterruptibly waiting thread interrupted" << std::endl; + if (shouldParkAfterFailedAcquire(node) && parkAndCheckInterrupt()) { throw InterruptedException(); } } @@ -444,7 +588,7 @@ namespace locks { cancelAcquire(node); } - throw ex; + throw InterruptedException(ex); } } @@ -470,10 +614,10 @@ namespace locks { } if (nanosTimeout <= 0) { - return false; + break; } - if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutLimit) { + if (shouldParkAfterFailedAcquire(node) && nanosTimeout > spinForTimeoutLimit) { LockSupport::parkNanos(nanosTimeout); } @@ -493,6 +637,9 @@ namespace locks { throw ex; } + + cancelAcquire(node); + return false; } /** @@ -519,17 +666,19 @@ namespace locks { } } - if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) { + if (shouldParkAfterFailedAcquire(node) && parkAndCheckInterrupt()) { interrupted = true; } } - } catch(InterruptedException& ex) { + } catch(Exception& ex) { if (failed) { cancelAcquire(node); } throw ex; } + + cancelAcquire(node); } /** @@ -551,7 +700,7 @@ namespace locks { return; } } - if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) { + if (shouldParkAfterFailedAcquire(node) && parkAndCheckInterrupt()) { throw InterruptedException(); } } @@ -589,10 +738,10 @@ namespace locks { } } if (nanosTimeout <= 0) { - return false; + break; } - if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutLimit) { + if (shouldParkAfterFailedAcquire(node) && nanosTimeout > spinForTimeoutLimit) { LockSupport::parkNanos(nanosTimeout); } @@ -611,28 +760,12 @@ namespace locks { throw ex; } + + cancelAcquire(node); + return false; } Thread* fullGetFirstQueuedThread() { - /* - * The first node is normally head->next. Try to get its - * thread field, ensuring consistent reads: If thread - * field is nulled out or s->prev is no longer head, then - * some other thread(s) concurrently performed setHead in - * between some of our reads. We try this twice before - * resorting to traversal. - */ - Node* h = NULL; - Node* s = NULL; - Thread* st = NULL; - - if (((h = head.get()) != NULL && (s = h->next.get()) != NULL && - s->prev.get() == head.get() && (st = s->thread) != NULL) || - ((h = head.get()) != NULL && (s = h->next.get()) != NULL && - s->prev.get() == head.get() && (st = s->thread) != NULL)) { - - return st; - } /* * Head's next field might not have been set yet, or may have @@ -645,10 +778,10 @@ namespace locks { Node* t = tail.get(); Thread* firstThread = NULL; while (t != NULL && t != head.get()) { - Thread* tt = t->thread; + Thread* tt = (Thread*)t->thread; if (tt != NULL) firstThread = tt; - t = t->prev.get(); + t = t->prev; } return firstThread; } @@ -662,11 +795,11 @@ namespace locks { * @return true if is reacquiring */ bool isOnSyncQueue(Node* node) { - if (node->waitStatus.get() == Node::CONDITION || node->prev.get() == NULL) { + if (node->waitStatus == Node::CONDITION || node->prev == NULL) { return false; } - if (node->next.get() != NULL) { // If has successor, it must be on queue + if (node->next != NULL) { // If has successor, it must be on queue return true; } @@ -698,50 +831,64 @@ namespace locks { return false; } - t = t->prev.get(); + t = t->prev; } } /** * Transfers a node from a condition queue onto sync queue. - * Returns true if successful. - * @param node the node + * Returns true if successful. If the node was canceled this + * method will delete it before returning false. + * + * @param node + * The node to transfer to the wait Queue + * * @return true if successfully transferred (else the node was - * cancelled before signal). + * canceled before signal and deleted). */ bool transferForSignal(Node* node) { /* - * If cannot change waitStatus, the node has been cancelled. + * If cannot change waitStatus, the node has been canceled. */ - if (!node->waitStatus.compareAndSet(Node::CONDITION, 0)) { + if (!compareAndSetWaitStatus(node, Node::CONDITION, 0)) { return false; } + // Since we need to write to our predecessor we must lock so that + // it doesn't leave the Queue before we are done. + PlatformThread::writerLockMutex(this->rwLock); + /* * Splice onto queue and try to set waitStatus of predecessor to - * indicate that thread is (probably) waiting. If cancelled or + * indicate that thread is (probably) waiting. If canceled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ Node* p = enq(node); - int ws = p->waitStatus.get(); - if (ws > 0 || !p->waitStatus.compareAndSet(ws, Node::SIGNAL)) { - LockSupport::unpark(node->thread); + int ws = p->waitStatus; + if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node::SIGNAL)) { + LockSupport::unpark((Thread*)node->thread); } + PlatformThread::unlockRWMutex(this->rwLock); + return true; } /** - * Transfers node, if necessary, to sync queue after a cancelled - * wait. Returns true if thread was cancelled before being - * signalled. - * @param current the waiting thread - * @param node its node - * @return true if cancelled before the node was signalled + * Transfers node, if necessary, to sync queue after a canceled wait. Returns + * true if thread was canceled before being signaled. If the Node is a Condition + * and has been signaled already then its not added as the signal will have + * already done so, we must wait though until it appears on the sync queue + * otherwise the attempt to re-acquire the lock could throw a NullPointerException. + * + * @param node + * The node that is to be transferred onto the sync queue. + * + * @return true if canceled before the node was signaled */ bool transferAfterCancelledWait(Node* node) { - if (node->waitStatus.compareAndSet(Node::CONDITION, 0)) { + if (compareAndSetWaitStatus(node, Node::CONDITION, 0)) { enq(node); return true; } @@ -760,9 +907,13 @@ namespace locks { } /** - * Invokes release with current state value; returns saved state. - * Cancels node and throws exception on failure. - * @param node the condition node for this wait + * Invokes release with current state value; returns saved state. Cancels node + * and throws exception on failure. When a monitor state exception is thrown + * the Node is added to the sync queue so that it can be deleted safely later. + * + * @param node + * The condition node for this wait. + * * @return previous sync state */ int fullyRelease(Node* node) { @@ -777,30 +928,49 @@ namespace locks { } } catch(IllegalMonitorStateException& ex) { if (failed) { - node->waitStatus.set(Node::CANCELLED); + node->waitStatus = Node::CANCELLED; + // Enqueue it even though canceled so that it gets deleted + enq(node); } throw ex; } } + bool compareAndSetHead(Node* update) { + return this->head.compareAndSet(NULL, update); + } + bool compareAndSetTail(Node* expect, Node* update) { + return this->tail.compareAndSet(expect, update); + } + static bool compareAndSetWaitStatus(Node* node, int expect, int update) { + return Atomics::compareAndSet32(&node->waitStatus, expect, update); + } + }; /** * Provides a default implementation that most AbstractQueuedSynchronizer derived classes - * can use without needing to write one from scratch. + * can use without needing to write one from scratch. Since the methods of this Object + * must always be called from a locked synchronizer we can assume that only one thread + * at a time will read or modify the list of waiters so we don't need to lock around + * modifications to the list. + * + * This object creates Node instance but leaves the deletion of those objects up to the + * sync queue. In all cases the Node needs to be transfered to the sync queue with its + * state value set to zero or CANCELLED. */ class DefaultConditionObject : public AbstractQueuedSynchronizer::ConditionObject { private: SynchronizerState* impl; - Node* firstWaiter; - Node* lastWaiter; + Node* head; + Node* tail; - /** Mode meaning to reinterrupt on exit from wait */ - static const int REINTERRUPT; - /** Mode meaning to throw InterruptedException on exit from wait */ - static const int THROW_IE; + enum INTERRUPTION_MODE { + REINTERRUPT = 1, // Re-interrupt thread on exit from a wait call. + THROW_IE = -1 // Throw a new InterruptedException on wait call exit. + }; private: @@ -810,16 +980,21 @@ namespace locks { public: DefaultConditionObject(SynchronizerState* impl) : - ConditionObject(), impl(impl), firstWaiter(NULL), lastWaiter(NULL) {} - virtual ~DefaultConditionObject() {}; + ConditionObject(), impl(impl), head(NULL), tail(NULL) {} + virtual ~DefaultConditionObject() { + } virtual void await() { + if (Thread::interrupted()) { throw InterruptedException(__FILE__, __LINE__, "Thread was interrupted"); } + Node* node = addConditionWaiter(); + int savedState = impl->fullyRelease(node); int interruptMode = 0; + while (!impl->isOnSyncQueue(node)) { LockSupport::park(); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) { @@ -830,7 +1005,7 @@ namespace locks { if (impl->acquireQueued(node, savedState) && interruptMode != THROW_IE) { interruptMode = REINTERRUPT; } - if (node->nextWaiter.get() != NULL) { // clean up if cancelled + if (node->nextWaiter != NULL) { // clean up if canceled unlinkCancelledWaiters(); } if (interruptMode != 0) { @@ -842,6 +1017,7 @@ namespace locks { Node* node = addConditionWaiter(); int savedState = impl->fullyRelease(node); bool interrupted = false; + while (!impl->isOnSyncQueue(node)) { LockSupport::park(); if (Thread::interrupted()) { @@ -862,6 +1038,7 @@ namespace locks { int savedState = impl->fullyRelease(node); long long lastTime = System::nanoTime(); int interruptMode = 0; + while (!impl->isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { impl->transferAfterCancelledWait(node); @@ -880,12 +1057,13 @@ namespace locks { if (impl->acquireQueued(node, savedState) && interruptMode != THROW_IE) { interruptMode = REINTERRUPT; } - if (node->nextWaiter.get() != NULL) { + if (node->nextWaiter != NULL) { unlinkCancelledWaiters(); } if (interruptMode != 0) { reportInterruptAfterWait(interruptMode); } + return nanosTimeout - (System::nanoTime() - lastTime); } @@ -899,6 +1077,7 @@ namespace locks { long long lastTime = System::nanoTime(); bool timedout = false; int interruptMode = 0; + while (!impl->isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { timedout = impl->transferAfterCancelledWait(node); @@ -918,7 +1097,7 @@ namespace locks { if (impl->acquireQueued(node, savedState) && interruptMode != THROW_IE) { interruptMode = REINTERRUPT; } - if (node->nextWaiter.get() != NULL) { + if (node->nextWaiter != NULL) { unlinkCancelledWaiters(); } if (interruptMode != 0) { @@ -937,6 +1116,7 @@ namespace locks { int savedState = impl->fullyRelease(node); bool timedout = false; int interruptMode = 0; + while (!impl->isOnSyncQueue(node)) { if (System::currentTimeMillis() > abstime) { timedout = impl->transferAfterCancelledWait(node); @@ -951,7 +1131,7 @@ namespace locks { if (impl->acquireQueued(node, savedState) && interruptMode != THROW_IE) { interruptMode = REINTERRUPT; } - if (node->nextWaiter.get() != NULL) { + if (node->nextWaiter != NULL) { unlinkCancelledWaiters(); } if (interruptMode != 0) { @@ -961,11 +1141,15 @@ namespace locks { return !timedout; } + /** + * The Node that's been waiting the longest is moved from this Conditions + * wait queue to that of the parent Synchronizer. + */ virtual void signal() { if (!impl->isHeldExclusively()) { throw IllegalMonitorStateException(); } - Node* first = firstWaiter; + Node* first = head; if (first != NULL) { doSignal(first); } @@ -975,7 +1159,7 @@ namespace locks { if (!impl->isHeldExclusively()) { throw IllegalMonitorStateException(); } - Node* first = firstWaiter; + Node* first = head; if (first != NULL) { doSignalAll(first); } @@ -989,8 +1173,8 @@ namespace locks { if (!impl->isHeldExclusively()) { throw IllegalMonitorStateException(); } - for (Node* w = firstWaiter; w != NULL; w = w->nextWaiter.get()) { - if (w->waitStatus.get() == Node::CONDITION) { + for (Node* w = head; w != NULL; w = w->nextWaiter) { + if (w->waitStatus == Node::CONDITION) { return true; } } @@ -1002,8 +1186,8 @@ namespace locks { throw IllegalMonitorStateException(); } int n = 0; - for (Node* w = firstWaiter; w != NULL; w = w->nextWaiter.get()) { - if (w->waitStatus.get() == Node::CONDITION) { + for (Node* w = head; w != NULL; w = w->nextWaiter) { + if (w->waitStatus == Node::CONDITION) { ++n; } } @@ -1015,9 +1199,9 @@ namespace locks { throw IllegalMonitorStateException(); } ArrayList* list = new ArrayList(); - for (Node* w = firstWaiter; w != NULL; w = w->nextWaiter.get()) { - if (w->waitStatus.get() == Node::CONDITION) { - Thread* t = w->thread; + for (Node* w = head; w != NULL; w = w->nextWaiter) { + if (w->waitStatus == Node::CONDITION) { + Thread* t = (Thread*)w->thread; if (t != NULL) { list->add(t); } @@ -1028,82 +1212,96 @@ namespace locks { private: + /** + * Adds a new Node to this Condition object to be used by one of the wait methods. + * During this addition we scan for canceled Nodes and remove them as they are + * now contained on the sync queue and will be removed from there. Its safe to do + * this here as this method is called while the sync queue is locked so no other + * changes can occur to the list of Nodes. + * + * @returns the newly added Node instance. + */ Node* addConditionWaiter() { - Node* t = lastWaiter; - // If lastWaiter is cancelled, clean out. - if (t != NULL && t->waitStatus.get() != Node::CONDITION) { + Node* t = tail; + // If last Waiter is canceled, clean out. + if (t != NULL && t->waitStatus != Node::CONDITION) { unlinkCancelledWaiters(); - t = lastWaiter; + t = tail; } Node* node = new Node(Thread::currentThread(), Node::CONDITION); if (t == NULL) { - firstWaiter = node; + head = node; } else { - t->nextWaiter.set(node); + t->nextWaiter = node; } - lastWaiter = node; + tail = node; return node; } /** - * Removes and transfers nodes until hit non-cancelled one or - * NULL. Split out from signal in part to encourage compilers - * to inline the case of no waiters. - * @param first (non-NULL) the first node on condition queue + * Removes and transfers nodes until hit non-canceled one or a NULL one. + * or stated another way traverses the list of waiters removing canceled + * nodes until it finds a non-canceled one to signal. + * + * @param first (non-NULL) + * The first node on condition queue. */ void doSignal(Node* first) { do { - if ((firstWaiter = first->nextWaiter.get()) == NULL) { - lastWaiter = NULL; + head = first->nextWaiter; + first->nextWaiter = NULL; + + if (head == NULL) { + tail = NULL; } - first->nextWaiter.set(NULL); - } while (!impl->transferForSignal(first) && (first = firstWaiter) != NULL); + } while (!impl->transferForSignal(first) && (first = head) != NULL); } /** - * Removes and transfers all nodes. - * @param first (non-NULL) the first node on condition queue + * Removes and transfers all nodes. Removes all Nodes from this Condition object + * starting from the given node pointer, canceled waiter Nodes are moved to so we don't + * need to track them anymore. This is safe to do here since this method must be + * called while the sync queue lock is held. + * + * @param first (non-NULL) + * The first node on condition queue to start transferring from. */ void doSignalAll(Node* first) { - lastWaiter = firstWaiter = NULL; + head = tail = NULL; do { - Node* next = first->nextWaiter.get(); - first->nextWaiter.set(NULL); + Node* next = first->nextWaiter; + first->nextWaiter = NULL; impl->transferForSignal(first); first = next; } while (first != NULL); } /** - * Unlinks cancelled waiter nodes from condition queue. - * Called only while holding lock. This is called when - * cancellation occurred during condition wait, and upon - * insertion of a new waiter when lastWaiter is seen to have - * been cancelled. This method is needed to avoid garbage - * retention in the absence of signals. So even though it may - * require a full traversal, it comes into play only when - * timeouts or cancellations occur in the absence of - * signals. It traverses all nodes rather than stopping at a - * particular target to unlink all pointers to garbage nodes - * without requiring many re-traversals during cancellation - * storms. + * Unlinks canceled waiter nodes from condition queue. Called only while holding + * lock. This is called when cancellation occurred during condition wait, and upon + * insertion of a new waiter when tail is seen to have been canceled. This method + * is needed to avoid garbage retention in the absence of signals. So even though + * it may require a full traversal, it comes into play only when timeouts or + * cancellations occur in the absence of signals. It traverses all nodes rather + * than stopping at a particular target to unlink all pointers to garbage nodes + * without requiring many re-traversals during cancellation storms. */ void unlinkCancelledWaiters() { - Node* t = firstWaiter; + Node* t = head; Node* trail = NULL; while (t != NULL) { - Node* next = t->nextWaiter.get(); - if (t->waitStatus.get() != Node::CONDITION) { - t->nextWaiter.set(NULL); + Node* next = t->nextWaiter; + if (t->waitStatus != Node::CONDITION) { + t->nextWaiter = NULL; if (trail == NULL) { - firstWaiter = next; + head = next; } else { - trail->nextWaiter.set(next); + trail->nextWaiter = next; } if (next == NULL) { - lastWaiter = trail; + tail = trail; } } else { @@ -1114,17 +1312,27 @@ namespace locks { } /** - * Checks for interrupt, returning THROW_IE if interrupted - * before signalled, REINTERRUPT if after signalled, or - * 0 if not interrupted. + * Checks for interrupt, returning THROW_IE if interrupted before signaled, + * REINTERRUPT if after signaled, or 0 if not interrupted. The canceled node + * is transfered to the sync queue so that its waiter can safely re-acquire the + * lock. + * + * @param node + * The node to transfer to the sync queue if interrupted. + * + * @returns value indicating if an action is needed in response to an interrupt. */ int checkInterruptWhileWaiting(Node* node) { return Thread::interrupted() ? (impl->transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; } /** - * Throws InterruptedException, reinterrupts current thread, or - * does nothing, depending on mode. + * Throws InterruptedException, re-interrupts current thread, or does nothing, depending + * on mode passed, abstract the logic needed for all the interruptible wait methods out + * into a single place. + * + * @param interruptMode + * indicates what action is needed for interruption handling (if any). */ void reportInterruptAfterWait(int interruptMode) { if (interruptMode == THROW_IE) { @@ -1136,9 +1344,6 @@ namespace locks { }; - const int DefaultConditionObject::REINTERRUPT = 1; - const int DefaultConditionObject::THROW_IE = -1; - }}}} //////////////////////////////////////////////////////////////////////////////// @@ -1156,17 +1361,17 @@ AbstractQueuedSynchronizer::~AbstractQue //////////////////////////////////////////////////////////////////////////////// int AbstractQueuedSynchronizer::getState() const { - return this->impl->state.get(); + return this->impl->state; } //////////////////////////////////////////////////////////////////////////////// void AbstractQueuedSynchronizer::setState(int value) { - this->impl->state.set(value); + this->impl->state = value; } //////////////////////////////////////////////////////////////////////////////// bool AbstractQueuedSynchronizer::compareAndSetState(int expect, int update) { - return this->impl->state.compareAndSet(expect, update); + return Atomics::compareAndSet32(&this->impl->state, expect, update); } //////////////////////////////////////////////////////////////////////////////// @@ -1237,7 +1442,7 @@ bool AbstractQueuedSynchronizer::release if (tryRelease(arg)) { Node* h = this->impl->head.get(); - if (h != NULL && h->waitStatus.get() != 0) { + if (h != NULL && h->waitStatus != 0) { this->impl->unparkSuccessor(h); } @@ -1308,82 +1513,92 @@ bool AbstractQueuedSynchronizer::isQueue throw NullPointerException(__FILE__, __LINE__, "Passed in thread was NULL"); } - for (Node* p = this->impl->tail.get(); p != NULL; p = p->prev.get()) { + PlatformThread::readerLockMutex(this->impl->rwLock); + + for (Node* p = this->impl->tail.get(); p != NULL; p = p->prev) { if (p->thread == thread) { + PlatformThread::unlockRWMutex(this->impl->rwLock); return true; } } + PlatformThread::unlockRWMutex(this->impl->rwLock); + return false; } -// bool apparentlyFirstQueuedIsExclusive() { -// Node h, s; -// return (h = head) != NULL && -// (s = h.next) != NULL && -// !s.isShared() && -// s.thread != NULL; -// } -// -// bool hasQueuedPredecessors() { -// // The correctness of this depends on head being initialized -// // before tail and on head.next being accurate if the current -// // thread is first in queue. -// Node t = tail; // Read fields in reverse initialization order -// Node h = head; -// Node s; -// return h != t && -// ((s = h.next) == NULL || s.thread != Thread.currentThread()); -// } - //////////////////////////////////////////////////////////////////////////////// int AbstractQueuedSynchronizer::getQueueLength() const { int n = 0; - for (Node* p = this->impl->tail.get(); p != NULL; p = p->prev.get()) { + + PlatformThread::readerLockMutex(this->impl->rwLock); + + for (Node* p = this->impl->tail.get(); p != NULL; p = p->prev) { if (p->thread != NULL) { ++n; } } + + PlatformThread::unlockRWMutex(this->impl->rwLock); + return n; } //////////////////////////////////////////////////////////////////////////////// Collection* AbstractQueuedSynchronizer::getQueuedThreads() const { ArrayList* list = new ArrayList(); - for (Node* p = this->impl->tail.get(); p != NULL; p = p->prev.get()) { - Thread* t = p->thread; + + PlatformThread::readerLockMutex(this->impl->rwLock); + + for (Node* p = this->impl->tail.get(); p != NULL; p = p->prev) { + Thread* t = (Thread*)p->thread; if (t != NULL) { list->add(t); } } + + PlatformThread::unlockRWMutex(this->impl->rwLock); + return list; } //////////////////////////////////////////////////////////////////////////////// Collection* AbstractQueuedSynchronizer::getExclusiveQueuedThreads() const { ArrayList* list = new ArrayList(); - for (Node* p = this->impl->tail.get(); p != NULL; p = p->prev.get()) { + + PlatformThread::readerLockMutex(this->impl->rwLock); + + for (Node* p = this->impl->tail.get(); p != NULL; p = p->prev) { if (!p->isShared()) { - Thread* t = p->thread; + Thread* t = (Thread*)p->thread; if (t != NULL) { list->add(t); } } } + + PlatformThread::unlockRWMutex(this->impl->rwLock); + return list; } //////////////////////////////////////////////////////////////////////////////// Collection* AbstractQueuedSynchronizer::getSharedQueuedThreads() const { ArrayList* list = new ArrayList(); - for (Node* p = this->impl->tail.get(); p != NULL; p = p->prev.get()) { + + PlatformThread::readerLockMutex(this->impl->rwLock); + + for (Node* p = this->impl->tail.get(); p != NULL; p = p->prev) { if (p->isShared()) { - Thread* t = p->thread; + Thread* t = (Thread*)p->thread; if (t != NULL) { list->add(t); } } } + + PlatformThread::unlockRWMutex(this->impl->rwLock); + return list; } Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/locks/AbstractQueuedSynchronizerTest.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/locks/AbstractQueuedSynchronizerTest.cpp?rev=1125811&r1=1125810&r2=1125811&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/locks/AbstractQueuedSynchronizerTest.cpp (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/locks/AbstractQueuedSynchronizerTest.cpp Sat May 21 20:46:50 2011 @@ -98,11 +98,8 @@ namespace { virtual void run() { try { - std::cout << "InterruptibleSyncRunnable acquireInterruptibly" << std::endl; mutex->acquireInterruptibly(1); - std::cout << "InterruptibleSyncRunnable was not interrupted" << std::endl; } catch(InterruptedException& success) { - std::cout << "InterruptibleSyncRunnable was interrupted" << std::endl; } catch(Exception& ex) { parent->threadUnexpectedException(ex); } catch(std::exception& stdex) { @@ -125,11 +122,9 @@ namespace { void run() { try { - std::cout << "InterruptedSyncRunnable acquireInterruptibly" << std::endl; mutex->acquireInterruptibly(1); parent->threadFail("Should have been interrupted."); } catch(InterruptedException& success) { - std::cout << "InterruptedSyncRunnable was interrupted" << std::endl; } catch(Exception& ex) { parent->threadUnexpectedException(ex); } catch(std::exception& stdex) { @@ -224,27 +219,26 @@ void AbstractQueuedSynchronizerTest::tes Thread t1(&iSyncRun1); Thread t2(&iSyncRun2); - std::cout << std::endl; try { CPPUNIT_ASSERT(!mutex.isQueued(&t1)); CPPUNIT_ASSERT(!mutex.isQueued(&t2)); mutex.acquire(1); t1.start(); Thread::sleep(SHORT_DELAY_MS); -// CPPUNIT_ASSERT(mutex.isQueued(&t1)); + CPPUNIT_ASSERT(mutex.isQueued(&t1)); t2.start(); Thread::sleep(SHORT_DELAY_MS); -// CPPUNIT_ASSERT(mutex.isQueued(&t1)); -// CPPUNIT_ASSERT(mutex.isQueued(&t2)); + CPPUNIT_ASSERT(mutex.isQueued(&t1)); + CPPUNIT_ASSERT(mutex.isQueued(&t2)); t1.interrupt(); -// Thread::sleep(SHORT_DELAY_MS); -// CPPUNIT_ASSERT(!mutex.isQueued(&t1)); -// CPPUNIT_ASSERT(mutex.isQueued(&t2)); - mutex.release(1); -// Thread::sleep(SHORT_DELAY_MS); -// CPPUNIT_ASSERT(!mutex.isQueued(&t1)); -// Thread::sleep(SHORT_DELAY_MS); -// CPPUNIT_ASSERT(!mutex.isQueued(&t2)); + Thread::sleep(SHORT_DELAY_MS); + CPPUNIT_ASSERT(!mutex.isQueued(&t1)); + CPPUNIT_ASSERT(mutex.isQueued(&t2)); + mutex.release(1); + Thread::sleep(SHORT_DELAY_MS); + CPPUNIT_ASSERT(!mutex.isQueued(&t1)); + Thread::sleep(SHORT_DELAY_MS); + CPPUNIT_ASSERT(!mutex.isQueued(&t2)); t1.join(); t2.join(); } catch(Exception& e){ @@ -705,7 +699,7 @@ void AbstractQueuedSynchronizerTest::tes try { mutex.acquire(1); Date d; - CPPUNIT_ASSERT(!c->awaitUntil((d.getTime() + 10))); + CPPUNIT_ASSERT(!c->awaitUntil((d.getTime() + 15))); mutex.release(1); } catch(Exception& ex) { unexpectedException(); @@ -1521,9 +1515,8 @@ namespace { try { parent->threadAssertFalse(latch->isSignalled()); latch->acquireSharedInterruptibly(0); - parent->threadAssertTrue(latch->isSignalled()); + parent->threadShouldThrow(); } catch(InterruptedException& e) { - parent->threadUnexpectedException(); } } }; @@ -1566,7 +1559,6 @@ namespace { latch->tryAcquireSharedNanos(0, AbstractQueuedSynchronizerTest::SMALL_DELAY_MS* 1000 * 1000); parent->threadShouldThrow(); } catch(InterruptedException& e) { - parent->threadUnexpectedException(); } } }; Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/locks/AbstractQueuedSynchronizerTest.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/locks/AbstractQueuedSynchronizerTest.h?rev=1125811&r1=1125810&r2=1125811&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/locks/AbstractQueuedSynchronizerTest.h (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/locks/AbstractQueuedSynchronizerTest.h Sat May 21 20:46:50 2011 @@ -30,55 +30,55 @@ namespace locks { class AbstractQueuedSynchronizerTest : public ExecutorsTestSupport { CPPUNIT_TEST_SUITE( AbstractQueuedSynchronizerTest ); -// CPPUNIT_TEST( testIsHeldExclusively ); -// CPPUNIT_TEST( testAcquire ); -// CPPUNIT_TEST( testTryAcquire ); -// CPPUNIT_TEST( testhasQueuedThreads ); -// CPPUNIT_TEST( testIsQueuedNPE ); + CPPUNIT_TEST( testIsHeldExclusively ); + CPPUNIT_TEST( testAcquire ); + CPPUNIT_TEST( testTryAcquire ); + CPPUNIT_TEST( testhasQueuedThreads ); + CPPUNIT_TEST( testIsQueuedNPE ); CPPUNIT_TEST( testIsQueued ); -// CPPUNIT_TEST( testGetFirstQueuedThread ); -// CPPUNIT_TEST( testHasContended ); -// CPPUNIT_TEST( testGetQueuedThreads ); -// CPPUNIT_TEST( testGetExclusiveQueuedThreads ); -// CPPUNIT_TEST( testGetSharedQueuedThreads ); -// CPPUNIT_TEST( testInterruptedException2 ); -// CPPUNIT_TEST( testTryAcquireWhenSynced ); -// CPPUNIT_TEST( testAcquireNanosTimeout ); -// CPPUNIT_TEST( testGetState ); -// CPPUNIT_TEST( testAcquireInterruptibly1 ); -// CPPUNIT_TEST( testAcquireInterruptibly2 ); -// CPPUNIT_TEST( testOwns ); -// CPPUNIT_TEST( testAwaitIllegalMonitor ); -// CPPUNIT_TEST( testSignalIllegalMonitor ); -// CPPUNIT_TEST( testAwaitNanosTimeout ); -// CPPUNIT_TEST( testAwaitTimeout ); -// CPPUNIT_TEST( testAwaitUntilTimeout ); -// CPPUNIT_TEST( testAwait ); -// CPPUNIT_TEST( testHasWaitersNPE ); -// CPPUNIT_TEST( testGetWaitQueueLengthNPE ); -// CPPUNIT_TEST( testGetWaitingThreadsNPE ); -// CPPUNIT_TEST( testHasWaitersIAE ); -// CPPUNIT_TEST( testHasWaitersIMSE ); -// CPPUNIT_TEST( testGetWaitQueueLengthIAE ); -// CPPUNIT_TEST( testGetWaitQueueLengthIMSE ); -// CPPUNIT_TEST( testGetWaitingThreadsIAE ); -// CPPUNIT_TEST( testGetWaitingThreadsIMSE ); -// CPPUNIT_TEST( testHasWaiters ); -// CPPUNIT_TEST( testGetWaitQueueLength ); -// CPPUNIT_TEST( testGetWaitingThreads ); -// CPPUNIT_TEST( testAwaitUninterruptibly ); -// CPPUNIT_TEST( testAwaitInterrupt ); -// CPPUNIT_TEST( testAwaitNanosInterrupt ); -// CPPUNIT_TEST( testAwaitUntilInterrupt ); -// CPPUNIT_TEST( testSignalAll ); -// CPPUNIT_TEST( testToString ); -// CPPUNIT_TEST( testGetStateWithReleaseShared ); -// CPPUNIT_TEST( testReleaseShared ); -// CPPUNIT_TEST( testAcquireSharedInterruptibly ); -// CPPUNIT_TEST( testAsquireSharedTimed ); -// CPPUNIT_TEST( testAcquireSharedInterruptiblyInterruptedException ); -// CPPUNIT_TEST( testAcquireSharedNanosInterruptedException ); -// CPPUNIT_TEST( testAcquireSharedNanosTimeout ); + CPPUNIT_TEST( testGetFirstQueuedThread ); + CPPUNIT_TEST( testHasContended ); + CPPUNIT_TEST( testGetQueuedThreads ); + CPPUNIT_TEST( testGetExclusiveQueuedThreads ); + CPPUNIT_TEST( testGetSharedQueuedThreads ); + CPPUNIT_TEST( testInterruptedException2 ); + CPPUNIT_TEST( testTryAcquireWhenSynced ); + CPPUNIT_TEST( testAcquireNanosTimeout ); + CPPUNIT_TEST( testGetState ); + CPPUNIT_TEST( testAcquireInterruptibly1 ); + CPPUNIT_TEST( testAcquireInterruptibly2 ); + CPPUNIT_TEST( testOwns ); + CPPUNIT_TEST( testAwaitIllegalMonitor ); + CPPUNIT_TEST( testSignalIllegalMonitor ); + CPPUNIT_TEST( testAwaitNanosTimeout ); + CPPUNIT_TEST( testAwaitTimeout ); + CPPUNIT_TEST( testAwaitUntilTimeout ); + CPPUNIT_TEST( testAwait ); + CPPUNIT_TEST( testHasWaitersNPE ); + CPPUNIT_TEST( testGetWaitQueueLengthNPE ); + CPPUNIT_TEST( testGetWaitingThreadsNPE ); + CPPUNIT_TEST( testHasWaitersIAE ); + CPPUNIT_TEST( testHasWaitersIMSE ); + CPPUNIT_TEST( testGetWaitQueueLengthIAE ); + CPPUNIT_TEST( testGetWaitQueueLengthIMSE ); + CPPUNIT_TEST( testGetWaitingThreadsIAE ); + CPPUNIT_TEST( testGetWaitingThreadsIMSE ); + CPPUNIT_TEST( testHasWaiters ); + CPPUNIT_TEST( testGetWaitQueueLength ); + CPPUNIT_TEST( testGetWaitingThreads ); + CPPUNIT_TEST( testAwaitUninterruptibly ); + CPPUNIT_TEST( testAwaitInterrupt ); + CPPUNIT_TEST( testAwaitNanosInterrupt ); + CPPUNIT_TEST( testAwaitUntilInterrupt ); + CPPUNIT_TEST( testSignalAll ); + CPPUNIT_TEST( testToString ); + CPPUNIT_TEST( testGetStateWithReleaseShared ); + CPPUNIT_TEST( testReleaseShared ); + CPPUNIT_TEST( testAcquireSharedInterruptibly ); + CPPUNIT_TEST( testAsquireSharedTimed ); + CPPUNIT_TEST( testAcquireSharedInterruptiblyInterruptedException ); + CPPUNIT_TEST( testAcquireSharedNanosInterruptedException ); + CPPUNIT_TEST( testAcquireSharedNanosTimeout ); CPPUNIT_TEST_SUITE_END(); public: