activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1293724 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent: AbstractExecutorService.cpp AbstractExecutorService.h ExecutorService.h ThreadPoolExecutor.cpp locks/LockSupport.cpp
Date Sat, 25 Feb 2012 23:38:34 GMT
Author: tabish
Date: Sat Feb 25 23:38:33 2012
New Revision: 1293724

URL: http://svn.apache.org/viewvc?rev=1293724&view=rev
Log:
Bit more work on Executors.

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ExecutorService.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/LockSupport.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.cpp?rev=1293724&r1=1293723&r2=1293724&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.cpp Sat Feb 25 23:38:33 2012
@@ -21,6 +21,7 @@ using namespace decaf;
 using namespace decaf::util;
 using namespace decaf::util::concurrent;
 using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
 
 ////////////////////////////////////////////////////////////////////////////////
 AbstractExecutorService::AbstractExecutorService() : ExecutorService() {
@@ -29,3 +30,9 @@ AbstractExecutorService::AbstractExecuto
 ////////////////////////////////////////////////////////////////////////////////
 AbstractExecutorService::~AbstractExecutorService() {
 }
+
+////////////////////////////////////////////////////////////////////////////////
+void AbstractExecutorService::doSubmit(FutureType* future) {
+
+    throw UnsupportedOperationException();
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.h?rev=1293724&r1=1293723&r2=1293724&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/AbstractExecutorService.h Sat Feb 25 23:38:33 2012
@@ -40,6 +40,10 @@ namespace concurrent {
         AbstractExecutorService();
         virtual ~AbstractExecutorService();
 
+    protected:
+
+        virtual void doSubmit(FutureType* future);
+
     };
 
 }}}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ExecutorService.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ExecutorService.h?rev=1293724&r1=1293723&r2=1293724&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ExecutorService.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ExecutorService.h Sat Feb 25 23:38:33 2012
@@ -22,6 +22,7 @@
 
 #include <decaf/lang/Runnable.h>
 #include <decaf/util/ArrayList.h>
+#include <decaf/util/concurrent/Future.h>
 #include <decaf/util/concurrent/Executor.h>
 #include <decaf/util/concurrent/TimeUnit.h>
 #include <decaf/lang/exceptions/InterruptedException.h>
@@ -106,6 +107,33 @@ namespace concurrent {
          */
         virtual bool isTerminated() const = 0;
 
+        /**
+         * Submits a Runnable object for execution.  A Future object is created and returned
+         * that will return the default value of the template type upon completion.
+         *
+         * @param task
+         *      Pointer to a Runnable object that will be executed by this ExecutorService.
+         *
+         * @returns a new Future<?> pointer that is owned by the caller.
+         *
+         * @throws NullPointerException if the Runnable pointer passed is NULL.
+         */
+        template<typename E>
+        Future<E>* submit(decaf::lang::Runnable* task) {
+            return NULL;
+        }
+
+    protected:
+
+        /**
+         * Perform the actual submit of a FutureType instance, the caller is responsible for
+         * creating the properly typed Future<E> object and returning that to its caller.
+         *
+         * @param future
+         *      Pointer to a base FutureType instance that is to be submitted to the Executor.
+         */
+        virtual void doSubmit(FutureType* future) = 0;
+
     };
 
 }}}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp?rev=1293724&r1=1293723&r2=1293724&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadPoolExecutor.cpp Sat Feb 25 23:38:33 2012
@@ -25,6 +25,7 @@
 #include <decaf/util/concurrent/locks/ReentrantLock.h>
 #include <decaf/util/concurrent/locks/AbstractQueuedSynchronizer.h>
 #include <decaf/util/concurrent/CountDownLatch.h>
+#include <decaf/util/concurrent/ConcurrentStlMap.h>
 #include <decaf/util/concurrent/atomic/AtomicInteger.h>
 #include <decaf/util/concurrent/atomic/AtomicBoolean.h>
 #include <decaf/util/concurrent/RejectedExecutionException.h>
@@ -53,6 +54,30 @@ namespace decaf{
 namespace util{
 namespace concurrent{
 
+    using decaf::lang::Pointer;
+
+    /**
+     * Any task that we don't own we wrap in this Runnable object so that the
+     * task deletion logic can remain unchanged and thread safe.
+     */
+    class UnownedTaskWrapper : public Runnable {
+    private:
+
+        Runnable* task;
+
+    public:
+
+        UnownedTaskWrapper(Runnable* task) : Runnable(), task(task) {
+        }
+
+        virtual ~UnownedTaskWrapper() {
+        }
+
+        virtual void run() {
+            this->task->run();
+        }
+    };
+
     /**
      * The main pool control state, ctl, is an atomic integer packing
      * two conceptual fields
@@ -352,12 +377,19 @@ namespace concurrent{
 
                 // Ensure dead Worker Threads are destroyed, the Timer might not have
                 // run recently.
-                Pointer< Iterator<Worker*> > iter(this->deadWorkers.iterator());
-                while(iter->hasNext()) {
-                    Worker* worker = iter->next();
+                Pointer< Iterator<Worker*> > workers(this->deadWorkers.iterator());
+                while(workers->hasNext()) {
+                    Worker* worker = workers->next();
                     worker->thread->join();
                     delete worker;
                 }
+
+                Pointer< Iterator<Runnable*> > tasks(this->workQueue->iterator());
+                while(tasks->hasNext()) {
+                    delete tasks->next();
+                }
+
+                this->workQueue->clear();
             }
             DECAF_CATCH_NOTHROW(Exception)
             DECAF_CATCHALL_NOTHROW()
@@ -688,12 +720,22 @@ namespace concurrent{
             processWorkerExit(w, completedAbruptly);
         }
 
-        void execute(Runnable* task, bool takeOwnership DECAF_UNUSED) {
+        void execute(Runnable* task, bool takeOwnership) {
 
             if (task == NULL) {
                 throw NullPointerException(__FILE__, __LINE__, "Runnable task cannot be NULL");
             }
 
+            Runnable* target = task;
+
+            /**
+             * If we don't own it then wrap it so that our deletion logic is
+             * still valid.
+             */
+            if (!takeOwnership) {
+                target = new UnownedTaskWrapper(task);
+            }
+
             /*
              * Proceed in 3 steps:
              *
@@ -716,21 +758,21 @@ namespace concurrent{
              */
             int c = ctl.get();
             if (workerCountOf(c) < corePoolSize) {
-                if (addWorker(task, true)) {
+                if (addWorker(target, true)) {
                     return;
                 }
                 c = ctl.get();
             }
 
-            if (isRunning(c) && workQueue->offer(task)) {
+            if (isRunning(c) && workQueue->offer(target)) {
                 int recheck = ctl.get();
-                if (!isRunning(recheck) && this->remove(task)) {
-                    this->rejectionHandler->rejectedExecution(task, this->parent);
+                if (!isRunning(recheck) && this->remove(target)) {
+                    this->rejectionHandler->rejectedExecution(target, this->parent);
                 } else if (workerCountOf(recheck) == 0) {
                     addWorker(NULL, false);
                 }
-            } else if (!addWorker(task, false)) {
-                this->rejectionHandler->rejectedExecution(task, this->parent);
+            } else if (!addWorker(target, false)) {
+                this->rejectionHandler->rejectedExecution(target, this->parent);
             }
         }
 
@@ -935,9 +977,9 @@ namespace concurrent{
         }
 
         bool remove(Runnable* task) {
-            bool removed = this->workQueue->remove(task);
+            bool result = this->workQueue->remove(task);
             this->tryTerminate();
-            return removed;
+            return result;
         }
 
     private:

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/LockSupport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/LockSupport.cpp?rev=1293724&r1=1293723&r2=1293724&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/LockSupport.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/LockSupport.cpp Sat Feb 25 23:38:33 2012
@@ -41,24 +41,25 @@ LockSupport::~LockSupport() {
 ////////////////////////////////////////////////////////////////////////////////
 void LockSupport::unpark( decaf::lang::Thread* thread ) throw() {
 
-    try{
-        Threading::unpark( thread );
-    } DECAF_CATCHALL_NOTHROW()
+    try {
+		Threading::unpark(thread);
+	}
+    DECAF_CATCHALL_NOTHROW()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void LockSupport::park() throw() {
 
-    try{
-        Threading::park( Thread::currentThread() );
-    } DECAF_CATCHALL_NOTHROW()
+    try {
+		Threading::park(Thread::currentThread());
+	}
+    DECAF_CATCHALL_NOTHROW()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void LockSupport::parkNanos( long long nanos ) throw() {
 
-    try{
-
+    try {
         long long mills = 0;
 
         if( nanos >= 1000000 ) {
@@ -67,8 +68,8 @@ void LockSupport::parkNanos( long long n
         }
 
         Threading::park(Thread::currentThread(), mills, (int)nanos);
-
-    } DECAF_CATCHALL_NOTHROW()
+    }
+    DECAF_CATCHALL_NOTHROW()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -83,6 +84,6 @@ void LockSupport::parkUntil( long long d
         }
 
         Threading::park(Thread::currentThread(), ( deadline - now ), 0);
-
-    } DECAF_CATCHALL_NOTHROW()
+    }
+    DECAF_CATCHALL_NOTHROW()
 }



Mime
View raw message