activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1063485 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src: main/csharp/Connection.cs main/csharp/Threads/ThreadPoolExecutor.cs test/csharp/Threads/ThreadPoolExecutorTest.cs
Date Tue, 25 Jan 2011 22:10:08 GMT
Author: tabish
Date: Tue Jan 25 22:10:08 2011
New Revision: 1063485

URL: http://svn.apache.org/viewvc?rev=1063485&view=rev
Log:
https://issues.apache.org/jira/browse/AMQNET-290

Adds new class ThreadPoolExecutor that is used internally by the Connection class to serialize
the execution of its asynchronous exception callbacks and to garuntee that the async exception
callbacks don't outlive the connection once its been closed (they were in the ThreadPool before
without any way to purge them).

Added:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/ThreadPoolExecutor.cs
  (with props)
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Threads/ThreadPoolExecutorTest.cs
  (with props)
Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=1063485&r1=1063484&r2=1063485&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs Tue Jan
25 22:10:08 2011
@@ -20,6 +20,7 @@ using System.Diagnostics;
 using System.Collections;
 using System.Threading;
 using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.ActiveMQ.Threads;
 using Apache.NMS.ActiveMQ.Transport;
 using Apache.NMS.ActiveMQ.Transport.Failover;
 using Apache.NMS.ActiveMQ.Util;
@@ -74,8 +75,8 @@ namespace Apache.NMS.ActiveMQ
         private ICompressionPolicy compressionPolicy = new CompressionPolicy();
         private readonly IdGenerator clientIdGenerator;
         private volatile CountDownLatch transportInterruptionProcessingComplete;
-        private volatile CountDownLatch asyncExceptionHandlerComplete;
         private readonly MessageTransformation messageTransformation;
+        private readonly ThreadPoolExecutor executor = new ThreadPoolExecutor();
 
         public Connection(Uri connectionUri, ITransport transport, IdGenerator clientIdGenerator)
         {
@@ -529,13 +530,6 @@ namespace Apache.NMS.ActiveMQ
                     Tracer.Info("Connection.Close(): Closing Connection Now.");
                     this.closing.Value = true;
 
-                    // Wait for an async exception event to complete
-                    CountDownLatch latch = this.asyncExceptionHandlerComplete;
-                    if (latch != null)
-                    {
-                        latch.await();
-                    }
-
                     lock(sessions.SyncRoot)
                     {
                         foreach(Session session in sessions)
@@ -552,6 +546,8 @@ namespace Apache.NMS.ActiveMQ
                         transport.Oneway(shutdowninfo);
                     }
 
+                    executor.Shutdown();
+
                     Tracer.Info("Disposing of the Transport.");
                     transport.Dispose();
                 }
@@ -851,7 +847,7 @@ namespace Apache.NMS.ActiveMQ
 
                     // Called in another thread so that processing can continue
                     // here, ensures no lock contention.
-                    ThreadPool.QueueUserWorkItem(AsyncCallExceptionListener, e);
+                    executor.QueueUserWorkItem(AsyncCallExceptionListener, e);
                 }
                 else
                 {
@@ -878,11 +874,9 @@ namespace Apache.NMS.ActiveMQ
 
             if(!this.closing.Value && !this.closed.Value)
             {
-                this.asyncExceptionHandlerComplete = new CountDownLatch(1);
-
                 // Perform the actual work in another thread to avoid lock contention
                 // and allow the caller to continue on in its error cleanup.
-                ThreadPool.QueueUserWorkItem(AsyncOnExceptionHandler, error);
+                executor.QueueUserWorkItem(AsyncOnExceptionHandler, error);
             }
         }
 
@@ -922,8 +916,6 @@ namespace Apache.NMS.ActiveMQ
                     Tracer.Debug("Caught Exception While disposing of Sessions: " + ex);
                 }
             }
-
-            this.asyncExceptionHandlerComplete.countDown();
         }
 
         private void MarkTransportFailed(Exception error)

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/ThreadPoolExecutor.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/ThreadPoolExecutor.cs?rev=1063485&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/ThreadPoolExecutor.cs
(added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/ThreadPoolExecutor.cs
Tue Jan 25 22:10:08 2011
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Threading;
+
+namespace Apache.NMS.ActiveMQ.Threads
+{
+    /// <summary>
+    /// This class provides a wrapper around the ThreadPool mechanism in .NET
+    /// to allow for serial execution of jobs in the ThreadPool and provide
+    /// a means of shutting down the execution of jobs in a deterministic
+    /// way.
+    /// </summary>
+    public class ThreadPoolExecutor
+    {
+        private Queue<Future> workQueue = new Queue<Future>();
+        private Mutex syncRoot = new Mutex();
+        private bool running = false;
+        private bool closing = false;
+        private bool closed = false;
+        private ManualResetEvent executionComplete = new ManualResetEvent(true);
+
+        /// <summary>
+        /// Represents an asynchronous task that is executed on the ThreadPool
+        /// at some point in the future.
+        /// </summary>
+        internal class Future
+        {
+            private WaitCallback callback;
+            private object callbackArg;
+
+            public Future(WaitCallback callback, object arg)
+            {
+                this.callback = callback;
+                this.callbackArg = arg;
+            }
+
+            public void Run()
+            {
+                if(this.callback == null)
+                {
+                    throw new Exception("Future executed with null WaitCallback");
+                }
+
+                this.callback(callbackArg);
+            }
+        }
+
+        public void QueueUserWorkItem(WaitCallback worker)
+        {
+            this.QueueUserWorkItem(worker, null);
+        }
+
+        public void QueueUserWorkItem(WaitCallback worker, object arg)
+        {
+            if(worker == null)
+            {
+                throw new ArgumentNullException("Invalid WaitCallback passed");
+            }
+
+            if(!this.closed)
+            {
+                lock(syncRoot)
+                {
+                    if(!this.closed || !this.closing)
+                    {
+                        this.workQueue.Enqueue(new Future(worker, arg));
+
+                        if(!this.running)
+                        {
+                            this.executionComplete.Reset();
+                            this.running = true;
+                            ThreadPool.QueueUserWorkItem(new WaitCallback(QueueProcessor),
null);
+                        }
+                    }
+                }
+            }
+        }
+
+        public bool IsShutdown
+        {
+            get { return this.closed; }
+        }
+
+        public void Shutdown()
+        {
+            if(!this.closed)
+            {
+                syncRoot.WaitOne();
+
+                if(!this.closed)
+                {
+                    this.closing = true;
+                    this.workQueue.Clear();
+
+                    if(this.running)
+                    {
+                        syncRoot.ReleaseMutex();
+                        this.executionComplete.WaitOne();
+                        syncRoot.WaitOne();
+                    }
+
+                    this.closed = true;
+                }
+
+                syncRoot.ReleaseMutex();
+            }
+        }
+
+        private void QueueProcessor(object unused)
+        {
+            Future theTask = null;
+
+            lock(syncRoot)
+            {
+                if(this.workQueue.Count == 0 || this.closing)
+                {
+                    this.running = false;
+                    this.executionComplete.Set();
+                    return;
+                }
+
+                theTask = this.workQueue.Dequeue();
+            }
+
+            try
+            {
+                theTask.Run();
+            }
+            finally
+            {
+                if(this.closing)
+                {
+                    this.running = false;
+                    this.executionComplete.Set();
+                }
+                else
+                {
+                    ThreadPool.QueueUserWorkItem(new WaitCallback(QueueProcessor), null);
+                }
+            }
+        }
+    }
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/ThreadPoolExecutor.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Threads/ThreadPoolExecutorTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Threads/ThreadPoolExecutorTest.cs?rev=1063485&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Threads/ThreadPoolExecutorTest.cs
(added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Threads/ThreadPoolExecutorTest.cs
Tue Jan 25 22:10:08 2011
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using Apache.NMS.Util;
+using Apache.NMS.ActiveMQ.Threads;
+using NUnit.Framework;
+
+namespace Apache.NMS.ActiveMQ.Test
+{
+    [TestFixture]
+    public class ThreadPoolExecutorTest
+    {
+        private const int JOB_COUNT = 100;
+        private ManualResetEvent complete = new ManualResetEvent(false);
+        private bool waitingTaskCompleted = false;
+        private CountDownLatch doneLatch;
+        private int count = 0;
+
+        internal class DummyClass
+        {
+            private int data;
+
+            public DummyClass(int data)
+            {
+                this.data = data;
+            }
+
+            public int Data
+            {
+                get { return data; }
+            }
+        }
+
+        public ThreadPoolExecutorTest()
+        {
+        }
+
+        private void TaskThatSignalsWhenItsComplete(object unused)
+        {
+            waitingTaskCompleted = true;
+            complete.Set();
+        }
+
+        private void TaskThatCountsDown(object unused)
+        {
+            doneLatch.countDown();
+        }
+
+        private void TaskThatSleeps(object unused)
+        {
+            Thread.Sleep(5000);
+        }
+
+        private void TaskThatIncrementsCount(object unused)
+        {
+            count++;
+        }
+
+        private void TaskThatThrowsAnException(object unused)
+        {
+            throw new Exception("Throwing an Exception just because");
+        }
+
+        private void TaskThatValidatesTheArg(object arg)
+        {
+            DummyClass state = arg as DummyClass;
+            if(state != null && state.Data == 10 )
+            {
+                waitingTaskCompleted = true;
+            }
+            complete.Set();
+        }
+
+        [SetUp]
+        public void SetUp()
+        {
+            this.complete.Reset();
+            this.waitingTaskCompleted = false;
+            this.doneLatch = new CountDownLatch(JOB_COUNT);
+            this.count = 0;
+        }
+
+        [Test]
+        public void TestConstructor()
+        {
+            ThreadPoolExecutor executor = new ThreadPoolExecutor();
+            Assert.IsNotNull(executor);
+            Assert.IsFalse(executor.IsShutdown);
+            executor.Shutdown();
+            Assert.IsTrue(executor.IsShutdown);
+        }
+
+        [Test]
+        public void TestSingleTaskExecuted()
+        {
+            ThreadPoolExecutor executor = new ThreadPoolExecutor();
+            Assert.IsNotNull(executor);
+            Assert.IsFalse(executor.IsShutdown);
+
+            executor.QueueUserWorkItem(TaskThatSignalsWhenItsComplete);
+
+            this.complete.WaitOne();
+            Assert.IsTrue(this.waitingTaskCompleted);
+
+            executor.Shutdown();
+            Assert.IsTrue(executor.IsShutdown);
+        }
+
+        [Test]
+        public void TestTaskParamIsPropagated()
+        {
+            ThreadPoolExecutor executor = new ThreadPoolExecutor();
+            Assert.IsNotNull(executor);
+            Assert.IsFalse(executor.IsShutdown);
+
+            executor.QueueUserWorkItem(TaskThatValidatesTheArg, new DummyClass(10));
+
+            this.complete.WaitOne();
+            Assert.IsTrue(this.waitingTaskCompleted);
+
+            executor.Shutdown();
+            Assert.IsTrue(executor.IsShutdown);
+        }
+
+        [Test]
+        public void TestAllTasksComplete()
+        {
+            ThreadPoolExecutor executor = new ThreadPoolExecutor();
+            Assert.IsNotNull(executor);
+            Assert.IsFalse(executor.IsShutdown);
+
+            for(int i = 0; i < JOB_COUNT; ++i)
+            {
+                executor.QueueUserWorkItem(TaskThatCountsDown);
+            }
+
+            Assert.IsTrue(this.doneLatch.await(TimeSpan.FromMilliseconds(30 * 1000)));
+
+            executor.Shutdown();
+            Assert.IsTrue(executor.IsShutdown);
+        }
+
+        [Test]
+        public void TestAllTasksCompleteAfterException()
+        {
+            ThreadPoolExecutor executor = new ThreadPoolExecutor();
+            Assert.IsNotNull(executor);
+            Assert.IsFalse(executor.IsShutdown);
+
+            executor.QueueUserWorkItem(TaskThatThrowsAnException);
+
+            for(int i = 0; i < JOB_COUNT; ++i)
+            {
+                executor.QueueUserWorkItem(TaskThatCountsDown);
+            }
+
+            Assert.IsTrue(this.doneLatch.await(TimeSpan.FromMilliseconds(30 * 1000)));
+
+            executor.Shutdown();
+            Assert.IsTrue(executor.IsShutdown);
+        }
+
+        [Test]
+        public void TestThatShutdownPurgesTasks()
+        {
+            ThreadPoolExecutor executor = new ThreadPoolExecutor();
+            Assert.IsNotNull(executor);
+            Assert.IsFalse(executor.IsShutdown);
+
+            executor.QueueUserWorkItem(TaskThatSleeps);
+
+            for(int i = 0; i < JOB_COUNT; ++i)
+            {
+                executor.QueueUserWorkItem(TaskThatIncrementsCount);
+            }
+
+            Thread.Sleep(100);
+
+            executor.Shutdown();
+            Assert.AreEqual(0, count);
+            Assert.IsTrue(executor.IsShutdown);
+        }
+
+    }
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Threads/ThreadPoolExecutorTest.cs
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message