activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r419365 [2/25] - in /incubator/activemq/trunk: activemq-core/src/main/java/org/apache/activemq/thread/ activemq-core/src/test/java/org/apache/activemq/openwire/v1/ activemq-cpp/src/main/activemq/concurrent/ activemq-cpp/src/main/activemq/co...
Date Wed, 05 Jul 2006 22:27:47 GMT
Modified: incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/PooledThread.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/PooledThread.h?rev=419365&r1=419364&r2=419365&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/PooledThread.h (original)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/PooledThread.h Wed Jul  5 15:27:34 2006
@@ -1,105 +1,105 @@
-/*
- * Copyright 2006 The Apache Software Foundation or its licensors, as
- * applicable.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef _ACTIVEMQ_CONCURRENT_POOLEDTHREAD_H_
-#define _ACTIVEMQ_CONCURRENT_POOLEDTHREAD_H_
-
-#include <activemq/concurrent/Thread.h>
-#include <activemq/concurrent/Runnable.h>
-#include <activemq/concurrent/PooledThreadListener.h>
-#include <activemq/logger/LoggerDefines.h>
-
-#include <cms/Stoppable.h>
-#include <cms/CMSException.h>
-
-namespace activemq{
-namespace concurrent{
-
-   class ThreadPool;
-
-   class PooledThread : public Thread, public cms::Stoppable
-   {
-   private:
-   
-      // Is this thread currently processing something
-      bool busy;
-      
-      // Boolean flag indicating thread should stop
-      bool done;
-      
-      // Listener for Task related events
-      PooledThreadListener* listener;
-      
-      // The thread pool this Pooled Thread is Servicing
-      ThreadPool* pool;
-
-      // Logger Init
-      LOGCMS_DECLARE(logger);
-      
-   public:
-   
-      /**
-       * Constructor
-       */
-   	PooledThread(ThreadPool* pool);
-
-      /**
-       * Destructor
-       */
-   	virtual ~PooledThread(void);
-
-      /**
-       * Run Method for this object waits for something to be
-       * enqueued on the ThreadPool and then grabs it and calls 
-       * its run method.
-       */
-      virtual void run(void);
-      
-      /**
-       * Stops the Thread, thread will complete its task if currently
-       * running one, and then die.  Does not block.
-       */
-      virtual void stop(void) throw ( cms::CMSException );
-      
-      /**
-       * Checks to see if the thread is busy, if busy it means
-       * that this thread has taken a task from the ThreadPool's
-       * queue and is processing it.
-       */
-      virtual bool isBusy(void) { return busy; }
-      
-      /**
-       * Adds a listener to this <code>PooledThread</code> to be
-       * notified when this thread starts and completes a task.
-       */
-      virtual void setPooledThreadListener(PooledThreadListener* listener)
-      {
-         this->listener = listener;
-      }
-
-      /**
-       * Removes a listener for this <code>PooledThread</code> to be
-       * notified when this thread starts and completes a task.
-       */
-      virtual PooledThreadListener* getPooledThreadListener(void)
-      {
-         return this->listener;
-      }
-   };
-
-}}
-
-#endif /*_ACTIVEMQ_CONCURRENT_POOLEDTHREAD_H_*/
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_CONCURRENT_POOLEDTHREAD_H_
+#define _ACTIVEMQ_CONCURRENT_POOLEDTHREAD_H_
+
+#include <activemq/concurrent/Thread.h>
+#include <activemq/concurrent/Runnable.h>
+#include <activemq/concurrent/PooledThreadListener.h>
+#include <activemq/logger/LoggerDefines.h>
+
+#include <cms/Stoppable.h>
+#include <cms/CMSException.h>
+
+namespace activemq{
+namespace concurrent{
+
+   class ThreadPool;
+
+   class PooledThread : public Thread, public cms::Stoppable
+   {
+   private:
+   
+      // Is this thread currently processing something
+      bool busy;
+      
+      // Boolean flag indicating thread should stop
+      bool done;
+      
+      // Listener for Task related events
+      PooledThreadListener* listener;
+      
+      // The thread pool this Pooled Thread is Servicing
+      ThreadPool* pool;
+
+      // Logger Init
+      LOGCMS_DECLARE(logger);
+      
+   public:
+   
+      /**
+       * Constructor
+       */
+   	PooledThread(ThreadPool* pool);
+
+      /**
+       * Destructor
+       */
+   	virtual ~PooledThread(void);
+
+      /**
+       * Run Method for this object waits for something to be
+       * enqueued on the ThreadPool and then grabs it and calls 
+       * its run method.
+       */
+      virtual void run(void);
+      
+      /**
+       * Stops the Thread, thread will complete its task if currently
+       * running one, and then die.  Does not block.
+       */
+      virtual void stop(void) throw ( cms::CMSException );
+      
+      /**
+       * Checks to see if the thread is busy, if busy it means
+       * that this thread has taken a task from the ThreadPool's
+       * queue and is processing it.
+       */
+      virtual bool isBusy(void) { return busy; }
+      
+      /**
+       * Adds a listener to this <code>PooledThread</code> to be
+       * notified when this thread starts and completes a task.
+       */
+      virtual void setPooledThreadListener(PooledThreadListener* listener)
+      {
+         this->listener = listener;
+      }
+
+      /**
+       * Returns the listener registered with this <code>PooledThread</code>,
+       * i.e. the one set via setPooledThreadListener.
+       */
+      virtual PooledThreadListener* getPooledThreadListener(void)
+      {
+         return this->listener;
+      }
+   };
+
+}}
+
+#endif /*_ACTIVEMQ_CONCURRENT_POOLEDTHREAD_H_*/

Propchange: incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/PooledThread.h
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/PooledThreadListener.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/PooledThreadListener.h?rev=419365&r1=419364&r2=419365&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/PooledThreadListener.h (original)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/PooledThreadListener.h Wed Jul  5 15:27:34 2006
@@ -1,66 +1,66 @@
-/*
- * Copyright 2006 The Apache Software Foundation or its licensors, as
- * applicable.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef _ACTIVEMQ_CONCURRENT_POOLEDTHREADLISTENER_H_
-#define _ACTIVEMQ_CONCURRENT_POOLEDTHREADLISTENER_H_
-
-#include <activemq/exceptions/ActiveMQException.h>
-
-namespace activemq{
-namespace concurrent{
-
-   //forward declare
-   class PooledThread;
-
-   class PooledThreadListener
-   {
-   public:
-
-      /**
-       * Destructor
-       */
-   	virtual ~PooledThreadListener(void) {}
-      
-      /**
-       * Called by a pooled thread when it is about to begin
-       * executing a new task.
-       * @param Pointer to the Pooled Thread that is making this call
-       */
-      virtual void onTaskStarted(PooledThread* thread) = 0;
-       
-      /**
-       * Called by a pooled thread when it has completed a task
-       * and is going back to waiting for another task to run
-       * @param Pointer the the Pooled Thread that is making this call.
-       */
-      virtual void onTaskCompleted(PooledThread* thread) = 0;
-      
-      /**
-       * Called by a pooled thread when it has encountered an exception
-       * while running a user task, after receiving this notification
-       * the callee should assume that the PooledThread is now no longer
-       * running.
-       * @param Pointer to the Pooled Thread that is making this call
-       * @param The Exception that occured.
-       */
-      virtual void onTaskException(PooledThread* thread, 
-                                   exceptions::ActiveMQException& ex) = 0;
-       
-   };
-
-}}
-
-#endif /*_ACTIVEMQ_CONCURRENT_POOLEDTHREADLISTENER_H_*/
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_CONCURRENT_POOLEDTHREADLISTENER_H_
+#define _ACTIVEMQ_CONCURRENT_POOLEDTHREADLISTENER_H_
+
+#include <activemq/exceptions/ActiveMQException.h>
+
+namespace activemq{
+namespace concurrent{
+
+   //forward declare
+   class PooledThread;
+
+   class PooledThreadListener
+   {
+   public:
+
+      /**
+       * Destructor
+       */
+   	virtual ~PooledThreadListener(void) {}
+      
+      /**
+       * Called by a pooled thread when it is about to begin
+       * executing a new task.
+       * @param Pointer to the Pooled Thread that is making this call
+       */
+      virtual void onTaskStarted(PooledThread* thread) = 0;
+       
+      /**
+       * Called by a pooled thread when it has completed a task
+       * and is going back to waiting for another task to run
+       * @param Pointer to the Pooled Thread that is making this call.
+       */
+      virtual void onTaskCompleted(PooledThread* thread) = 0;
+      
+      /**
+       * Called by a pooled thread when it has encountered an exception
+       * while running a user task, after receiving this notification
+       * the callee should assume that the PooledThread is now no longer
+       * running.
+       * @param Pointer to the Pooled Thread that is making this call
+       * @param The Exception that occurred.
+       */
+      virtual void onTaskException(PooledThread* thread, 
+                                   exceptions::ActiveMQException& ex) = 0;
+       
+   };
+
+}}
+
+#endif /*_ACTIVEMQ_CONCURRENT_POOLEDTHREADLISTENER_H_*/

Propchange: incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/PooledThreadListener.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/Runnable.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/Synchronizable.h
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/TaskListener.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/TaskListener.h?rev=419365&r1=419364&r2=419365&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/TaskListener.h (original)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/TaskListener.h Wed Jul  5 15:27:34 2006
@@ -1,56 +1,56 @@
-/*
- * Copyright 2006 The Apache Software Foundation or its licensors, as
- * applicable.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef _ACTIVEMQ_CONCURRENT_TASKLISTENER_H_
-#define _ACTIVEMQ_CONCURRENT_TASKLISTENER_H_
-
-#include <activemq/concurrent/Runnable.h>
-#include <activemq/exceptions/ActiveMQException.h>
-
-namespace activemq{
-namespace concurrent{
-
-class TaskListener
-{
-public:
-
-   /**
-    * Destructor
-    */
-	virtual ~TaskListener() {}
-
-   /**
-    * Called when a queued task has completed, the task that
-    * finished is passed along for user consumption
-    * @param Runnable Pointer to the task that finished
-    */
-   virtual void onTaskComplete(Runnable* task) = 0;
-   
-   /**
-    * Called when a queued task has thrown an exception while
-    * being run.  The Callee should assume that this was an 
-    * unrecoverable exeption and that this task is now defunct.
-    * @param Runnable Pointer to the task
-    * @param The ActiveMQException that was thrown.
-    */
-   virtual void onTaskException(Runnable* task, 
-                                exceptions::ActiveMQException& ex) = 0;
-   
-};
-
-}}
-
-#endif /*_ACTIVEMQ_CONCURRENT_TASKLISTENER_H_*/
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_CONCURRENT_TASKLISTENER_H_
+#define _ACTIVEMQ_CONCURRENT_TASKLISTENER_H_
+
+#include <activemq/concurrent/Runnable.h>
+#include <activemq/exceptions/ActiveMQException.h>
+
+namespace activemq{
+namespace concurrent{
+
+class TaskListener
+{
+public:
+
+   /**
+    * Destructor
+    */
+	virtual ~TaskListener() {}
+
+   /**
+    * Called when a queued task has completed, the task that
+    * finished is passed along for user consumption
+    * @param Runnable Pointer to the task that finished
+    */
+   virtual void onTaskComplete(Runnable* task) = 0;
+   
+   /**
+    * Called when a queued task has thrown an exception while
+    * being run.  The Callee should assume that this was an 
+    * unrecoverable exception and that this task is now defunct.
+    * @param Runnable Pointer to the task
+    * @param The ActiveMQException that was thrown.
+    */
+   virtual void onTaskException(Runnable* task, 
+                                exceptions::ActiveMQException& ex) = 0;
+   
+};
+
+}}
+
+#endif /*_ACTIVEMQ_CONCURRENT_TASKLISTENER_H_*/

Propchange: incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/TaskListener.h
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/Thread.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/Thread.cpp?rev=419365&r1=419364&r2=419365&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/Thread.cpp (original)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/Thread.cpp Wed Jul  5 15:27:34 2006
@@ -1,173 +1,173 @@
-/*
- * Copyright 2006 The Apache Software Foundation or its licensors, as
- * applicable.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- 
-#include "Thread.h"
-#include <errno.h>
-
-#ifdef unix
-	#include <errno.h> // EINTR
-    extern int errno;
-#else
-	#include <process.h> // _endthreadex
-#endif
-
-#include <activemq/exceptions/ActiveMQException.h>
-
-using namespace activemq;
-using namespace activemq::concurrent;
-
-#ifdef unix
-static struct ThreadStaticInitializer {
-    // Thread Attribute member
-    pthread_attr_t threadAttribute;
-    // Static Initializer:
-    ThreadStaticInitializer() {
-        pthread_attr_init (&threadAttribute);
-        pthread_attr_setdetachstate (&threadAttribute, PTHREAD_CREATE_JOINABLE);
-    }
-} threadStaticInitializer;
-#endif
-
-////////////////////////////////////////////////////////////////////////////////
-Thread::Thread()
-{
-	task = this;
-	started = false;
-	joined = false;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-Thread::Thread( Runnable* task )
-{
-	this->task = task;
-	started = false;
-	joined = false;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-Thread::~Thread()
-{
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void Thread::start() throw ( exceptions::ActiveMQException )
-{
-    if (this->started) {
-        throw exceptions::ActiveMQException( __FILE__, __LINE__,
-            "Thread already started");
-    }
-    
-#ifdef unix
-	
-	pthread_attr_init (&attributes);
-    pthread_attr_setdetachstate (&attributes, PTHREAD_CREATE_JOINABLE);
-    int err = pthread_create (
-        &this->threadHandle,
-        &attributes,
-        runCallback,
-        this);
-    if (err != 0) {
-		throw exceptions::ActiveMQException( __FILE__, __LINE__,
-            "Coud not start thread");
-    }
-    
-#else
-
-    unsigned int threadId = 0;
-    this->threadHandle = 
-        (HANDLE)_beginthreadex(NULL, 0, runCallback, this, 0, &threadId);
-    if (this->threadHandle == NULL) {
-		throw exceptions::ActiveMQException( __FILE__, __LINE__,
-            "Coud not start thread");
-    }
-    
-#endif
-
-	// Mark the thread as started.
-    started = true;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void Thread::join() throw( exceptions::ActiveMQException )
-{
-    if (!this->started) {
-		throw exceptions::ActiveMQException( __FILE__, __LINE__,
-            "Thread::join() called without having called Thread::start()");
-    }
-    if (!this->joined) {
-    	
-#ifdef unix
-        pthread_join(this->threadHandle, NULL);
-#else
-        WaitForSingleObject (this->threadHandle, INFINITE);       
-#endif
-
-    }
-    this->joined = true;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void Thread::sleep(int millisecs)
-{
-#ifdef unix
-    struct timespec rec, rem;
-    rec.tv_sec = millisecs / 1000;
-    rec.tv_nsec = (millisecs % 1000) * 1000000;
-    while( nanosleep( &rec, &rem ) == -1 ){
-    	if( errno != EINTR ){
-    		break;
-    	}
-    }
-    
-#else
-    Sleep (millisecs);
-#endif
-}
-
-////////////////////////////////////////////////////////////////////////////////
-unsigned long Thread::getId(void)
-{
-   #ifdef unix
-      return (long)(pthread_self());
-   #else
-      return GetCurrentThreadId();
-   #endif
-}
-
-////////////////////////////////////////////////////////////////////////////////
-#ifdef unix
-void*
-#else
-unsigned int WINAPI
-#endif
-Thread::runCallback (void* param)
-{
-	// Get the instance.
-    Thread* thread = (Thread*)param;
-    
-    // Invoke run on the task.
-    thread->task->run();
-
-#ifdef unix
-    return NULL;
-#else
-    // Return 0 if no exception was threwn. Otherwise -1.
-    _endthreadex(0); // Needed when using threads and CRT in Windows. Otherwise memleak can appear.
-    return 0;
-#endif
-}
-
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+#include "Thread.h"
+#include <errno.h>
+
+#ifdef unix
+	#include <errno.h> // EINTR
+    extern int errno;
+#else
+	#include <process.h> // _endthreadex
+#endif
+
+#include <activemq/exceptions/ActiveMQException.h>
+
+using namespace activemq;
+using namespace activemq::concurrent;
+
+#ifdef unix
+static struct ThreadStaticInitializer {
+    // Thread Attribute member
+    pthread_attr_t threadAttribute;
+    // Static Initializer:
+    ThreadStaticInitializer() {
+        pthread_attr_init (&threadAttribute);
+        pthread_attr_setdetachstate (&threadAttribute, PTHREAD_CREATE_JOINABLE);
+    }
+} threadStaticInitializer;
+#endif
+
+////////////////////////////////////////////////////////////////////////////////
+Thread::Thread()
+{
+	task = this;
+	started = false;
+	joined = false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Thread::Thread( Runnable* task )
+{
+	this->task = task;
+	started = false;
+	joined = false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Thread::~Thread()
+{
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Thread::start() throw ( exceptions::ActiveMQException )
+{
+    if (this->started) {
+        throw exceptions::ActiveMQException( __FILE__, __LINE__,
+            "Thread already started");
+    }
+    
+#ifdef unix
+	
+	pthread_attr_init (&attributes);
+    pthread_attr_setdetachstate (&attributes, PTHREAD_CREATE_JOINABLE);
+    int err = pthread_create (
+        &this->threadHandle,
+        &attributes,
+        runCallback,
+        this);
+    if (err != 0) {
+		throw exceptions::ActiveMQException( __FILE__, __LINE__,
+            "Coud not start thread");
+    }
+    
+#else
+
+    unsigned int threadId = 0;
+    this->threadHandle = 
+        (HANDLE)_beginthreadex(NULL, 0, runCallback, this, 0, &threadId);
+    if (this->threadHandle == NULL) {
+		throw exceptions::ActiveMQException( __FILE__, __LINE__,
+            "Coud not start thread");
+    }
+    
+#endif
+
+	// Mark the thread as started.
+    started = true;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Thread::join() throw( exceptions::ActiveMQException )
+{
+    if (!this->started) {
+		throw exceptions::ActiveMQException( __FILE__, __LINE__,
+            "Thread::join() called without having called Thread::start()");
+    }
+    if (!this->joined) {
+    	
+#ifdef unix
+        pthread_join(this->threadHandle, NULL);
+#else
+        WaitForSingleObject (this->threadHandle, INFINITE);       
+#endif
+
+    }
+    this->joined = true;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Thread::sleep(int millisecs)
+{
+#ifdef unix
+    struct timespec rec, rem;
+    rec.tv_sec = millisecs / 1000;
+    rec.tv_nsec = (millisecs % 1000) * 1000000;
+    while( nanosleep( &rec, &rem ) == -1 ){
+    	if( errno != EINTR ){
+    		break;
+    	}
+    }
+    
+#else
+    Sleep (millisecs);
+#endif
+}
+
+////////////////////////////////////////////////////////////////////////////////
+unsigned long Thread::getId(void)
+{
+   #ifdef unix
+      return (long)(pthread_self());
+   #else
+      return GetCurrentThreadId();
+   #endif
+}
+
+////////////////////////////////////////////////////////////////////////////////
+#ifdef unix
+void*
+#else
+unsigned int WINAPI
+#endif
+Thread::runCallback (void* param)
+{
+	// Get the instance.
+    Thread* thread = (Thread*)param;
+    
+    // Invoke run on the task.
+    thread->task->run();
+
+#ifdef unix
+    return NULL;
+#else
+    // Return 0 if no exception was thrown. Otherwise -1.
+    _endthreadex(0); // Needed when using threads and CRT in Windows. Otherwise memleak can appear.
+    return 0;
+#endif
+}
+

Propchange: incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/Thread.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/Thread.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/Thread.h?rev=419365&r1=419364&r2=419365&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/Thread.h (original)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/Thread.h Wed Jul  5 15:27:34 2006
@@ -1,131 +1,131 @@
-/*
- * Copyright 2006 The Apache Software Foundation or its licensors, as
- * applicable.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef ACTIVEMQ_CONCURRENT_THREAD_H
-#define ACTIVEMQ_CONCURRENT_THREAD_H
-
-#include <activemq/exceptions/ActiveMQException.h>
-#include <activemq/concurrent/Runnable.h>
-#include <stdexcept>
-#include <assert.h>
-
-#if (defined(__unix__) || defined(unix) || defined(MACOSX) || defined(__APPLE__)) && !defined(USG)
-   
-   #ifndef unix
-      #define unix
-   #endif
-
-   #include <pthread.h>
-#else
-   #include <windows.h>
-#endif
-
-namespace activemq{
-namespace concurrent{
-   
-   /** 
-    * Basic thread class - mimics the Java Thread.  Derived classes may 
-    * implement the run method, or this class can be used as is with 
-    * a provided Runnable delegate.
-    */
-   class Thread : public Runnable
-   {
-   private:
-   
-      /**
-       * The task to be run by this thread, defaults to 
-       * this thread object.
-       */
-      Runnable* task;
-      
-   #ifdef unix
-      pthread_attr_t attributes;
-       pthread_t   threadHandle ;
-   #else
-       HANDLE      threadHandle ;
-   #endif
-   
-       /**
-        * Started state of this thread.
-        */
-       bool started;
-       
-       /**
-        * Indicates whether the thread has already been
-        * joined.
-        */
-       bool joined;
-       
-   public:
-      
-       Thread();
-       Thread( Runnable* task );     
-       virtual ~Thread();
-   
-       /** 
-        * Creates a system thread and starts it in a joinable mode.  
-        * Upon creation, the
-        * run() method of either this object or the provided Runnable
-        * object will be invoked in the context of this thread.
-        * @exception runtime_error is thrown if the system could
-        * not start the thread.
-        */
-       virtual void start() throw (exceptions::ActiveMQException);
-   
-       /**
-        * Wait til the thread exits. This is when the run()
-        * method has returned or has thrown an exception.
-        * If an exception was thrown in the run() method,
-        * join() will return the thrown exception. Otherwise
-        * (if run() returned normally), join() will
-        * return NULL.
-        */
-       virtual void join() throw (exceptions::ActiveMQException);
-       
-       /**
-        * Default implementation of the run method - does nothing.
-        */
-       virtual void run(){};
-       
-   public:
-   
-      /**
-       * Halts execution of the calling thread for a specified no of millisec.
-       *   
-       * Note that this method is a static method that applies to the
-       * calling thread and not to the thread object.
-       */
-      static void sleep(int millisecs);
-       
-      /**
-       * Obtains the Thread Id of the current thread
-       * @return Thread Id
-       */
-      static unsigned long getId(void); 
-   
-   private:
-   
-       // Internal thread handling
-   #ifdef unix
-       static void* runCallback (void* param);
-   #else
-       static unsigned int WINAPI runCallback (void* param);
-   #endif
-   } ;
-
-}}
-
-#endif /*ACTIVEMQ_CONCURRENT_THREAD_H*/
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ACTIVEMQ_CONCURRENT_THREAD_H
+#define ACTIVEMQ_CONCURRENT_THREAD_H
+
+#include <activemq/exceptions/ActiveMQException.h>
+#include <activemq/concurrent/Runnable.h>
+#include <stdexcept>
+#include <assert.h>
+
+#if (defined(__unix__) || defined(unix) || defined(MACOSX) || defined(__APPLE__)) && !defined(USG)
+   
+   #ifndef unix
+      #define unix
+   #endif
+
+   #include <pthread.h>
+#else
+   #include <windows.h>
+#endif
+
+namespace activemq{
+namespace concurrent{
+   
+   /** 
+    * Basic thread class - mimics the Java Thread.  Derived classes may 
+    * implement the run method, or this class can be used as is with 
+    * a provided Runnable delegate.
+    */
+   class Thread : public Runnable
+   {
+   private:
+   
+      /**
+       * The task to be run by this thread, defaults to 
+       * this thread object.
+       */
+      Runnable* task;
+      
+   #ifdef unix
+      pthread_attr_t attributes;
+       pthread_t   threadHandle ;
+   #else
+       HANDLE      threadHandle ;
+   #endif
+   
+       /**
+        * Started state of this thread.
+        */
+       bool started;
+       
+       /**
+        * Indicates whether the thread has already been
+        * joined.
+        */
+       bool joined;
+       
+   public:
+      
+       Thread();
+       Thread( Runnable* task );     
+       virtual ~Thread();
+   
+       /** 
+        * Creates a system thread and starts it in a joinable mode.  
+        * Upon creation, the
+        * run() method of either this object or the provided Runnable
+        * object will be invoked in the context of this thread.
+        * @exception ActiveMQException is thrown if the thread was already
+        * started or if the system could not start the thread.
+        */
+       virtual void start() throw (exceptions::ActiveMQException);
+   
+       /**
+        * Waits until the thread exits, i.e. until the run()
+        * method of the task has returned.  join() returns
+        * nothing; it does not report exceptions raised inside
+        * run().  Joining a thread that was already joined is a no-op.
+        * @exception ActiveMQException is thrown if join() is called
+        * before start().
+        */
+       virtual void join() throw (exceptions::ActiveMQException);
+       
+       /**
+        * Default implementation of the run method - does nothing.
+        */
+       virtual void run(){};
+       
+   public:
+   
+      /**
+       * Halts execution of the calling thread for a specified no of millisec.
+       *   
+       * Note that this method is a static method that applies to the
+       * calling thread and not to the thread object.
+       */
+      static void sleep(int millisecs);
+       
+      /**
+       * Obtains the Thread Id of the current thread
+       * @return Thread Id
+       */
+      static unsigned long getId(void); 
+   
+   private:
+   
+       // Internal thread handling
+   #ifdef unix
+       static void* runCallback (void* param);
+   #else
+       static unsigned int WINAPI runCallback (void* param);
+   #endif
+   } ;
+
+}}
+
+#endif /*ACTIVEMQ_CONCURRENT_THREAD_H*/

Propchange: incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/Thread.h
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/ThreadPool.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/ThreadPool.cpp?rev=419365&r1=419364&r2=419365&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/ThreadPool.cpp (original)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/ThreadPool.cpp Wed Jul  5 15:27:34 2006
@@ -1,344 +1,344 @@
-/*
- * Copyright 2006 The Apache Software Foundation or its licensors, as
- * applicable.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <activemq/concurrent/ThreadPool.h>
-#include <activemq/concurrent/Concurrent.h>
-#include <activemq/exceptions/IllegalArgumentException.h>
-
-#ifdef min
-#undef min
-#endif
-
-#include <algorithm>
-#include <iostream>
-
-using namespace std;
-using namespace activemq;
-using namespace activemq::concurrent;
-
-////////////////////////////////////////////////////////////////////////////////
-LOGCMS_INITIALIZE(logger, ThreadPool, "com.activemq.concurrent.ThreadPool");
-LOGCMS_INITIALIZE(marker, ThreadPool, "com.activemq.concurrent.ThreadPool.Marker");
-
-////////////////////////////////////////////////////////////////////////////////
-ThreadPool ThreadPool::instance;
-
-////////////////////////////////////////////////////////////////////////////////
-ThreadPool::ThreadPool(void)
-{
-   maxThreads  = DEFAULT_MAX_POOL_SIZE;
-   blockSize   = DEFAULT_MAX_BLOCK_SIZE;
-   freeThreads = 0;
-
-   shutdown = false;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-ThreadPool::~ThreadPool(void)
-{
-   try
-   {
-      std::vector<PooledThread*>::iterator itr = pool.begin();
-       
-      // Stop all the threads
-      for(; itr != pool.end(); ++itr)
-      {
-         (*itr)->stop();
-      }
-       
-      // Set the shutdown flag so that the DeQueue methods all quit
-      // when we interrupt them.
-      shutdown = true;
-       
-      synchronized(&queue)
-      {
-         // Signal the Queue so that all waiters are notified
-         queue.notifyAll();
-      }
-       
-      // Wait for everyone to die
-      for(itr = pool.begin(); itr != pool.end(); ++itr)
-      {
-         (*itr)->join();
-      }      
-   }
-   AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
-   AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ThreadPool::queueTask(ThreadPool::Task task) 
-   throw ( exceptions::ActiveMQException )
-{
-   try
-   {
-      if(!task.first || !task.second)
-      {
-         throw exceptions::IllegalArgumentException( __FILE__, __LINE__,
-            "ThreadPool::QueueTask - Invalid args for Task");
-      }
-       
-      //LOGCMS_DEBUG(logger, "ThreadPool::QueueTask - syncing on queue");
-    
-      synchronized(&queue)
-      {
-         //LOGCMS_DEBUG(logger, "ThreadPool::QueueTask - sync'd, synching pool");
-    
-         // If there's nobody open to do work, then create some more
-         // threads to handle the work.
-         if(freeThreads == 0)
-         {
-            AllocateThreads(blockSize);
-         }
-    
-         //LOGCMS_DEBUG(logger, "ThreadPool::QueueTask - pushing task");
-    
-         // queue the new work.
-         queue.push(task);
-         
-         //LOGCMS_DEBUG(logger, "ThreadPool::QueueTask - calling notify");
-   
-         // Inform waiters that we put some work on the queue.
-         queue.notify();
-      }
-   }
-   AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
-   AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-ThreadPool::Task ThreadPool::deQueueTask(void)
-   throw ( exceptions::ActiveMQException )
-{
-   try
-   {
-      //LOGCMS_DEBUG(logger, "ThreadPool::DeQueueTask - syncing on queue");
-    
-      synchronized(&queue)
-      {
-         /*LOGCMS_DEBUG(logger, 
-            "ThreadPool::DeQueueTask - sync'd checking queue empty");*/
-
-         // Wait for work, wait in a while loop since another thread could
-         // be waiting for a lock and get the work before we get woken up
-         // from our wait.
-         while(queue.empty() && !shutdown)
-         {
-            //LOGCMS_DEBUG(logger, "ThreadPool::DeQueueTask - Q empty, waiting");
-
-            queue.wait();
-
-            //LOGCMS_DEBUG(logger, "ThreadPool::DeQueueTask - done waiting");
-         }
-          
-         // Don't give more work if we are closing down
-         if(shutdown)
-         {
-            return Task();
-         }
-          
-         // check size again.
-         if(queue.empty())
-         {
-            throw exceptions::ActiveMQException( __FILE__, __LINE__,
-               "ThreadPool::DeQueueUserWorkItem - Empty Taskn, not in shutdown.");
-         }
-          
-         //LOGCMS_DEBUG(logger, "ThreadPool::DeQueueTask - popping task");
-
-         // not empty so get the new work to do
-         return queue.pop();
-      }
-       
-      return Task();
-   }
-   AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
-   AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ThreadPool::reserve(unsigned long size)
-{
-   try{
-      synchronized(&poolLock)
-      {
-         if(size < pool.size() || pool.size() == maxThreads)
-         {
-            return;
-         }
-         
-         // How many do we reserve
-         unsigned long allocCount = size - pool.size();
-          
-         // Allocate the new Threads
-         AllocateThreads(allocCount);
-      }
-   }
-   AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
-   AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ThreadPool::setMaxThreads(unsigned long maxThreads)
-{
-   try
-   {
-      synchronized(&poolLock)
-      {
-         if(maxThreads == 0)
-         {
-            // Caller tried to do something stupid, ignore them.
-            return;
-         }
-          
-         this->maxThreads = maxThreads;
-      }
-   }
-   AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
-   AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ThreadPool::setBlockSize(unsigned long blockSize)
-{
-   try
-   {
-      if(blockSize <= 0)
-      {
-         // User tried something dumb, protect them from themselves
-         return;
-      }
-    
-      synchronized(&poolLock)
-      {
-         this->blockSize = blockSize;
-      }
-   }
-   AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
-   AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ThreadPool::AllocateThreads(unsigned long count)
-{
-   try
-   {
-      if(pool.size() >= maxThreads)
-      {
-         return;
-      }
-    
-      synchronized(&poolLock)
-      {
-         // Take the min of alloc size of maxThreads since we don't
-         // want anybody sneaking eaxtra threads in, greedy bastards.
-         count = std::min(count, maxThreads - pool.size());
-       
-         // Each time we create a thread we increment the free Threads 
-         // counter, but before we call start so that the Thread doesn't 
-         // get ahead of us.
-         for(unsigned long i = 0; i < count; ++i)
-         {
-            pool.push_back(new PooledThread(this));
-            pool.back()->setPooledThreadListener(this);
-            freeThreads++;
-            pool.back()->start();
-         }
-      }
-   }
-   AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
-   AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ThreadPool::onTaskStarted(PooledThread* thread)
-{
-   try
-   {
-      synchronized(&poolLock)
-      {
-         freeThreads--;
-          
-         // Now that this callback has decremented the free threads coutner
-         // let check if there is any outstanding work to be done and no
-         // threads to handle it.  This could happen if the QueueTask
-         // method was called successively without any of the PooledThreads
-         // having a chance to wake up and service the queue.  This would
-         // cause the number of Task to exceed the number of free threads
-         // once the Threads got a chance to wake up and service the queue
-         if(freeThreads == 0 && !queue.empty())
-         {
-            // Allocate a new block of threads
-            AllocateThreads(blockSize);
-         }
-      }
-
-      //LOGCMS_DEBUG(logger, "ThreadPool::onTaskStarted:");
-   }
-   AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
-   AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
-}
- 
-////////////////////////////////////////////////////////////////////////////////
-void ThreadPool::onTaskCompleted(PooledThread* thread)
-{
-   try
-   {    
-      synchronized(&poolLock)
-      {
-         freeThreads++;
-      }
-
-      //LOGCMS_DEBUG(logger, "ThreadPool::onTaskCompleted: ");
-   }
-   AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
-   AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ThreadPool::onTaskException(
-   PooledThread* thread, 
-   exceptions::ActiveMQException& ex)
-{
-   //LOGCMS_DEBUG(logger, "ThreadPool::onTaskException: ");
-
-   try
-   {
-      synchronized(&poolLock)
-      {
-         // Delete the thread that had the exception and start a new 
-         // one to take its place.
-         freeThreads--;
-          
-         std::vector<PooledThread*>::iterator itr = 
-            std::find(pool.begin(), pool.end(), thread);
-    
-         if(itr != pool.end())
-         {
-            pool.erase(itr);
-         }
-    
-         // Bye-Bye Thread Object
-         delete thread;
-          
-         // Now allocate a replacement
-         AllocateThreads(1);
-      }
-   }
-   AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
-   AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
-}
-                                
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <activemq/concurrent/ThreadPool.h>
+#include <activemq/concurrent/Concurrent.h>
+#include <activemq/exceptions/IllegalArgumentException.h>
+
+#ifdef min
+#undef min
+#endif
+
+#include <algorithm>
+#include <iostream>
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::concurrent;
+
+////////////////////////////////////////////////////////////////////////////////
+LOGCMS_INITIALIZE(logger, ThreadPool, "com.activemq.concurrent.ThreadPool");
+LOGCMS_INITIALIZE(marker, ThreadPool, "com.activemq.concurrent.ThreadPool.Marker");
+
+////////////////////////////////////////////////////////////////////////////////
+ThreadPool ThreadPool::instance;
+
+////////////////////////////////////////////////////////////////////////////////
+ThreadPool::ThreadPool(void)
+{
+   maxThreads  = DEFAULT_MAX_POOL_SIZE;
+   blockSize   = DEFAULT_MAX_BLOCK_SIZE;
+   freeThreads = 0;
+
+   shutdown = false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ThreadPool::~ThreadPool(void)
+{
+   try
+   {
+      std::vector<PooledThread*>::iterator itr = pool.begin();
+       
+      // Stop all the threads
+      for(; itr != pool.end(); ++itr)
+      {
+         (*itr)->stop();
+      }
+       
+      // Set the shutdown flag so that the DeQueue methods all quit
+      // when we interrupt them.
+      shutdown = true;
+       
+      synchronized(&queue)
+      {
+         // Signal the Queue so that all waiters are notified
+         queue.notifyAll();
+      }
+       
+      // Wait for everyone to die
+      for(itr = pool.begin(); itr != pool.end(); ++itr)
+      {
+         (*itr)->join();
+      }      
+   }
+   AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
+   AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ThreadPool::queueTask(ThreadPool::Task task) 
+   throw ( exceptions::ActiveMQException )
+{
+   try
+   {
+      if(!task.first || !task.second)
+      {
+         throw exceptions::IllegalArgumentException( __FILE__, __LINE__,
+            "ThreadPool::QueueTask - Invalid args for Task");
+      }
+       
+      //LOGCMS_DEBUG(logger, "ThreadPool::QueueTask - syncing on queue");
+    
+      synchronized(&queue)
+      {
+         //LOGCMS_DEBUG(logger, "ThreadPool::QueueTask - sync'd, synching pool");
+    
+         // If there's nobody open to do work, then create some more
+         // threads to handle the work.
+         if(freeThreads == 0)
+         {
+            AllocateThreads(blockSize);
+         }
+    
+         //LOGCMS_DEBUG(logger, "ThreadPool::QueueTask - pushing task");
+    
+         // queue the new work.
+         queue.push(task);
+         
+         //LOGCMS_DEBUG(logger, "ThreadPool::QueueTask - calling notify");
+   
+         // Inform waiters that we put some work on the queue.
+         queue.notify();
+      }
+   }
+   AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
+   AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ThreadPool::Task ThreadPool::deQueueTask(void)
+   throw ( exceptions::ActiveMQException )
+{
+   try
+   {
+      //LOGCMS_DEBUG(logger, "ThreadPool::DeQueueTask - syncing on queue");
+    
+      synchronized(&queue)
+      {
+         /*LOGCMS_DEBUG(logger, 
+            "ThreadPool::DeQueueTask - sync'd checking queue empty");*/
+
+         // Wait for work, wait in a while loop since another thread could
+         // be waiting for a lock and get the work before we get woken up
+         // from our wait.
+         while(queue.empty() && !shutdown)
+         {
+            //LOGCMS_DEBUG(logger, "ThreadPool::DeQueueTask - Q empty, waiting");
+
+            queue.wait();
+
+            //LOGCMS_DEBUG(logger, "ThreadPool::DeQueueTask - done waiting");
+         }
+          
+         // Don't give more work if we are closing down
+         if(shutdown)
+         {
+            return Task();
+         }
+          
+         // check size again.
+         if(queue.empty())
+         {
+            throw exceptions::ActiveMQException( __FILE__, __LINE__,
+               "ThreadPool::DeQueueUserWorkItem - Empty Taskn, not in shutdown.");
+         }
+          
+         //LOGCMS_DEBUG(logger, "ThreadPool::DeQueueTask - popping task");
+
+         // not empty so get the new work to do
+         return queue.pop();
+      }
+       
+      return Task();
+   }
+   AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
+   AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ThreadPool::reserve(unsigned long size)
+{
+   try{
+      synchronized(&poolLock)
+      {
+         if(size < pool.size() || pool.size() == maxThreads)
+         {
+            return;
+         }
+         
+         // How many do we reserve
+         unsigned long allocCount = size - pool.size();
+          
+         // Allocate the new Threads
+         AllocateThreads(allocCount);
+      }
+   }
+   AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
+   AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ThreadPool::setMaxThreads(unsigned long maxThreads)
+{
+   try
+   {
+      synchronized(&poolLock)
+      {
+         if(maxThreads == 0)
+         {
+            // Caller tried to do something stupid, ignore them.
+            return;
+         }
+          
+         this->maxThreads = maxThreads;
+      }
+   }
+   AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
+   AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ThreadPool::setBlockSize(unsigned long blockSize)
+{
+   try
+   {
+      if(blockSize <= 0)
+      {
+         // User tried something dumb, protect them from themselves
+         return;
+      }
+    
+      synchronized(&poolLock)
+      {
+         this->blockSize = blockSize;
+      }
+   }
+   AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
+   AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ThreadPool::AllocateThreads(unsigned long count)
+{
+   try
+   {
+      if(pool.size() >= maxThreads)
+      {
+         return;
+      }
+    
+      synchronized(&poolLock)
+      {
+         // Clamp the request to the remaining capacity (maxThreads minus
+         // the current pool size) so the pool never exceeds maxThreads.
+         count = std::min(count, maxThreads - pool.size());
+       
+         // Each time we create a thread we increment the free Threads 
+         // counter, but before we call start so that the Thread doesn't 
+         // get ahead of us.
+         for(unsigned long i = 0; i < count; ++i)
+         {
+            pool.push_back(new PooledThread(this));
+            pool.back()->setPooledThreadListener(this);
+            freeThreads++;
+            pool.back()->start();
+         }
+      }
+   }
+   AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
+   AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ThreadPool::onTaskStarted(PooledThread* thread)
+{
+   try
+   {
+      synchronized(&poolLock)
+      {
+         freeThreads--;
+          
+         // Now that this callback has decremented the free threads counter,
+         // let's check if there is any outstanding work to be done and no
+         // threads to handle it.  This could happen if the queueTask
+         // method was called successively without any of the PooledThreads
+         // having a chance to wake up and service the queue.  This would
+         // cause the number of Tasks to exceed the number of free threads
+         // once the Threads got a chance to wake up and service the queue.
+         if(freeThreads == 0 && !queue.empty())
+         {
+            // Allocate a new block of threads
+            AllocateThreads(blockSize);
+         }
+      }
+
+      //LOGCMS_DEBUG(logger, "ThreadPool::onTaskStarted:");
+   }
+   AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
+   AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
+}
+ 
+////////////////////////////////////////////////////////////////////////////////
+void ThreadPool::onTaskCompleted(PooledThread* thread)
+{
+   try
+   {    
+      synchronized(&poolLock)
+      {
+         freeThreads++;
+      }
+
+      //LOGCMS_DEBUG(logger, "ThreadPool::onTaskCompleted: ");
+   }
+   AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
+   AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ThreadPool::onTaskException(
+   PooledThread* thread, 
+   exceptions::ActiveMQException& ex)
+{
+   //LOGCMS_DEBUG(logger, "ThreadPool::onTaskException: ");
+
+   try
+   {
+      synchronized(&poolLock)
+      {
+         // Delete the thread that had the exception and start a new 
+         // one to take its place.
+         freeThreads--;
+          
+         std::vector<PooledThread*>::iterator itr = 
+            std::find(pool.begin(), pool.end(), thread);
+    
+         if(itr != pool.end())
+         {
+            pool.erase(itr);
+         }
+    
+         // Bye-Bye Thread Object
+         delete thread;
+          
+         // Now allocate a replacement
+         AllocateThreads(1);
+      }
+   }
+   AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
+   AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
+}
+                                

Propchange: incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/ThreadPool.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/ThreadPool.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/ThreadPool.h?rev=419365&r1=419364&r2=419365&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/ThreadPool.h (original)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/ThreadPool.h Wed Jul  5 15:27:34 2006
@@ -1,239 +1,239 @@
-/*
- * Copyright 2006 The Apache Software Foundation or its licensors, as
- * applicable.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef _ACTIVEMQ_CONCURRENT_THREADPOOL_H_
-#define _ACTIVEMQ_CONCURRENT_THREADPOOL_H_
-
-#include <activemq/concurrent/Runnable.h>
-#include <activemq/concurrent/PooledThread.h>
-#include <activemq/concurrent/PooledThreadListener.h>
-#include <activemq/concurrent/TaskListener.h>
-#include <activemq/concurrent/Mutex.h>
-#include <activemq/util/Queue.h>
-#include <activemq/logger/LoggerDefines.h>
-
-#include <vector>
-
-namespace activemq{
-namespace concurrent{
-
-   /**
-    * Defines a Thread Pool object that implements the functionality
-    * of pooling threads to perform user tasks.  The Thread Poll has
-    * max size that it will grow to.  The thread pool allocates threads
-    * in blocks.  When there are no waiting worker threads and a task
-    * is queued then a new batch is allocated.  The user can specify
-    * the size of the blocks, otherwise a default value is used.
-    * <P>
-    * When the user queues a task they must also queue a listner to 
-    * be notified when the task has completed, this provides the user
-    * with a mechanism to know when a task object can be freed.
-    * <P>
-    * To have the Thread Pool perform a task, the user enqueue's an
-    * object that implements the <code>Runnable</code> insterface and
-    * one of the worker threads will executing it in its thread context.
-    */
-   class ThreadPool : public PooledThreadListener
-   {
-   public:
-   
-      // Constants
-      static const size_t DEFAULT_MAX_POOL_SIZE  = 10;
-      static const size_t DEFAULT_MAX_BLOCK_SIZE = 3;
-         
-      // Types
-      typedef std::pair<Runnable*, TaskListener*> Task;
-
-   private:
-   
-      // Vector of threads that this object has created for its pool.
-      std::vector< PooledThread* > pool;
-      
-      // Queue of Task that are in need of completion
-      util::Queue<Task> queue;
-      
-      // Max number of Threads this Pool can contian      
-      unsigned long maxThreads;
-      
-      // Max number of tasks that can be allocated at a time
-      unsigned long blockSize;
-      
-      // boolean flag use to indocate that this object is shutting down.
-      bool shutdown;
-      
-      // Count of threads that are currently free to perfom some work.
-      unsigned long freeThreads;
-      
-      // Mutex for locking operations that affect the pool.
-      Mutex poolLock;
-
-      // Logger Init 
-      LOGCMS_DECLARE(logger);
-      LOGCMS_DECLARE(marker);
-      
-   private:   // Statics
-   
-      // The singleton instance of this class
-      static ThreadPool instance;
-            
-   public:
-         
-      /**
-       * Constructor
-       */
-      ThreadPool(void);
-
-      /**
-       * Destructor
-       */
-   	virtual ~ThreadPool(void);
-
-      /**
-       * Queue a task to be completed by one of the Pooled Threads.
-       * tasks are serviced as soon as a <code>PooledThread</code>
-       * is available to run it.
-       * @param object that derives from Runnable
-       * @throws ActiveMQException
-       */
-      virtual void queueTask(Task task) 
-         throw ( exceptions::ActiveMQException );
-
-      /**
-       * DeQueue a task to be completed by one of the Pooled Threads.
-       * A caller of this method will block until there is something
-       * in the tasks queue, therefore care must be taken when calling
-       * this function.  Normally clients of ThreadPool don't use
-       * this, only the <code>PooledThread</code> objects owned by
-       * this ThreadPool.
-       * @return object that derives from Runnable
-       * @throws ActiveMQException
-       */
-      virtual Task deQueueTask(void)
-         throw ( exceptions::ActiveMQException );
-
-      /**
-       * Returns the current number of Threads in the Pool, this is
-       * how many there are now, not how many are active or the max 
-       * number that might exist.
-       * @return integer number of threads in existance.
-       */
-      virtual unsigned long getPoolSize(void) const { return pool.size(); }
-      
-      /**
-       * Returns the current backlog of items in the tasks queue, this
-       * is how much work is still waiting to get done.  
-       * @return number of outstanding tasks.
-       */
-      virtual unsigned long getBacklog(void) const { return queue.size(); }
-      
-      /**
-       * Ensures that there is at least the specified number of Threads
-       * allocated to the pool.  If the size is greater than the MAX
-       * number of threads in the pool, then only MAX threads are 
-       * reservved.  If the size is smaller than the number of threads
-       * currently in the pool, than nothing is done.
-       * @param number of threads to reserve.
-       */
-      virtual void reserve(unsigned long size);
-      
-      /**
-       * Get the Max Number of Threads this Pool can contain
-       * @return max size
-       */
-      virtual unsigned long getMaxThreads(void) const { return maxThreads; }
-      
-      /**
-       * Sets the Max number of threads this pool can contian. 
-       * if this value is smaller than the current size of the
-       * pool nothing is done.
-       */
-      virtual void setMaxThreads(unsigned long maxThreads);
-      
-      /**
-       * Gets the Max number of threads that can be allocated at a time
-       * when new threads are needed.
-       * @return max Thread Block Size
-       */
-      virtual unsigned long getBlockSize(void) const { return blockSize; }
-      
-      /**
-       * Sets the Max number of Threads that can be allocated at a time
-       * when the Thread Pool determines that more Threads are needed.  
-       * @param Max Thread Block Size
-       */
-      virtual void setBlockSize(unsigned long blockSize);
-      
-      /**
-       * Returns the current number of available threads in the pool, threads
-       * that are performing a user task are considered unavailable.  This value
-       * could change immeadiately after calling as Threads could finish right
-       * after and be available again.  This is informational only.
-       * @return totoal free threads
-       */
-      virtual unsigned long getFreeThreadCount(void) const { return freeThreads; }
-
-   public: // PooledThreadListener Callbacks
-      
-      /**
-       * Called by a pooled thread when it is about to begin
-       * executing a new task.  This will decrement the available
-       * threads counter so that this object knows when there are
-       * no more free threads and must create new ones.
-       * @param Pointer to the Pooled Thread that is making this call
-       */
-      virtual void onTaskStarted(PooledThread* thread);
-       
-      /**
-       * Called by a pooled thread when it has completed a task
-       * and is going back to waiting for another task to run,
-       * this will increment the free threads counter.
-       * @param Pointer the the Pooled Thread that is making this call.
-       */
-      virtual void onTaskCompleted(PooledThread* thread);
-
-      /**
-       * Called by a pooled thread when it has encountered an exception
-       * while running a user task, after receiving this notification
-       * the callee should assume that the PooledThread is now no longer
-       * running.
-       * @param Pointer to the Pooled Thread that is making this call
-       * @param The Exception that occured.
-       */
-      virtual void onTaskException(PooledThread* thread, 
-                                   exceptions::ActiveMQException& ex);
-
-   public:   // Statics
-
-      /**
-       * Return the one and only Thread Pool instance.
-       * @return The Thread Pool Pointer
-       */
-      static ThreadPool* getInstance(void) { return &instance; }
-
-   private:
-   
-      /**
-       * Allocates the requested ammount of Threads, won't exceed
-       * <code>maxThreads</code>.
-       * @param the number of threads to create
-       */
-      void AllocateThreads(unsigned long count); 
-
-   };
-
-}}
-
-#endif /*_ACTIVEMQ_CONCURRENT_THREADPOOL_H_*/
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_CONCURRENT_THREADPOOL_H_
+#define _ACTIVEMQ_CONCURRENT_THREADPOOL_H_
+
+#include <activemq/concurrent/Runnable.h>
+#include <activemq/concurrent/PooledThread.h>
+#include <activemq/concurrent/PooledThreadListener.h>
+#include <activemq/concurrent/TaskListener.h>
+#include <activemq/concurrent/Mutex.h>
+#include <activemq/util/Queue.h>
+#include <activemq/logger/LoggerDefines.h>
+
+#include <vector>
+
+namespace activemq{
+namespace concurrent{
+
+   /**
+    * Defines a Thread Pool object that implements the functionality
+    * of pooling threads to perform user tasks.  The Thread Pool has a
+    * max size that it will grow to.  The thread pool allocates threads
+    * in blocks.  When there are no waiting worker threads and a task
+    * is queued then a new batch is allocated.  The user can specify
+    * the size of the blocks, otherwise a default value is used.
+    * <P>
+    * When the user queues a task they must also queue a listner to 
+    * be notified when the task has completed, this provides the user
+    * with a mechanism to know when a task object can be freed.
+    * <P>
+    * To have the Thread Pool perform a task, the user enqueue's an
+    * object that implements the <code>Runnable</code> insterface and
+    * one of the worker threads will executing it in its thread context.
+    */
+   class ThreadPool : public PooledThreadListener
+   {
+   public:
+   
+      // Constants
+      static const size_t DEFAULT_MAX_POOL_SIZE  = 10;
+      static const size_t DEFAULT_MAX_BLOCK_SIZE = 3;
+         
+      // Types
+      typedef std::pair<Runnable*, TaskListener*> Task;
+
+   private:
+   
+      // Vector of threads that this object has created for its pool.
+      std::vector< PooledThread* > pool;
+      
+      // Queue of Task that are in need of completion
+      util::Queue<Task> queue;
+      
+      // Max number of Threads this Pool can contian      
+      unsigned long maxThreads;
+      
+      // Max number of tasks that can be allocated at a time
+      unsigned long blockSize;
+      
+      // boolean flag use to indocate that this object is shutting down.
+      bool shutdown;
+      
+      // Count of threads that are currently free to perfom some work.
+      unsigned long freeThreads;
+      
+      // Mutex for locking operations that affect the pool.
+      Mutex poolLock;
+
+      // Logger Init 
+      LOGCMS_DECLARE(logger);
+      LOGCMS_DECLARE(marker);
+      
+   private:   // Statics
+   
+      // The singleton instance of this class
+      static ThreadPool instance;
+            
+   public:
+         
+      /**
+       * Constructor
+       */
+      ThreadPool(void);
+
+      /**
+       * Destructor
+       */
+   	virtual ~ThreadPool(void);
+
+      /**
+       * Queue a task to be completed by one of the Pooled Threads.
+       * tasks are serviced as soon as a <code>PooledThread</code>
+       * is available to run it.
+       * @param object that derives from Runnable
+       * @throws ActiveMQException
+       */
+      virtual void queueTask(Task task) 
+         throw ( exceptions::ActiveMQException );
+
+      /**
+       * DeQueue a task to be completed by one of the Pooled Threads.
+       * A caller of this method will block until there is something
+       * in the tasks queue, therefore care must be taken when calling
+       * this function.  Normally clients of ThreadPool don't use
+       * this, only the <code>PooledThread</code> objects owned by
+       * this ThreadPool.
+       * @return object that derives from Runnable
+       * @throws ActiveMQException
+       */
+      virtual Task deQueueTask(void)
+         throw ( exceptions::ActiveMQException );
+
+      /**
+       * Returns the current number of Threads in the Pool, this is
+       * how many there are now, not how many are active or the max 
+       * number that might exist.
+       * @return integer number of threads in existance.
+       */
+      virtual unsigned long getPoolSize(void) const { return pool.size(); }
+      
+      /**
+       * Returns the current backlog of items in the tasks queue, this
+       * is how much work is still waiting to get done.  
+       * @return number of outstanding tasks.
+       */
+      virtual unsigned long getBacklog(void) const { return queue.size(); }
+      
+      /**
+       * Ensures that there is at least the specified number of Threads
+       * allocated to the pool.  If the size is greater than the MAX
+       * number of threads in the pool, then only MAX threads are 
+       * reservved.  If the size is smaller than the number of threads
+       * currently in the pool, than nothing is done.
+       * @param number of threads to reserve.
+       */
+      virtual void reserve(unsigned long size);
+      
+      /**
+       * Get the Max Number of Threads this Pool can contain
+       * @return max size
+       */
+      virtual unsigned long getMaxThreads(void) const { return maxThreads; }
+      
+      /**
+       * Sets the Max number of threads this pool can contian. 
+       * if this value is smaller than the current size of the
+       * pool nothing is done.
+       */
+      virtual void setMaxThreads(unsigned long maxThreads);
+      
+      /**
+       * Gets the Max number of threads that can be allocated at a time
+       * when new threads are needed.
+       * @return max Thread Block Size
+       */
+      virtual unsigned long getBlockSize(void) const { return blockSize; }
+      
+      /**
+       * Sets the Max number of Threads that can be allocated at a time
+       * when the Thread Pool determines that more Threads are needed.  
+       * @param Max Thread Block Size
+       */
+      virtual void setBlockSize(unsigned long blockSize);
+      
+      /**
+       * Returns the current number of available threads in the pool, threads
+       * that are performing a user task are considered unavailable.  This value
+       * could change immeadiately after calling as Threads could finish right
+       * after and be available again.  This is informational only.
+       * @return totoal free threads
+       */
+      virtual unsigned long getFreeThreadCount(void) const { return freeThreads; }
+
+   public: // PooledThreadListener Callbacks
+      
+      /**
+       * Called by a pooled thread when it is about to begin
+       * executing a new task.  This will decrement the available
+       * threads counter so that this object knows when there are
+       * no more free threads and must create new ones.
+       * @param Pointer to the Pooled Thread that is making this call
+       */
+      virtual void onTaskStarted(PooledThread* thread);
+       
+      /**
+       * Called by a pooled thread when it has completed a task
+       * and is going back to waiting for another task to run,
+       * this will increment the free threads counter.
+       * @param Pointer the the Pooled Thread that is making this call.
+       */
+      virtual void onTaskCompleted(PooledThread* thread);
+
+      /**
+       * Called by a pooled thread when it has encountered an exception
+       * while running a user task, after receiving this notification
+       * the callee should assume that the PooledThread is now no longer
+       * running.
+       * @param Pointer to the Pooled Thread that is making this call
+       * @param The Exception that occured.
+       */
+      virtual void onTaskException(PooledThread* thread, 
+                                   exceptions::ActiveMQException& ex);
+
+   public:   // Statics
+
+      /**
+       * Return the one and only Thread Pool instance.
+       * @return The Thread Pool Pointer
+       */
+      static ThreadPool* getInstance(void) { return &instance; }
+
+   private:
+   
+      /**
+       * Allocates the requested ammount of Threads, won't exceed
+       * <code>maxThreads</code>.
+       * @param the number of threads to create
+       */
+      void AllocateThreads(unsigned long count); 
+
+   };
+
+}}
+
+#endif /*_ACTIVEMQ_CONCURRENT_THREADPOOL_H_*/

Propchange: incubator/activemq/trunk/activemq-cpp/src/main/activemq/concurrent/ThreadPool.h
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message