activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1369198 - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/decaf/util/concurrent/ThreadPoolExecutor.cpp test/decaf/lang/PointerTest.cpp
Date Fri, 03 Aug 2012 20:33:18 GMT
Author: tabish
Date: Fri Aug  3 20:33:18 2012
New Revision: 1369198

URL: http://svn.apache.org/viewvc?rev=1369198&view=rev
Log:
Fix a race in the destructor of ThreadPoolExecutor due to the count of active Worker threads
being decremented too early, leading to segfaults when the destructor was allowed to finish
while the workers were still shutting down.

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/lang/PointerTest.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp?rev=1369198&r1=1369197&r2=1369198&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp Fri Aug  3 20:33:18 2012
@@ -499,18 +499,20 @@ namespace concurrent{
          * termination possible -- reducing worker count or removing tasks
          * from the queue during shutdown. The method is non-private to
          * allow access from ScheduledThreadPoolExecutor.
+         *
+         * @returns true if the termination succeeded.
          */
-        void tryTerminate() {
+        bool tryTerminate() {
             for (;;) {
                 int c = ctl.get();
                 if (isRunning(c) || runStateAtLeast(c, TIDYING) ||
                     (runStateOf(c) == SHUTDOWN && !workQueue->isEmpty())) {
-                    return;
+                    return false;
                 }
 
                 if (workerCountOf(c) != 0) { // Eligible to terminate
                     interruptIdleWorkers(ONLY_ONE);
-                    return;
+                    return false;
                 }
 
                 mainLock.lock();
@@ -528,7 +530,7 @@ namespace concurrent{
                         ctl.set(ctlOf(TERMINATED, 0));
                         termination->signalAll();
                         mainLock.unlock();
-                        return;
+                        return true;
                     }
                 } catch(Exception& ex) {
                     mainLock.unlock();
@@ -537,6 +539,8 @@ namespace concurrent{
                 mainLock.unlock();
                 // else retry on failed CAS
             }
+
+            return false;
         }
 
         /**
@@ -1133,6 +1137,7 @@ namespace concurrent{
                 mainLock.unlock();
                 throw;
             }
+
             mainLock.unlock();
 
             t->start();
@@ -1151,7 +1156,7 @@ namespace concurrent{
         /**
          * Performs cleanup and bookkeeping for a dying worker. Called only from
          * worker threads. Unless completedAbruptly is set, assumes that workerCount
-         * has already been adjusted to account for exit.  This method removes
+         * has not already been adjusted to account for exit.  This method removes
          * thread from worker set, and possibly terminates the pool or replaces the
          * worker if either it exited due to user task exception or if fewer than
          * corePoolSize workers are running or queue is non-empty but there are no
@@ -1162,11 +1167,7 @@ namespace concurrent{
          * @param completedAbruptly
          *      Indicates if the worker died due to user exception.
          */
-        void processWorkerExit(Worker* w, bool completedAbruptly) {
-
-            if (completedAbruptly) { // If abrupt, then workerCount wasn't adjusted
-                decrementWorkerCount();
-            }
+        void processWorkerExit(Worker* w, bool completedAbruptly DECAF_UNUSED) {
 
             mainLock.lock();
             try {
@@ -1175,9 +1176,12 @@ namespace concurrent{
                 this->deadWorkers.add(w);
             } catch(...) {
             }
+            decrementWorkerCount();
             mainLock.unlock();
 
-            tryTerminate();
+            if (tryTerminate()) {
+                return;
+            }
 
             int c = ctl.get();
             if (runStateLessThan(c, STOP)) {
@@ -1208,7 +1212,7 @@ namespace concurrent{
          *     both before and after the timed wait.
          *
          * @return task, or NULL if the worker must exit, in which case
-         *         workerCount is decremented
+         *         workerCount is decremented when the task completes.
          */
         Runnable* getTask() {
             bool timedOut = false; // Did the last poll() time out?
@@ -1220,7 +1224,6 @@ namespace concurrent{
 
                 // Check if queue empty only if necessary.
                 if (rs >= SHUTDOWN && (rs >= STOP || workQueue->isEmpty())) {
-                    decrementWorkerCount();
                     return NULL;
                 }
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/lang/PointerTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/lang/PointerTest.cpp?rev=1369198&r1=1369197&r2=1369198&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/lang/PointerTest.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/lang/PointerTest.cpp Fri Aug  3 20:33:18 2012
@@ -25,6 +25,7 @@
 
 #include <map>
 #include <string>
+#include <vector>
 
 using namespace std;
 using namespace decaf;
@@ -34,20 +35,30 @@ using namespace decaf::util::concurrent;
 
 ////////////////////////////////////////////////////////////////////////////////
 class TestClassBase {
+protected:
+
+    std::vector<int> content;
+
 public:
 
     virtual ~TestClassBase(){}
 
     virtual std::string returnHello() = 0;
 
+    int getSize() const {
+        return content.size();
+    }
 };
 
 ////////////////////////////////////////////////////////////////////////////////
 class TestClassA : public TestClassBase {
 public:
 
+    TestClassA() : TestClassBase() {
+        this->content.resize(1);
+    }
+
     virtual ~TestClassA() {
-        //std::cout << std::endl << "TestClassA - Destructor" << std::endl;
     }
 
     std::string returnHello() {
@@ -60,8 +71,11 @@ public:
 class TestClassB : public TestClassBase {
 public:
 
+    TestClassB() : TestClassBase() {
+        this->content.resize(2);
+    }
+
     virtual ~TestClassB() {
-        //std::cout << std::endl << "TestClassB - Destructor" << std::endl;
     }
 
     std::string returnHello() {
@@ -371,51 +385,6 @@ void PointerTest::testSTLContainers() {
     CPPUNIT_ASSERT( *( testMap2.rbegin()->first ) == 3 );
 }
 
-//////////////////////////////////////////////////////////////////////////////////
-//class SelfCounting {
-//private:
-//
-//    int refCount;
-//
-//public:
-//
-//    SelfCounting() : refCount( 0 ) {}
-//    SelfCounting( const SelfCounting& other ) : refCount( other.refCount ) {}
-//
-//    void addReference() { this->refCount++; }
-//    bool releaseReference() { return !( --this->refCount ); }
-//
-//    std::string returnHello() { return "Hello"; }
-//};
-//
-//////////////////////////////////////////////////////////////////////////////////
-//void PointerTest::testInvasive() {
-//
-//    Pointer< SelfCounting, InvasiveCounter<SelfCounting> > thePointer( new SelfCounting );
-//
-//    // Test Null Initialize
-//    Pointer< SelfCounting, InvasiveCounter<SelfCounting> > nullPointer;
-//    CPPUNIT_ASSERT( nullPointer.get() == NULL );
-//
-//    // Test Value Constructor
-//    Pointer< SelfCounting, InvasiveCounter<SelfCounting> > pointer( thePointer );
-//    CPPUNIT_ASSERT( pointer.get() == thePointer );
-//
-//    // Test Copy Constructor
-//    Pointer< SelfCounting, InvasiveCounter<SelfCounting> > ctorCopy( pointer );
-//    CPPUNIT_ASSERT( ctorCopy.get() == thePointer );
-//
-//    // Test Assignment
-//    Pointer< SelfCounting, InvasiveCounter<SelfCounting> > copy = pointer;
-//    CPPUNIT_ASSERT( copy.get() == thePointer );
-//
-//    CPPUNIT_ASSERT( ( *pointer ).returnHello() == "Hello" );
-//    CPPUNIT_ASSERT( pointer->returnHello() == "Hello" );
-//
-//    copy.reset( NULL );
-//    CPPUNIT_ASSERT( copy.get() == NULL );
-//}
-
 ////////////////////////////////////////////////////////////////////////////////
 TestClassBase* methodReturnRawPointer() {
 
@@ -444,11 +413,13 @@ void PointerTest::testDynamicCast() {
     CPPUNIT_ASSERT_NO_THROW(
         ptrTestClassA = pointer1.dynamicCast<TestClassA>() );
     CPPUNIT_ASSERT( ptrTestClassA != NULL );
+    CPPUNIT_ASSERT( ptrTestClassA->getSize() == 1 );
 
     Pointer<TestClassB> ptrTestClassB;
     CPPUNIT_ASSERT_NO_THROW(
         ptrTestClassB = pointer2.dynamicCast<TestClassB>() );
     CPPUNIT_ASSERT( ptrTestClassB != NULL );
+    CPPUNIT_ASSERT( ptrTestClassB->getSize() == 2 );
 
     Pointer<TestClassA> ptrTestClassA2;
     CPPUNIT_ASSERT_THROW_MESSAGE(
@@ -461,14 +432,20 @@ void PointerTest::testDynamicCast() {
         "Should Throw a ClassCastException",
         ptrTestClassA2 = nullPointer.dynamicCast<TestClassA>(),
         ClassCastException );
+
+    Pointer<TestClassBase> basePointer = ptrTestClassA.dynamicCast<TestClassBase>();
+    CPPUNIT_ASSERT( basePointer->getSize() == 1 );
+
+    basePointer = ptrTestClassB.dynamicCast<TestClassBase>();
+    CPPUNIT_ASSERT( basePointer->getSize() == 2 );
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 class Gate {
 private:
 
-    CountDownLatch * enter_latch;
-    CountDownLatch * leave_latch;
+    CountDownLatch * enterLatch;
+    CountDownLatch * leaveLatch;
     Mutex mutex;
     bool closed;
 
@@ -478,8 +455,8 @@ public:
     virtual ~Gate() {}
 
     void open( int count ) {
-        leave_latch = new CountDownLatch( count );
-        enter_latch = new CountDownLatch( count );
+        leaveLatch = new CountDownLatch( count );
+        enterLatch = new CountDownLatch( count );
         mutex.lock();
         closed = false;
         mutex.notifyAll();
@@ -490,21 +467,21 @@ public:
         mutex.lock();
         while( closed )
             mutex.wait();
-        enter_latch->countDown();
-        if (enter_latch->getCount() == 0) {
+        enterLatch->countDown();
+        if (enterLatch->getCount() == 0) {
             closed = true;
         }
         mutex.unlock();
     }
 
     void leave() {
-        leave_latch->countDown();
+        leaveLatch->countDown();
     }
 
     void close() {
-        leave_latch->await();
-        delete leave_latch;
-        delete enter_latch;
+        leaveLatch->await();
+        delete leaveLatch;
+        delete enterLatch;
     }
 };
 
@@ -512,23 +489,23 @@ public:
 class PointerTestThread: public Thread {
 private:
 
-    Gate *_gate;
-    Pointer<std::string> _s;
+    Gate *gate;
+    Pointer<std::string> s;
 
 public:
 
-    PointerTestThread( Gate *gate ) : _gate( gate ) {}
+    PointerTestThread( Gate *gate ) : gate( gate ) {}
     virtual ~PointerTestThread() {}
 
     void setString( Pointer<std::string> s ) {
-        _s = s;
+        this->s = s;
     }
 
     virtual void run() {
         for( int j = 0; j < 1000; j++ ) {
-            _gate->enter();
-            _s.reset( NULL );
-            _gate->leave();
+            gate->enter();
+            s.reset( NULL );
+            gate->leave();
         }
     }
 };



Mime
View raw message