activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1346165 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks: ReentrantReadWriteLock.cpp ReentrantReadWriteLock.h
Date Mon, 04 Jun 2012 21:26:06 GMT
Author: tabish
Date: Mon Jun  4 21:26:06 2012
New Revision: 1346165

URL: http://svn.apache.org/viewvc?rev=1346165&view=rev
Log:
Initial implementation of ReentrantReadWriteLock

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/ReentrantReadWriteLock.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/ReentrantReadWriteLock.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/ReentrantReadWriteLock.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/ReentrantReadWriteLock.cpp?rev=1346165&r1=1346164&r2=1346165&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/ReentrantReadWriteLock.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/ReentrantReadWriteLock.cpp Mon Jun  4 21:26:06 2012
@@ -18,10 +18,15 @@
 #include "ReentrantReadWriteLock.h"
 
 #include <decaf/lang/Exception.h>
+#include <decaf/lang/Integer.h>
+#include <decaf/lang/Thread.h>
+#include <decaf/lang/ThreadLocal.h>
+#include <decaf/lang/exceptions/IllegalMonitorStateException.h>
 #include <decaf/util/concurrent/locks/AbstractQueuedSynchronizer.h>
 
 using namespace decaf;
 using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
 using namespace decaf::util;
 using namespace decaf::util::concurrent;
 using namespace decaf::util::concurrent::locks;
@@ -29,6 +34,27 @@ using namespace decaf::util::concurrent:
 ////////////////////////////////////////////////////////////////////////////////
 namespace {
 
+    /**
+     * A counter for per-thread read hold counts. Maintained as a ThreadLocal;
+     * cached in cachedHoldCounter in class Sync
+     */
+    struct HoldCounter {
+        int count;
+        Thread* thread;
+
+        HoldCounter() : count(0), thread(Thread::currentThread()) {}
+    };
+
+    class ThreadLocalHoldCounter : public ThreadLocal<HoldCounter> {
+    public:
+
+        virtual ~ThreadLocalHoldCounter() {}
+
+        virtual HoldCounter initialValue() const {
+            return HoldCounter();
+        }
+    };
+
     class Sync : public AbstractQueuedSynchronizer {
     private:
 
@@ -44,7 +70,7 @@ namespace {
         static const int MAX_COUNT;
         static const int EXCLUSIVE_MASK;
 
-    protected:
+    public:
 
         /** Returns the number of shared holds represented in count  */
         static int sharedCount(int c) {
@@ -56,8 +82,368 @@ namespace {
             return c & EXCLUSIVE_MASK;
         }
 
+    private:
+
+        /**
+         * The number of reentrant read locks held by current thread.
+         * Initialized only in constructor and readObject.
+         * Removed whenever a thread's read hold count drops to 0.
+         */
+        ThreadLocalHoldCounter readHolds;
+
+        /**
+         * The hold count of the last thread to successfully acquire
+         * readLock. This saves ThreadLocal lookup in the common case
+         * where the next thread to release is the last one to
+         * acquire.
+         */
+        HoldCounter cachedHoldCounter;
+
+        /**
+         * firstReader is the first thread to have acquired the read lock.
+         * firstReaderHoldCount is firstReader's hold count.
+         *
+         * <p>More precisely, firstReader is the unique thread that last
+         * changed the shared count from 0 to 1, and has not released the
+         * read lock since then; NULL if there is no such thread.
+         *
+         * <p>This allows tracking of read holds for uncontended read
+         * locks to be very cheap.
+         */
+        Thread* firstReader;
+        int firstReaderHoldCount;
+
     public:
 
+        Sync() : readHolds(), cachedHoldCounter(), firstReader(NULL), firstReaderHoldCount(0) {}
+
+        virtual bool isFair() const = 0;
+
+        /*
+         * Note that tryRelease and tryAcquire can be called by
+         * Conditions. So it is possible that their arguments contain
+         * both read and write holds that are all released during a
+         * condition wait and re-established in tryAcquire.
+         */
+
+        virtual bool tryRelease(int releases) {
+
+            if (!this->isHeldExclusively()) {
+                throw IllegalMonitorStateException(__FILE__, __LINE__, "Sync lock not held exclusively");
+            }
+
+            int nextc = getState() - releases;
+            bool free = exclusiveCount(nextc) == 0;
+            if (free) {
+                setExclusiveOwnerThread(NULL);
+            }
+            setState(nextc);
+            return free;
+        }
+
+        bool tryAcquire(int acquires) {
+            /*
+             * Walkthrough:
+             * 1. If read count is nonzero or write count is nonzero
+             *    and owner is a different thread, fail.
+             * 2. If count would saturate, fail. (This can only
+             *    happen if count is already nonzero.)
+             * 3. Otherwise, this thread is eligible for lock if it is
+             *    either a reentrant acquire or queue policy allows it.
+             *    If so, update state and set owner.
+             */
+            Thread* current = Thread::currentThread();
+            int c = getState();
+            int w = exclusiveCount(c);
+
+            if (c != 0) {
+                // (Note: if c != 0 and w == 0 then shared count != 0)
+                if (w == 0 || current != getExclusiveOwnerThread()) {
+                    return false;
+                }
+                if (w + exclusiveCount(acquires) > MAX_COUNT) {
+                    throw new RuntimeException(__FILE__, __LINE__, "Maximum lock count exceeded");
+                }
+                // Reentrant acquire
+                setState(c + acquires);
+                return true;
+            }
+
+            if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) {
+                return false;
+            }
+
+            setExclusiveOwnerThread(current);
+            return true;
+        }
+
+        bool tryReleaseShared(int unused DECAF_UNUSED) {
+            Thread* current = Thread::currentThread();
+            if (firstReader == current) {
+                if (firstReaderHoldCount == 1) {
+                    firstReader = NULL;
+                } else {
+                    firstReaderHoldCount--;
+                }
+            } else {
+                HoldCounter rh = cachedHoldCounter;
+                if (rh.thread == NULL || rh.thread != current) {
+                    rh = readHolds.get();
+                }
+                int count = rh.count;
+                if (count <= 1) {
+                    readHolds.remove();
+                    if (count <= 0) {
+                        throw IllegalMonitorStateException(
+                            __FILE__, __LINE__, "attempt to unlock read lock, not locked by current thread");
+                    }
+                }
+                --rh.count;
+            }
+
+            for (;;) {
+                int c = getState();
+                int nextc = c - SHARED_UNIT;
+                if (compareAndSetState(c, nextc)) {
+                    // Releasing the read lock has no effect on readers, but it may allow
+                    // waiting writers to proceed if both read and write locks are now free.
+                    return nextc == 0;
+                }
+            }
+        }
+
+        virtual int tryAcquireShared(int unused DECAF_UNUSED) {
+            /*
+             * Walk through:
+             * 1. If write lock held by another thread, fail.
+             * 2. Otherwise, this thread is eligible for lock wrt state, so
+             *    ask if it should block because of queue policy. If not, try
+             *    to grant by CASing state and updating count. Note that step
+             *    does not check for reentrant acquires, which is postponed to
+             *    full version to avoid having to check hold count in the more
+             *    typical non-reentrant case.
+             * 3. If step 2 fails either because thread apparently not eligible
+             *    or CAS fails or count saturated, chain to version with full
+             *    retry loop.
+             */
+            Thread* current = Thread::currentThread();
+            int c = getState();
+            if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) {
+                return -1;
+            }
+            int r = sharedCount(c);
+            if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
+                if (r == 0) {
+                    firstReader = current;
+                    firstReaderHoldCount = 1;
+                } else if (firstReader == current) {
+                    firstReaderHoldCount++;
+                } else {
+                    HoldCounter rh = cachedHoldCounter;
+                    if (rh.thread == NULL || rh.thread != current) {
+                        cachedHoldCounter = rh = readHolds.get();
+                    } else if (rh.count == 0) {
+                        readHolds.set(rh);
+                    }
+
+                    rh.count++;
+                }
+                return 1;
+            }
+
+            return fullTryAcquireShared(current);
+        }
+
+        /**
+         * Full version of acquire for reads, that handles CAS misses
+         * and reentrant reads not dealt with in tryAcquireShared.
+         */
+        int fullTryAcquireShared(Thread* current) {
+
+            HoldCounter rh;
+            for (;;) {
+                int c = getState();
+                if (exclusiveCount(c) != 0) {
+                    if (getExclusiveOwnerThread() != current) {
+                        return -1;
+                    }
+                    // else we hold the exclusive lock; blocking here
+                    // would cause deadlock.
+                } else if (readerShouldBlock()) {
+                    // Make sure we're not acquiring read lock reentrantly
+                    if (firstReader == current) {
+                        // assert firstReaderHoldCount > 0;
+                    } else {
+                        if (rh.thread == NULL) {
+                            rh = cachedHoldCounter;
+                            if (rh.thread == NULL || rh.thread != current) {
+                                rh = readHolds.get();
+                                if (rh.count == 0) {
+                                    readHolds.remove();
+                                }
+                            }
+                        }
+
+                        if (rh.count == 0) {
+                            return -1;
+                        }
+                    }
+                }
+                if (sharedCount(c) == MAX_COUNT) {
+                    throw Exception(__FILE__, __LINE__, "Maximum lock count exceeded");
+                }
+                if (compareAndSetState(c, c + SHARED_UNIT)) {
+                    if (sharedCount(c) == 0) {
+                        firstReader = current;
+                        firstReaderHoldCount = 1;
+                    } else if (firstReader == current) {
+                        firstReaderHoldCount++;
+                    } else {
+                        if (rh.thread == NULL) {
+                            rh = cachedHoldCounter;
+                        }
+                        if (rh.thread == NULL || rh.thread != current) {
+                            rh = readHolds.get();
+                        } else if (rh.count == 0) {
+                            readHolds.set(rh);
+                        }
+                        rh.count++;
+                        cachedHoldCounter = rh; // cache for release
+                    }
+                    return 1;
+                }
+            }
+        }
+
+        /**
+         * Performs tryLock for write, enabling barging in both modes.
+         * This is identical in effect to tryAcquire except for lack
+         * of calls to writerShouldBlock.
+         */
+        bool tryWriteLock() {
+            Thread* current = Thread::currentThread();
+            int c = getState();
+            if (c != 0) {
+                int w = exclusiveCount(c);
+                if (w == 0 || current != getExclusiveOwnerThread()) {
+                    return false;
+                }
+                if (w == MAX_COUNT) {
+                    throw new Exception(__FILE__, __LINE__, "Maximum lock count exceeded");
+                }
+            }
+            if (!compareAndSetState(c, c + 1)) {
+                return false;
+            }
+            setExclusiveOwnerThread(current);
+            return true;
+        }
+
+        /**
+         * Performs tryLock for read, enabling barging in both modes.
+         * This is identical in effect to tryAcquireShared except for
+         * lack of calls to readerShouldBlock.
+         */
+        bool tryReadLock() {
+            Thread* current = Thread::currentThread();
+            for (;;) {
+                int c = getState();
+                if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) {
+                    return false;
+                }
+                int r = sharedCount(c);
+                if (r == MAX_COUNT) {
+                    throw Exception(__FILE__, __LINE__, "Maximum lock count exceeded");
+                }
+                if (compareAndSetState(c, c + SHARED_UNIT)) {
+                    if (r == 0) {
+                        firstReader = current;
+                        firstReaderHoldCount = 1;
+                    } else if (firstReader == current) {
+                        firstReaderHoldCount++;
+                    } else {
+                        HoldCounter rh = cachedHoldCounter;
+                        if (rh.thread == NULL || rh.thread != current) {
+                            cachedHoldCounter = rh = readHolds.get();
+                        } else if (rh.count == 0) {
+                            readHolds.set(rh);
+                        }
+                        rh.count++;
+                    }
+                    return true;
+                }
+            }
+        }
+
+        virtual bool isHeldExclusively() const {
+            // While we must in general read state before owner, we don't need to do
+            // so to check if current thread is owner
+            return getExclusiveOwnerThread() == Thread::currentThread();
+        }
+
+        ConditionObject* newCondition() {
+            return AbstractQueuedSynchronizer::createDefaultConditionObject();
+        }
+
+        Thread* getOwner() {
+            // Must read state before owner to ensure memory consistency
+            return ((exclusiveCount(getState()) == 0)? NULL : getExclusiveOwnerThread());
+        }
+
+        int getReadLockCount() {
+            return sharedCount(getState());
+        }
+
+        bool isWriteLocked() {
+            return exclusiveCount(getState()) != 0;
+        }
+
+        int getWriteHoldCount() {
+            return isHeldExclusively() ? exclusiveCount(getState()) : 0;
+        }
+
+        int getReadHoldCount() {
+            if (getReadLockCount() == 0) {
+                return 0;
+            }
+
+            Thread* current = Thread::currentThread();
+            if (firstReader == current) {
+                return firstReaderHoldCount;
+            }
+
+            HoldCounter rh = cachedHoldCounter;
+            if (rh.thread != NULL && rh.thread == current) {
+                return rh.count;
+            }
+
+            int count = readHolds.get().count;
+            if (count == 0) {
+                readHolds.remove();
+            }
+            return count;
+        }
+
+        int getCount() {
+            return getState();
+        }
+
+    protected:
+
+        /**
+         * @returns true if the current thread, when trying to acquire the read lock,
+         *          and otherwise eligible to do so, should block because of policy for
+         *          overtaking other waiting threads.
+         */
+        virtual bool readerShouldBlock() const = 0;
+
+        /**
+         * @returns true if the current thread, when trying to acquire the write lock,
+         *          and otherwise eligible to do so, should block because of policy for
+         *          overtaking other waiting threads.
+         */
+        virtual bool writerShouldBlock() const = 0;
+
     };
 
     const int Sync::SHARED_SHIFT   = 16;
@@ -68,11 +454,38 @@ namespace {
     class FairSync : public Sync {
     public:
 
+        virtual ~FairSync() {}
+
+        virtual bool readerShouldBlock() const {
+            return this->hasQueuedPredecessors();
+        }
+
+        virtual bool writerShouldBlock() const {
+            return this->hasQueuedPredecessors();
+        }
+
+        virtual bool isFair() const {
+            return true;
+        }
     };
 
     class NonFairSync : public Sync {
     public:
 
+        virtual ~NonFairSync() {}
+
+        virtual bool readerShouldBlock() const {
+            return false;
+        }
+
+        virtual bool writerShouldBlock() const {
+            return false;
+            // TODO - add apparentlyFirstQueuedIsExclusive
+        }
+
+        virtual bool isFair() const {
+            return false;
+        }
     };
 
     class ReadLock : public Lock {
@@ -83,6 +496,168 @@ namespace {
         ReadLock(Sync* sync) : Lock(), sync(sync) {
         }
 
+        /**
+         * Acquires the read lock.
+         *
+         * <p>Acquires the read lock if the write lock is not held by
+         * another thread and returns immediately.
+         *
+         * <p>If the write lock is held by another thread then
+         * the current thread becomes disabled for thread scheduling
+         * purposes and lies dormant until the read lock has been acquired.
+         */
+        virtual void lock() {
+            sync->acquireShared(1);
+        }
+
+        /**
+         * Acquires the read lock unless the current thread is
+         *
+         * <p>Acquires the read lock if the write lock is not held
+         * by another thread and returns immediately.
+         *
+         * <p>If the write lock is held by another thread then the
+         * current thread becomes disabled for thread scheduling
+         * purposes and lies dormant until one of two things happens:
+         *
+         * <ul>
+         *   <li>The read lock is acquired by the current thread; or
+         *   <li>Some other thread interrupts the current thread.
+         * </ul>
+         *
+         * <p>If the current thread:
+         *
+         * <ul>
+         *   <li>has its interrupted status set on entry to this method; or
+         *   <li>is interrupted while acquiring the read lock,
+         * </ul>
+         *
+         * then nterruptedException is thrown and the current thread's interrupted
+         * status is cleared.
+         *
+         * <p>In this implementation, as this method is an explicit interruption
+         * point, preference is given to responding to the interrupt over normal
+         * or reentrant acquisition of the lock.
+         *
+         * @throws InterruptedException if the current thread is interrupted
+         */
+        void lockInterruptibly() {
+            sync->acquireSharedInterruptibly(1);
+        }
+
+        /**
+         * Acquires the read lock only if the write lock is not held by
+         * another thread at the time of invocation.
+         *
+         * <p>Acquires the read lock if the write lock is not held by
+         * another thread and returns immediately with the value
+         * {@code true}. Even when this lock has been set to use a
+         * fair ordering policy, a call to {@code tryLock()}
+         * <em>will</em> immediately acquire the read lock if it is
+         * available, whether or not other threads are currently
+         * waiting for the read lock.  This &quot;barging&quot; behavior
+         * can be useful in certain circumstances, even though it
+         * breaks fairness. If you want to honor the fairness setting
+         * for this lock, then use {@link #tryLock(long, TimeUnit)
+         * tryLock(0, TimeUnit.SECONDS) } which is almost equivalent
+         * (it also detects interruption).
+         *
+         * <p>If the write lock is held by another thread then
+         * this method will return immediately with the value
+         * {@code false}.
+         *
+         * @return {@code true} if the read lock was acquired
+         */
+        virtual bool tryLock() {
+            return sync->tryReadLock();
+        }
+
+        /**
+         * Acquires the read lock if the write lock is not held by
+         * another thread within the given waiting time and the
+         * current thread has not been interrupted.
+         *
+         * <p>Acquires the read lock if the write lock is not held by
+         * another thread and returns immediately with the value
+         * {@code true}. If this lock has been set to use a fair
+         * ordering policy then an available lock <em>will not</em> be
+         * acquired if any other threads are waiting for the
+         * lock. This is in contrast to the {@link #tryLock()}
+         * method. If you want a timed {@code tryLock} that does
+         * permit barging on a fair lock then combine the timed and
+         * un-timed forms together:
+         *
+         * <pre>if (lock.tryLock() || lock.tryLock(timeout, unit) ) { ... }
+         * </pre>
+         *
+         * <p>If the write lock is held by another thread then the
+         * current thread becomes disabled for thread scheduling
+         * purposes and lies dormant until one of three things happens:
+         *
+         * <ul>
+         *   <li>The read lock is acquired by the current thread; or
+         *   <li>Some other thread interrupts the current thread; or
+         *   <li>The specified waiting time elapses.
+         * </ul>
+         *
+         * <p>If the read lock is acquired then the value {@code true} is returned.
+         * <p>If the current thread:
+         *
+         * <ul>
+         *   <li>has its interrupted status set on entry to this method; or
+         *   <li>is interrupted while acquiring the read lock,
+         * </ul> then InterruptedException is thrown and the current thread's
+         * interrupted status is cleared.
+         *
+         * <p>If the specified waiting time elapses then the value
+         * {@code false} is returned.  If the time is less than or
+         * equal to zero, the method will not wait at all.
+         *
+         * <p>In this implementation, as this method is an explicit
+         * interruption point, preference is given to responding to
+         * the interrupt over normal or reentrant acquisition of the
+         * lock, and over reporting the elapse of the waiting time.
+         *
+         * @param timeout the time to wait for the read lock
+         * @param unit the time unit of the timeout argument
+         *
+         * @return {@code true} if the read lock was acquired
+         *
+         * @throws InterruptedException if the current thread is interrupted
+         */
+        virtual bool tryLock(long long timeout, const TimeUnit& unit) {
+            return sync->tryAcquireSharedNanos(1, unit.toNanos(timeout));
+        }
+
+        /**
+         * Attempts to release this lock.
+         *
+         * <p> If the number of readers is now zero then the lock
+         * is made available for write lock attempts.
+         */
+        virtual void unlock() {
+            sync->releaseShared(1);
+        }
+
+        /**
+         * Throws UnsupportedOperationException because ReadLocks do not support conditions.
+         * @throws UnsupportedOperationException always
+         */
+        virtual Condition* newCondition() {
+            throw new UnsupportedOperationException();
+        }
+
+        /**
+         * Returns a string identifying this lock, as well as its lock state.
+         * The state, in brackets, includes the String {@code "Read locks ="}
+         * followed by the number of held read locks.
+         *
+         * @return a string identifying this lock, as well as its lock state
+         */
+        virtual std::string toString() const {
+            int r = sync->getReadLockCount();
+            return std::string("[Read locks = ") + Integer::toString(r) + "]";
+        }
     };
 
     class WriteLock : public Lock {
@@ -93,6 +668,249 @@ namespace {
         WriteLock(Sync* sync) : Lock(), sync(sync) {
         }
 
+        /**
+         * Acquires the write lock.
+         *
+         * <p>Acquires the write lock if neither the read nor write lock
+         * are held by another thread
+         * and returns immediately, setting the write lock hold count to
+         * one.
+         *
+         * <p>If the current thread already holds the write lock then the
+         * hold count is incremented by one and the method returns
+         * immediately.
+         *
+         * <p>If the lock is held by another thread then the current
+         * thread becomes disabled for thread scheduling purposes and
+         * lies dormant until the write lock has been acquired, at which
+         * time the write lock hold count is set to one.
+         */
+        virtual void lock() {
+            sync->acquire(1);
+        }
+
+        /**
+         * Acquires the write lock unless the current thread is interrupted
+         *
+         * <p>Acquires the write lock if neither the read nor write lock are held
+         * by another thread and returns immediately, setting the write lock hold
+         * count to one.
+         *
+         * <p>If the current thread already holds this lock then the hold count
+         * is incremented by one and the method returns immediately.
+         *
+         * <p>If the lock is held by another thread then the current thread
+         * becomes disabled for thread scheduling purposes and lies dormant until
+         * one of two things happens:
+         *
+         * <ul>
+         *   <li>The write lock is acquired by the current thread; or
+         *   <li>Some other thread interrupts the current thread.
+         * </ul>
+         *
+         * <p>If the write lock is acquired by the current thread then the
+         * lock hold count is set to one.
+         *
+         * <p>If the current thread:
+         *
+         * <ul>
+         *   <li>has its interrupted status set on entry to this method; or
+         *   <li>is interrupted while acquiring the write lock,
+         * </ul>
+         *
+         * then InterruptedException is thrown and the current thread's
+         * interrupted status is cleared.
+         *
+         * <p>In this implementation, as this method is an explicit interruption
+         * point, preference is given to responding to the interrupt over normal or
+         * reentrant acquisition of the lock.
+         *
+         * @throws InterruptedException if the current thread is interrupted
+         */
+        virtual void lockInterruptibly() {
+            sync->acquireInterruptibly(1);
+        }
+
+        /**
+         * Acquires the write lock only if it is not held by another thread
+         * at the time of invocation.
+         *
+         * <p>Acquires the write lock if neither the read nor write lock are held
+         * by another thread and returns immediately with the value {@code true},
+         * setting the write lock hold count to one. Even when this lock has
+         * been set to use a fair ordering policy, a call to tryLock() immediately
+         * acquire the lock if it is available, whether or not other threads are
+         * currently waiting for the write lock.  This &quot;barging&quot; behavior
+         * can be useful in certain circumstances, even though it breaks fairness.
+         * If you want to honor the fairness setting for this lock, then use
+         * tryLock(0, TimeUnit.SECONDS) which is almost equivalent (it also
+         * detects interruption).
+         *
+         * <p> If the current thread already holds this lock then the hold count is
+         * incremented by one and the method returns true.
+         *
+         * <p>If the lock is held by another thread then this method will return
+         * immediately with the value {@code false}.
+         *
+         * @return if the lock was free and was acquired by the current thread, or
+         * the write lock was already held by the current thread; and false otherwise.
+         */
+        virtual bool tryLock() {
+            return sync->tryWriteLock();
+        }
+
+        /**
+         * Acquires the write lock if it is not held by another thread
+         * within the given waiting time and the current thread has
+         * not been interrupted.
+         *
+         * <p>Acquires the write lock if neither the read nor write lock
+         * are held by another thread
+         * and returns immediately with the value true,
+         * setting the write lock hold count to one. If this lock has been
+         * set to use a fair ordering policy then an available lock
+         * <em>will not</em> be acquired if any other threads are
+         * waiting for the write lock. This is in contrast to the
+         * tryLock() method. If you want a timed tryLock
+         * that does permit barging on a fair lock then combine the
+         * timed and un-timed forms together:
+         *
+         * <pre>if (lock.tryLock() || lock.tryLock(timeout, unit) ) { ... }
+         * </pre>
+         *
+         * <p>If the current thread already holds this lock then the
+         * hold count is incremented by one and the method returns
+         * true.
+         *
+         * <p>If the lock is held by another thread then the current
+         * thread becomes disabled for thread scheduling purposes and
+         * lies dormant until one of three things happens:
+         *
+         * <ul>
+         *   <li>The write lock is acquired by the current thread; or
+         *   <li>Some other thread interrupts the current thread; or
+         *   <li>The specified waiting time elapses
+         * </ul>
+         *
+         * <p>If the write lock is acquired then the value {@code true} is
+         * returned and the write lock hold count is set to one.
+         *
+         * <p>If the current thread:
+         *
+         * <ul>
+         *   <li>has its interrupted status set on entry to this method; or
+         *   <li>is interrupted while acquiring the write lock,
+         * </ul>
+         *
+         * then InterruptedException is thrown and the current
+         * thread's interrupted status is cleared.
+         *
+         * <p>If the specified waiting time elapses then the value
+         * false is returned.  If the time is less than or
+         * equal to zero, the method will not wait at all.
+         *
+         * <p>In this implementation, as this method is an explicit
+         * interruption point, preference is given to responding to
+         * the interrupt over normal or reentrant acquisition of the
+         * lock, and over reporting the elapse of the waiting time.
+         *
+         * @param timeout the time to wait for the write lock
+         * @param unit the time unit of the timeout argument
+         *
+         * @return true if the lock was free and was acquired
+         * by the current thread, or the write lock was already held by the
+         * current thread; and false if the waiting time
+         * elapsed before the lock could be acquired.
+         *
+         * @throws InterruptedException if the current thread is interrupted
+         */
+        virtual bool tryLock(long long timeout, const TimeUnit& unit) {
+            return sync->tryAcquireNanos(1, unit.toNanos(timeout));
+        }
+
+        /**
+         * Attempts to release this lock.
+         *
+         * <p>If the current thread is the holder of this lock then the hold
+         * count is decremented. If the hold count is now zero then the lock
+         * is released.  If the current thread is not the holder of this lock
+         * then IllegalMonitorStateException is thrown.
+         *
+         * @throws IllegalMonitorStateException if the current thread does not
+         *         hold this lock.
+         */
+        virtual void unlock() {
+            sync->release(1);
+        }
+
+        /**
+         * Returns a Condition instance for use with this Lock instance.
+         *
+         * <p>The returned Condition instance supports the same usages as do the
+         * Mutex methods  wait, notify, and notifyAll when used with the built-in
+         * mutex lock.
+         *
+         * <ul>
+         *   <li>If this write lock is not held when any Condition method is called
+         *   then an IllegalMonitorStateException is thrown.  (Read locks are
+         *   held independently of write locks, so are not checked or affected.
+         *   However it is essentially always an error to invoke a condition waiting
+         *   method when the current thread has also acquired read locks, since other
+         *   threads that could unblock it will not be able to acquire the write
+         *   lock.)
+         *   <li>When the condition waiting methods are called the write lock is
+         *   released and, before they return, the write lock is reacquired and the
+         *   lock hold count restored to what it was when the method was called.
+         *   <li>If a thread is interrupted while waiting then the wait will terminate,
+         *   an InterruptedException will be thrown, and the thread's interrupted
+         *   status will be cleared.
+         *   <li> Waiting threads are signalled in FIFO order.
+         *   <li>The ordering of lock reacquisition for threads returning from
+         *   waiting methods is the same as for threads initially acquiring the lock,
+         *   which is in the default case not specified, but for <em>fair</em> locks
+         *   favors those threads that have been waiting the longest.
+         * </ul>
+         *
+         * @return the Condition object
+         */
+        virtual Condition* newCondition() {
+            return sync->newCondition();
+        }
+
+        /**
+         * Returns a string identifying this lock, as well as its lock state.  The
+         * state, in brackets includes either the String "Unlocked" or the String
+         * "Locked by" followed by the name of the owning thread.
+         *
+         * @return a string identifying this lock, as well as its lock state
+         */
+        virtual std::string toString() const {
+            Thread* o = sync->getOwner();
+            return std::string("Lock") + ((o == NULL) ? "[Unlocked]" :
+                                                        "[Locked by thread " + o->getName() + "]");
+        }
+
+        /**
+         * Queries if this write lock is held by the current thread.  Identical in
+         * effect to  isWriteLockedByCurrentThread.
+         *
+         * @return true if the current thread holds this lock and false otherwise
+         */
+        virtual bool isHeldByCurrentThread() const {
+            return sync->isHeldExclusively();
+        }
+
+        /**
+         * Queries the number of holds on this write lock by the current thread.
+         * A thread has a hold on a lock for each lock action that is not matched
+         * by an unlock action.  Identical in effect to getWriteHoldCount.
+         *
+         * @return the number of holds on this lock by the current thread,
+         *         or zero if this lock is not held by the current thread
+         */
+        virtual int getHoldCount() const {
+            return sync->getWriteHoldCount();
+        }
     };
 }
 
@@ -116,8 +934,8 @@ namespace locks {
                 sync = new NonFairSync();
             }
 
-//            readLock = new ReadLock(sync);
-//            writeLock = new WriteLock(sync);
+            readLock = new ReadLock(sync);
+            writeLock = new WriteLock(sync);
         }
 
         ~ReentrantReadWriteLockImpl() {
@@ -130,7 +948,7 @@ namespace locks {
 }}}}
 
 ////////////////////////////////////////////////////////////////////////////////
-ReentrantReadWriteLock::ReentrantReadWriteLock() : ReadWriteLock(), impl(new ReentrantReadWriteLockImpl(true)) {
+ReentrantReadWriteLock::ReentrantReadWriteLock() : ReadWriteLock(), impl(new ReentrantReadWriteLockImpl(false)) {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -155,3 +973,130 @@ Lock& ReentrantReadWriteLock::readLock()
 Lock& ReentrantReadWriteLock::writeLock() {
     throw "";
 }
+
+////////////////////////////////////////////////////////////////////////////////
+bool ReentrantReadWriteLock::isFair() const {
+    return this->impl->sync->isFair();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Thread* ReentrantReadWriteLock::getOwner() const {
+    return this->impl->sync->getOwner();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int ReentrantReadWriteLock::getReadLockCount() const {
+    return this->impl->sync->getReadLockCount();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ReentrantReadWriteLock::isWriteLocked() const {
+    return this->impl->sync->isWriteLocked();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ReentrantReadWriteLock::isWriteLockedByCurrentThread() const {
+    return this->impl->sync->isHeldExclusively();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int ReentrantReadWriteLock::getWriteHoldCount() const {
+    return this->impl->sync->getWriteHoldCount();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int ReentrantReadWriteLock::getReadHoldCount() const {
+    return this->impl->sync->getReadHoldCount();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Collection<Thread*>* ReentrantReadWriteLock::getQueuedWriterThreads() const {
+    return this->impl->sync->getExclusiveQueuedThreads();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Collection<Thread*>* ReentrantReadWriteLock::getQueuedReaderThreads() const {
+    return this->impl->sync->getSharedQueuedThreads();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ReentrantReadWriteLock::hasQueuedThreads() const {
+    return this->impl->sync->hasQueuedThreads();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ReentrantReadWriteLock::hasQueuedThread(Thread* thread) const {
+    return this->impl->sync->isQueued(thread);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int ReentrantReadWriteLock::getQueueLength() const {
+    return this->impl->sync->getQueueLength();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Collection<Thread*>* ReentrantReadWriteLock::getQueuedThreads() const {
+    return this->impl->sync->getQueuedThreads();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ReentrantReadWriteLock::hasWaiters(Condition* condition) const {
+
+    if (condition == NULL) {
+        throw NullPointerException(__FILE__, __LINE__, "The Condition to check was NULL");
+    }
+
+    const AbstractQueuedSynchronizer::ConditionObject* cond =
+        dynamic_cast<const AbstractQueuedSynchronizer::ConditionObject*>(condition);
+
+    if (cond == NULL) {
+        throw IllegalArgumentException(__FILE__, __LINE__, "Condition is not associated with this Lock");
+    }
+
+    return this->impl->sync->hasWaiters(cond);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int ReentrantReadWriteLock::getWaitQueueLength(Condition* condition) const {
+
+    if (condition == NULL) {
+        throw NullPointerException(__FILE__, __LINE__, "The Condition to check was NULL");
+    }
+
+    const AbstractQueuedSynchronizer::ConditionObject* cond =
+        dynamic_cast<const AbstractQueuedSynchronizer::ConditionObject*>(condition);
+
+    if (cond == NULL) {
+        throw IllegalArgumentException(__FILE__, __LINE__, "Condition is not associated with this Lock");
+    }
+
+    return this->impl->sync->getWaitQueueLength(cond);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Collection<decaf::lang::Thread*>* ReentrantReadWriteLock::getWaitingThreads(Condition* condition) const {
+
+    if (condition == NULL) {
+        throw NullPointerException(__FILE__, __LINE__, "The Condition to check was NULL");
+    }
+
+    const AbstractQueuedSynchronizer::ConditionObject* cond =
+        dynamic_cast<const AbstractQueuedSynchronizer::ConditionObject*>(condition);
+
+    if (cond == NULL) {
+        throw IllegalArgumentException(__FILE__, __LINE__, "Condition is not associated with this Lock");
+    }
+
+    return this->impl->sync->getWaitingThreads(cond);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+std::string ReentrantReadWriteLock::toString() const {
+    int c = this->impl->sync->getCount();
+    int w = this->impl->sync->exclusiveCount(c);
+    int r = this->impl->sync->sharedCount(c);
+
+    return std::string("ReentrantReadWriteLock: ") +
+            "[Write locks = " + Integer::toString(w) +
+            ", Read locks = " + Integer::toString(r) + "]";
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/ReentrantReadWriteLock.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/ReentrantReadWriteLock.h?rev=1346165&r1=1346164&r2=1346165&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/ReentrantReadWriteLock.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/ReentrantReadWriteLock.h Mon Jun  4 21:26:06 2012
@@ -20,6 +20,8 @@
 
 #include <decaf/util/Config.h>
 
+#include <decaf/lang/Thread.h>
+#include <decaf/util/Collection.h>
 #include <decaf/util/concurrent/locks/ReadWriteLock.h>
 
 namespace decaf {
@@ -67,6 +69,199 @@ namespace locks {
          */
         virtual Lock& writeLock();
 
+        /**
+         * Returns true if this lock has fairness set true.
+         *
+         * @returns true if the Lock uses a fair policy otherwise false.
+         */
+        bool isFair() const;
+
+        /**
+         * Queries the number of read locks held for this lock. This method is designed
+         * for use in monitoring system state, not for synchronization control.
+         *
+         * @return the number of read locks held.
+         */
+        int getReadLockCount() const;
+
+        /**
+         * Queries if the write lock is held by any thread. This method is designed for
+         * use in monitoring system state, not for synchronization control.
+         *
+         * @return true if any thread holds the write lock and false otherwise
+         */
+        bool isWriteLocked() const;
+
+        /**
+         * Queries if the write lock is held by the current thread.
+         *
+         * @return true if the current thread holds the write lock and false otherwise
+         */
+        bool isWriteLockedByCurrentThread() const;
+
+        /**
+         * Queries the number of reentrant write holds on this lock by the current thread.
+         * A writer thread has a hold on a lock for each lock action that is not matched
+         * by an unlock action.
+         *
+         * @return the number of holds on the write lock by the current thread,
+         *         or zero if the write lock is not held by the current thread
+         */
+        int getWriteHoldCount() const;
+
+        /**
+         * Queries the number of reentrant read holds on this lock by the current thread.
+         * A reader thread has a hold on a lock for each lock action that is not matched
+         * by an unlock action.
+         *
+         * @return the number of holds on the read lock by the current thread,
+         *         or zero if the read lock is not held by the current thread
+         */
+        int getReadHoldCount() const;
+
+        /**
+         * Queries whether any threads are waiting on the given condition associated with
+         * the write lock. Note that because timeouts and interrupts may occur at any time,
+         * a true return does not guarantee that a future signal will awaken any threads.
+         * This method is designed primarily for use in monitoring of the system state.
+         *
+         * @param condition
+         *      The condition to be queried for waiters.
+         *
+         * @return true if there are any waiting threads
+         *
+         * @throws NullPointerException if the ConditionObject pointer is NULL.
+         * @throws IllegalArgumentException if the ConditionObject is not associated with this Lock.
+         * @throws IllegalMonitorStateException if the caller does not hold exclusive synchronization.
+         */
+        bool hasWaiters(Condition* condition) const;
+
+        /**
+         * Gets an estimated count of the number of threads that are currently waiting on the given
+         * Condition object, this value changes dynamically so the result of this method can be invalid
+         * immediately after it is called.  The Condition object must be associated with this Lock
+         * or an exception will be thrown.
+         *
+         * @returns an estimate of the number of waiting threads.
+         *
+         * @throws NullPointerException if the ConditionObject pointer is NULL.
+         * @throws IllegalArgumentException if the ConditionObject is not associated with this Synchronizer.
+         * @throws IllegalMonitorStateException if the caller does not hold exclusive synchronization.
+         */
+        int getWaitQueueLength(Condition* condition) const;
+
+        /**
+         * Returns a string identifying this lock, as well as its lock state. The state,
+         * in brackets, includes the String "Write locks =" followed by the number of
+         * reentrantly held write locks, and the String "Read locks =" followed by the
+         * number of held read locks.
+         *
+         * @return a string identifying this lock, as well as its lock state
+         */
+        std::string toString() const;
+
+    protected:
+
+        /**
+         * Creates and returns a new Collection object that contains all the threads that may be waiting
+         * on the given Condition object instance at the time this method is called.
+         *
+         * @returns a Collection pointer that contains waiting threads on given Condition object.
+         *          The caller owns the returned pointer.
+         *
+         * @throws NullPointerException if the ConditionObject pointer is NULL.
+         * @throws IllegalArgumentException if the ConditionObject is not associated with this Synchronizer.
+         * @throws IllegalMonitorStateException if the caller does not hold exclusive synchronization.
+         */
+        decaf::util::Collection<decaf::lang::Thread*>* getWaitingThreads(Condition* condition) const;
+
+        /**
+         * Queries whether any threads are waiting to acquire the read or write lock.
+         * Note that because cancellations may occur at any time, a true return does
+         * not guarantee that any other thread will ever acquire a lock.  This method
+         * is designed primarily for use in monitoring of the system state.
+         *
+         * @return if there may be other threads waiting to acquire the lock
+         */
+        bool hasQueuedThreads() const;
+
+        /**
+         * Queries whether the given thread is waiting to acquire either the read or
+         * write lock. Note that because cancellations may occur at any time, a true
+         * return does not guarantee that this thread will ever acquire a lock.  This
+         * method is designed primarily for use in monitoring of the system state.
+         *
+         * @param thread
+         *      The thread that will be queried for.
+         *
+         * @return true if the given thread is queued waiting for this lock
+         *
+         * @throws NullPointerException if the thread is NULL.
+         */
+        bool hasQueuedThread(decaf::lang::Thread* thread) const;
+
+        /**
+         * Returns an estimate of the number of threads waiting to acquire either the
+         * read or write lock.  The value is only an estimate because the number of
+         * threads may change dynamically while this method traverses internal data
+         * structures.  This method is designed for use in monitoring of the system
+         * state, not for synchronization control.
+         *
+         * @return the estimated number of threads waiting for this lock
+         */
+        int getQueueLength() const;
+
+    protected:
+
+        /**
+         * Returns a collection containing threads that may be waiting to acquire either
+         * the read or write lock.  Because the actual set of threads may change dynamically
+         * while constructing this result, the returned collection is only a best-effort
+         * estimate.  The elements of the returned collection are in no particular order.
+         * This method is designed to facilitate construction of subclasses that provide
+         * more extensive monitoring facilities.
+         *
+         * @return the collection of threads
+         */
+        decaf::util::Collection<decaf::lang::Thread*>* getQueuedThreads() const;
+
+        /**
+         * Returns a collection containing threads that may be waiting to acquire the
+         * write lock.  Because the actual set of threads may change dynamically
+         * while constructing this result, the returned collection is only a best-effort
+         * estimate.  The elements of the returned collection are in no particular order.
+         * This method is designed to facilitate construction of subclasses that provide
+         * more extensive lock monitoring facilities.
+         *
+         * @return the collection of threads
+         */
+        decaf::util::Collection<decaf::lang::Thread*>* getQueuedWriterThreads() const;
+
+        /**
+         * Returns a collection containing threads that may be waiting to acquire the
+         * read lock.  Because the actual set of threads may change dynamically while
+         * constructing this result, the returned collection is only a best-effort estimate.
+         * The elements of the returned collection are in no particular order.  This method
+         * is designed to facilitate construction of subclasses that provide more extensive
+         * lock monitoring facilities.
+         *
+         * @return the collection of threads
+         */
+        decaf::util::Collection<decaf::lang::Thread*>* getQueuedReaderThreads() const;
+
+        /**
+         * Returns the thread that currently owns the write lock, or NULL if not
+         * owned. When this method is called by a thread that is not the owner, the
+         * return value reflects a best-effort approximation of current lock status.
+         * For example, the owner may be momentarily NULL even if there are threads
+         * trying to acquire the lock but have not yet done so.  This method is
+         * designed to facilitate construction of subclasses that provide more
+         * extensive lock monitoring facilities.
+         *
+         * @return the owner thread pointer, or NULL if not currently owned.
+         */
+        decaf::lang::Thread* getOwner() const;
+
     };
 
 }}}}



Mime
View raw message