activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1471139 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src: main/csharp/Connection.cs main/csharp/Threads/ThreadPoolExecutor.cs test/csharp/Threads/ThreadPoolExecutorTest.cs
Date Tue, 23 Apr 2013 20:53:01 GMT
Author: tabish
Date: Tue Apr 23 20:53:01 2013
New Revision: 1471139

URL: http://svn.apache.org/r1471139
Log:
ThreadPoolExecutor should be allowed to complete pending tasks after Shutdown is called, but
reject any new tasks.  Add an AwaitTermination method to allow for orderly shutdown before
abandoning outstanding work.

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/ThreadPoolExecutor.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Threads/ThreadPoolExecutorTest.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=1471139&r1=1471138&r2=1471139&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs Tue Apr
23 20:53:01 2013
@@ -684,6 +684,10 @@ namespace Apache.NMS.ActiveMQ
 					}
 
 					executor.Shutdown();
+					if (!executor.AwaitTermination(TimeSpan.FromMinutes(1)))
+					{
+						Tracer.DebugFormat("Connection[{0}]: Failed to properly shutdown its executor", this.ConnectionId);
+					}
 
 					Tracer.DebugFormat("Connection[{0}]: Disposing of the Transport.", this.ConnectionId);
 					transport.Stop();

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/ThreadPoolExecutor.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/ThreadPoolExecutor.cs?rev=1471139&r1=1471138&r2=1471139&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/ThreadPoolExecutor.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/ThreadPoolExecutor.cs
Tue Apr 23 20:53:01 2013
@@ -100,8 +100,23 @@ namespace Apache.NMS.ActiveMQ.Threads
             }
         }
 
+		/// <summary>
+		/// Returns true if this ThreadPoolExecutor has been shut down but has not 
+		/// finished running all the tasks that have been Queue.  When a ThreadPoolExecutor
+		/// is shut down it will not accept any new tasks but it will complete all tasks
+		/// that have been previously queued.
+		/// </summary>
         public bool IsShutdown
         {
+            get { return this.closing; }
+        }
+
+		/// <summary>
+		/// Returns true if this ThreadPoolExecutor has been shut down and has also
+		/// completed processing of all outstanding tasks in its task Queue.
+		/// </summary>
+        public bool IsTerminated
+        {
             get { return this.closed; }
         }
 
@@ -109,26 +124,48 @@ namespace Apache.NMS.ActiveMQ.Threads
         {
             if(!this.closed)
             {
+				lock(this.syncRoot)
+				{
+	                if(!this.closed)
+	                {
+	                    this.closing = true;
+
+						// Must be no tasks in Queue and none can be accepted
+						// now that we've flipped the closing toggle so safe to
+						// mark this ThreadPoolExecutor as closed.
+						if (!this.running)
+						{
+							this.closed = true;
+							this.executionComplete.Set();
+						}
+	                }
+				}
+            }
+        }
+
+		public bool AwaitTermination(TimeSpan timeout) 
+		{
+            if(!this.closed)
+            {
                 syncRoot.WaitOne();
 
                 if(!this.closed)
                 {
-                    this.closing = true;
-                    this.workQueue.Clear();
-
+					// If called from the worker thread we can't check this as it 
+					// will deadlock us, just return whatever the closed state is.
                     if(this.running && Thread.CurrentThread != this.workThread)
                     {
                         syncRoot.ReleaseMutex();
-                        this.executionComplete.WaitOne();
+                        this.closed = this.executionComplete.WaitOne(timeout, false);
                         syncRoot.WaitOne();
                     }
-
-                    this.closed = true;
                 }
 
                 syncRoot.ReleaseMutex();
             }
-        }
+
+			return this.closed;
+		}
 
         private void QueueProcessor(object unused)
         {
@@ -138,7 +175,7 @@ namespace Apache.NMS.ActiveMQ.Threads
             {
                 this.workThread = Thread.CurrentThread;
 
-                if(this.workQueue.Count == 0 || this.closing)
+                if(this.workQueue.Count == 0)
                 {
                     this.running = false;
                     this.executionComplete.Set();
@@ -156,10 +193,13 @@ namespace Apache.NMS.ActiveMQ.Threads
             {
                 this.workThread = null;
 
-                if(this.closing)
+                if(this.workQueue.Count == 0)
                 {
-                    this.running = false;
-                    this.executionComplete.Set();
+            		lock(syncRoot)
+					{
+                    	this.running = false;
+                    	this.executionComplete.Set();
+					}
                 }
                 else
                 {

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Threads/ThreadPoolExecutorTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Threads/ThreadPoolExecutorTest.cs?rev=1471139&r1=1471138&r2=1471139&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Threads/ThreadPoolExecutorTest.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Threads/ThreadPoolExecutorTest.cs
Tue Apr 23 20:53:01 2013
@@ -87,6 +87,22 @@ namespace Apache.NMS.ActiveMQ.Test
             complete.Set();
         }
 
+	    /// <summary>
+	    /// Wait out termination of a thread pool or fail doing so
+	    /// </summary>
+	    public void JoinPool(ThreadPoolExecutor exec) 
+		{
+	        try 
+			{
+	            exec.Shutdown();
+	            Assert.IsTrue(exec.AwaitTermination(TimeSpan.FromSeconds(20)));
+	        } 
+			catch(Exception) 
+			{
+	            Assert.Fail("Unexpected exception");
+	        }
+	    }
+
         [SetUp]
         public void SetUp()
         {
@@ -119,7 +135,7 @@ namespace Apache.NMS.ActiveMQ.Test
             Assert.IsTrue(this.waitingTaskCompleted);
 
             executor.Shutdown();
-            Assert.IsTrue(executor.IsShutdown);
+			JoinPool(executor);
         }
 
         [Test]
@@ -153,6 +169,7 @@ namespace Apache.NMS.ActiveMQ.Test
             Assert.IsTrue(this.doneLatch.await(TimeSpan.FromMilliseconds(30 * 1000)));
 
             executor.Shutdown();
+			JoinPool(executor);
             Assert.IsTrue(executor.IsShutdown);
         }
 
@@ -173,11 +190,12 @@ namespace Apache.NMS.ActiveMQ.Test
             Assert.IsTrue(this.doneLatch.await(TimeSpan.FromMilliseconds(30 * 1000)));
 
             executor.Shutdown();
+			JoinPool(executor);
             Assert.IsTrue(executor.IsShutdown);
         }
 
         [Test]
-        public void TestThatShutdownPurgesTasks()
+        public void TestThatShutdownDoesntPurgeTasks()
         {
             ThreadPoolExecutor executor = new ThreadPoolExecutor();
             Assert.IsNotNull(executor);
@@ -190,13 +208,49 @@ namespace Apache.NMS.ActiveMQ.Test
                 executor.QueueUserWorkItem(TaskThatIncrementsCount);
             }
 
+            executor.Shutdown();
+
             Thread.Sleep(100);
 
-            executor.Shutdown();
-            Assert.AreEqual(0, count);
+			JoinPool(executor);
+
+            Assert.AreEqual(JOB_COUNT, count);
             Assert.IsTrue(executor.IsShutdown);
         }
 
+		[Test]
+	    public void TestIsTerminated() 
+		{
+            ThreadPoolExecutor executor = new ThreadPoolExecutor();
+            Assert.IsNotNull(executor);
+            Assert.IsFalse(executor.IsShutdown);
+            Assert.IsFalse(executor.IsTerminated);
+
+            executor.QueueUserWorkItem(TaskThatSleeps);
+            executor.Shutdown();
+
+			JoinPool(executor);
+            Assert.IsTrue(executor.IsTerminated);
+		}
+
+		[Test]
+	    public void TestAwaitTermination() 
+		{
+            ThreadPoolExecutor executor = new ThreadPoolExecutor();
+            Assert.IsNotNull(executor);
+            Assert.IsFalse(executor.IsShutdown);
+            Assert.IsFalse(executor.IsTerminated);
+
+            executor.QueueUserWorkItem(TaskThatSleeps);
+            executor.Shutdown();
+
+            Assert.IsFalse(executor.IsTerminated, "Terminated before await.");
+			Assert.IsFalse(executor.AwaitTermination(TimeSpan.FromMilliseconds(500)), "Should be terminated
yet.");
+            Assert.IsFalse(executor.IsTerminated, "Terminated after await.");
+
+			JoinPool(executor);
+            Assert.IsTrue(executor.IsTerminated);
+		}
     }
 }
 



Mime
View raw message