Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 89383 invoked from network); 6 Apr 2011 19:26:58 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 6 Apr 2011 19:26:58 -0000 Received: (qmail 14775 invoked by uid 500); 6 Apr 2011 19:26:58 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 14739 invoked by uid 500); 6 Apr 2011 19:26:57 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 14732 invoked by uid 99); 6 Apr 2011 19:26:57 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 06 Apr 2011 19:26:57 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 06 Apr 2011 19:26:54 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id EA50C23888EA; Wed, 6 Apr 2011 19:26:32 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1089595 - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/decaf/util/concurrent/ test/decaf/util/concurrent/ Date: Wed, 06 Apr 2011 19:26:32 -0000 To: commits@activemq.apache.org From: tabish@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110406192632.EA50C23888EA@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: tabish Date: Wed Apr 6 19:26:32 2011 New Revision: 1089595 URL: http://svn.apache.org/viewvc?rev=1089595&view=rev Log: Additional improvements to ThreadPoolExecutor and added tests. Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.cpp activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.h Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp?rev=1089595&r1=1089594&r2=1089595&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp Wed Apr 6 19:26:32 2011 @@ -49,6 +49,8 @@ namespace concurrent{ class ExecutorKernel { public: + ThreadPoolExecutor* parent; + LinkedList workers; LinkedList deadWorkers; Timer cleanupTimer; @@ -65,9 +67,13 @@ namespace concurrent{ Mutex mainLock; CountDownLatch termination; + long long completedTasks; + int largestPoolSize; + public: - ExecutorKernel(int corePoolSize, int maxPoolSize, long long keepAliveTime, + ExecutorKernel(ThreadPoolExecutor* parent, + int corePoolSize, int maxPoolSize, long long keepAliveTime, BlockingQueue* workQueue); ~ExecutorKernel(); @@ -92,6 +98,8 @@ namespace concurrent{ void handleWorkerExit(Worker* worker); + void tryTerminate(); + }; class Worker : public lang::Thread { @@ -208,7 +216,7 @@ namespace concurrent{ ThreadPoolExecutor::ThreadPoolExecutor(int corePoolSize, int maxPoolSize, long long keepAliveTime, const TimeUnit& unit, BlockingQueue* workQueue) : - kernel(new ExecutorKernel(corePoolSize, maxPoolSize, unit.toMillis(keepAliveTime), workQueue)) { + kernel(new ExecutorKernel(this, corePoolSize, maxPoolSize, unit.toMillis(keepAliveTime), workQueue)) { } //////////////////////////////////////////////////////////////////////////////// @@ -260,7 +268,12 @@ bool ThreadPoolExecutor::awaitTerminatio //////////////////////////////////////////////////////////////////////////////// int ThreadPoolExecutor::getPoolSize() const { - return (int)this->kernel->workers.size(); + int result = 0; + synchronized(&this->kernel->mainLock) { + result = this->kernel->workers.size(); + } + + return result; } //////////////////////////////////////////////////////////////////////////////// @@ -279,6 +292,39 @@ long long ThreadPoolExecutor::getTaskCou } //////////////////////////////////////////////////////////////////////////////// +int ThreadPoolExecutor::getActiveCount() const { + + int result = 0; + synchronized(&this->kernel->mainLock) { + if(!this->kernel->terminated.get()) { + result = this->kernel->workers.size() - this->kernel->freeThreads.get(); + } + } + + return result; +} + +//////////////////////////////////////////////////////////////////////////////// +long long ThreadPoolExecutor::getCompletedTaskCount() const { + long long result = 0; + synchronized(&this->kernel->mainLock) { + result = this->kernel->completedTasks; + } + + return result; +} + +//////////////////////////////////////////////////////////////////////////////// +int ThreadPoolExecutor::getLargestPoolSize() const { + int result = 0; + synchronized(&this->kernel->mainLock) { + result = this->kernel->largestPoolSize; + } + + return result; +} + +//////////////////////////////////////////////////////////////////////////////// bool ThreadPoolExecutor::isShutdown() const { return this->kernel->stopped.get(); } @@ -293,8 +339,10 @@ void ThreadPoolExecutor::terminated() { } //////////////////////////////////////////////////////////////////////////////// -ExecutorKernel::ExecutorKernel(int corePoolSize, int maxPoolSize, long long keepAliveTime, +ExecutorKernel::ExecutorKernel(ThreadPoolExecutor* parent, int corePoolSize, + int maxPoolSize, long long keepAliveTime, BlockingQueue* workQueue) : + parent(parent), workers(), deadWorkers(), cleanupTimer(), @@ -307,7 +355,9 @@ ExecutorKernel::ExecutorKernel(int coreP keepAliveTime(keepAliveTime), workQueue(workQueue), mainLock(), - termination(1) { + termination(1), + completedTasks(0), + largestPoolSize(0) { if(corePoolSize < 0 || maxPoolSize <= 0 || maxPoolSize < corePoolSize || keepAliveTime < 0) { @@ -326,7 +376,9 @@ ExecutorKernel::ExecutorKernel(int coreP //////////////////////////////////////////////////////////////////////////////// ExecutorKernel::~ExecutorKernel() { try{ + this->shutdown(); + this->tryTerminate(); this->termination.await(); @@ -376,6 +428,7 @@ void ExecutorKernel::onTaskCompleted(Wor try { synchronized(&mainLock) { freeThreads.incrementAndGet(); + completedTasks++; } } DECAF_CATCH_RETHROW( lang::Exception ) @@ -400,9 +453,7 @@ void ExecutorKernel::handleWorkerExit(Wo this->deadWorkers.add(worker); if(this->workers.isEmpty()) { - - // TODO - Notify ThreadPoolExecutor to call terminated() - + this->parent->terminated(); this->terminated = true; this->termination.countDown(); } @@ -485,6 +536,7 @@ void ExecutorKernel::AllocateThread() { this->workers.add(newWorker); freeThreads.incrementAndGet(); newWorker->start(); + this->largestPoolSize++; } } DECAF_CATCH_RETHROW( lang::Exception ) @@ -537,3 +589,25 @@ bool ExecutorKernel::awaitTermination(lo return this->termination.await(timeout, unit); } + +//////////////////////////////////////////////////////////////////////////////// +void ExecutorKernel::tryTerminate() { + + if (!this->isStoppedOrStopping() || (this->isStoppedOrStopping() && !this->workQueue->isEmpty())) { + return; + } + + if (this->workers.size() > 0) { + // TODO - Once they are interruptible wake a worker. + return; + } + + synchronized(&this->mainLock) { + try { + this->parent->terminated(); + } catch(...) {} + + this->terminated.set(true); + this->termination.countDown(); + } +} Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h?rev=1089595&r1=1089594&r2=1089595&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.h Wed Apr 6 19:26:32 2011 @@ -58,6 +58,7 @@ namespace concurrent{ private: + friend class ExecutorKernel; ExecutorKernel* kernel; public: @@ -165,6 +166,30 @@ namespace concurrent{ virtual long long getTaskCount() const; /** + * Returns an approximation of the number of threads that are currently running + * tasks for this executor. This value can change rapidly. + * + * @return the number of currently active threads. + */ + virtual int getActiveCount() const; + + /** + * Returns the approximate number of Tasks that have been completed by this + * Executor, this value never decreases. + * + * @return the number of completed tasks since creation of the Executor. + */ + virtual long long getCompletedTaskCount() const; + + /** + * Returns the most Threads that have ever been active at one time within this + * Executors Thread pool. + * + * @return the largest number of threads ever to coexist in this executor. + */ + virtual int getLargestPoolSize() const; + + /** * Returns whether this executor has been shutdown or not. * * @return true if this executor has been shutdown. Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.cpp?rev=1089595&r1=1089594&r2=1089595&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.cpp (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.cpp Wed Apr 6 19:26:32 2011 @@ -95,6 +95,17 @@ namespace { } /////////////////////////////////////////////////////////////////////////////// +void ThreadPoolExecutorTest::testConstructor1() { + + ThreadPoolExecutor pool(1, 3, 5, TimeUnit::SECONDS, new LinkedBlockingQueue()); + + CPPUNIT_ASSERT_EQUAL(1, pool.getCorePoolSize()); + CPPUNIT_ASSERT_EQUAL(3, pool.getMaximumPoolSize()); + CPPUNIT_ASSERT_EQUAL(false, pool.isShutdown()); + CPPUNIT_ASSERT_EQUAL(false, pool.isTerminated()); +} + +/////////////////////////////////////////////////////////////////////////////// void ThreadPoolExecutorTest::testSimpleTasks() { CountDownLatch myLatch( 3 ); Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.h?rev=1089595&r1=1089594&r2=1089595&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.h (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/ThreadPoolExecutorTest.h Wed Apr 6 19:26:32 2011 @@ -36,6 +36,7 @@ namespace concurrent{ private: CPPUNIT_TEST_SUITE( ThreadPoolExecutorTest ); + CPPUNIT_TEST( testConstructor1 ); CPPUNIT_TEST( testSimpleTasks ); CPPUNIT_TEST( testMoreTasksThanMaxPoolSize ); CPPUNIT_TEST( testTasksThatThrow ); @@ -51,6 +52,7 @@ namespace concurrent{ ThreadPoolExecutorTest() {} virtual ~ThreadPoolExecutorTest() {} + void testConstructor1(); void testSimpleTasks(); void testMoreTasksThanMaxPoolSize(); void testTasksThatThrow();