activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r829386 [2/2] - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src: main/csharp/ main/csharp/Commands/ main/csharp/Threads/ main/csharp/Transport/Failover/ main/csharp/Transport/Mock/ main/csharp/Util/ test/csharp/Threads/
Date Sat, 24 Oct 2009 15:23:54 GMT
Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/DedicatedTaskRunner.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/DefaultThreadPools.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/DefaultThreadPools.cs?rev=829386&r1=829385&r2=829386&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/DefaultThreadPools.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/DefaultThreadPools.cs
Sat Oct 24 15:23:52 2009
@@ -19,30 +19,6 @@
 {
 	public class DefaultThreadPools
 	{
-		/*
-		 * Java's execution model is different enough that I have left out
-		 * the Executure concept in this implementation. This must be
-		 * reviewed to see what is appropriate for the future.
-		 * -Allan Schrum
-		private static Executor DEFAULT_POOL = null;
-		static {
-		DEFAULT_POOL = new ScheduledThreadPoolExecutor(5, new ThreadFactory()
-						{
-							public Thread newThread(Runnable runnable)
-							{
-								Thread thread = new Thread(runnable, "ActiveMQ Default Thread Pool Thread");
-								thread.setDaemon(true);
-								return thread;
-							}
-						});
-		}    
-
-		public static Executor DefaultPool
-		{
-			get { return DEFAULT_POOL; }
-		}
-		*/
-
 		private static TaskRunnerFactory DEFAULT_TASK_RUNNER_FACTORY = new TaskRunnerFactory();
 
 		private DefaultThreadPools()

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/PooledTaskRunner.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/PooledTaskRunner.cs?rev=829386&r1=829385&r2=829386&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/PooledTaskRunner.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/PooledTaskRunner.cs
Sat Oct 24 15:23:52 2009
@@ -30,13 +30,13 @@
 		private bool iterating;
 		private volatile System.Threading.Thread runningThread;
 
-		public void run(Object o)
+		public void Run(Object o)
 		{
 			PooledTaskRunner p = o as PooledTaskRunner;
 			p.runningThread = System.Threading.Thread.CurrentThread;
 			try
 			{
-				p.runTask();
+				p.RunTask();
 			}
 			finally
 			{
@@ -51,13 +51,13 @@
 			this._shutdown = false;
 			this.iterating = false;
 			this.queued = true;
-			ThreadPool.QueueUserWorkItem(new WaitCallback(run), this);
+			ThreadPool.QueueUserWorkItem(new WaitCallback(Run), this);
 		}
 
 		/// <summary>
 		/// We Expect MANY wakeup calls on the same TaskRunner.
 		/// </summary>
-		public void wakeup()
+		public void Wakeup()
 		{
 			lock(runable)
 			{
@@ -82,7 +82,7 @@
 				// iterating.
 				if(!iterating)
 				{
-					ThreadPool.QueueUserWorkItem(new WaitCallback(run), this);
+					ThreadPool.QueueUserWorkItem(new WaitCallback(Run), this);
 				}
 			}
 		}
@@ -91,7 +91,7 @@
 		/// shut down the task
 		/// </summary>
 		/// <param name="timeout"></param>
-		public void shutdown(int timeout)
+		public void Shutdown(TimeSpan timeout)
 		{
 			lock(runable)
 			{
@@ -110,14 +110,13 @@
 			}
 		}
 
-		public void shutdown()
+		public void Shutdown()
 		{
-			shutdown(0);
+			Shutdown(new TimeSpan(Timeout.Infinite));
 		}
 
-		void runTask()
+		internal void RunTask()
 		{
-
 			lock(runable)
 			{
 				queued = false;
@@ -136,7 +135,7 @@
 			{
 				for(int i = 0; i < maxIterationsPerRun; i++)
 				{
-					if(!task.iterate())
+					if(!task.Iterate())
 					{
 						done = true;
 						break;
@@ -163,7 +162,7 @@
 
 						if(queued)
 						{
-							ThreadPool.QueueUserWorkItem(new WaitCallback(run), this);
+							ThreadPool.QueueUserWorkItem(new WaitCallback(Run), this);
 						}
 					}
 				}

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/Task.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/Task.cs?rev=829386&r1=829385&r2=829386&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/Task.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/Task.cs Sat
Oct 24 15:23:52 2009
@@ -22,6 +22,6 @@
 	/// </summary>
 	public interface Task
 	{
-		bool iterate();
+		bool Iterate();
 	}
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunner.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunner.cs?rev=829386&r1=829385&r2=829386&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunner.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunner.cs
Sat Oct 24 15:23:52 2009
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 
+using System;
 
 namespace Apache.NMS.ActiveMQ.Threads
 {
@@ -23,8 +24,8 @@
 	/// </summary>
 	public interface TaskRunner
 	{
-		void wakeup();
-		void shutdown();
-		void shutdown(int timeout);
+		void Wakeup();
+		void Shutdown();
+		void Shutdown(TimeSpan timeout);
 	}
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunnerFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunnerFactory.cs?rev=829386&r1=829385&r2=829386&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunnerFactory.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunnerFactory.cs
Sat Oct 24 15:23:52 2009
@@ -36,20 +36,20 @@
 
 		public TaskRunnerFactory()
 		{
-			initTaskRunnerFactory("ActiveMQ Task", ThreadPriority.Normal, true, 1000, false);
+			InitTaskRunnerFactory("ActiveMQ Task", ThreadPriority.Normal, true, 1000, false);
 		}
 
 		public TaskRunnerFactory(String name, ThreadPriority priority, bool daemon, int maxIterationsPerRun)
 		{
-			initTaskRunnerFactory(name, priority, daemon, maxIterationsPerRun, false);
+			InitTaskRunnerFactory(name, priority, daemon, maxIterationsPerRun, false);
 		}
 
 		public TaskRunnerFactory(String name, ThreadPriority priority, bool daemon, int maxIterationsPerRun,
bool dedicatedTaskRunner)
 		{
-			initTaskRunnerFactory(name, priority, daemon, maxIterationsPerRun, dedicatedTaskRunner);
+			InitTaskRunnerFactory(name, priority, daemon, maxIterationsPerRun, dedicatedTaskRunner);
 		}
 
-		public void initTaskRunnerFactory(String name, ThreadPriority priority, bool daemon, int
maxIterationsPerRun, bool dedicatedTaskRunner)
+		public void InitTaskRunnerFactory(String name, ThreadPriority priority, bool daemon, int
maxIterationsPerRun, bool dedicatedTaskRunner)
 		{
 			this.name = name;
 			this.priority = priority;
@@ -60,7 +60,7 @@
 			// using a thread pool to run tasks and use a DedicatedTaskRunner instead.
 		}
 
-		public void shutdown()
+		public void Shutdown()
 		{
 		}
 

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs?rev=829386&r1=829385&r2=829386&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs
Sat Oct 24 15:23:52 2009
@@ -39,6 +39,11 @@
 		{
             this.session = session;
         }
+
+        public bool InTransaction
+        {
+            get{ return this.transactionId != null; }
+        }
         
         public TransactionId TransactionId
         {
@@ -53,67 +58,100 @@
             synchronizations.Add(synchronization);
         }
         
+        public void RemoveSynchronization(ISynchronization synchronization)
+        {
+            synchronizations.Remove(synchronization);
+        }
         
         public void Begin()
         {
-            if (transactionId == null)
+            if(!InTransaction)
             {
-                transactionId = session.Connection.CreateLocalTransactionId();
+                this.transactionId = this.session.Connection.CreateLocalTransactionId();
                 
                 TransactionInfo info = new TransactionInfo();
-                info.ConnectionId = session.Connection.ConnectionId;
+                info.ConnectionId = this.session.Connection.ConnectionId;
                 info.TransactionId = transactionId;
                 info.Type = (int) TransactionType.Begin;
-                info.ResponseRequired = false;
-                session.Connection.Oneway(info);
+                
+                this.session.Connection.Oneway(info);
             }
         }
         
-        
         public void Rollback()
         {
-            if (transactionId != null)
+            if(!InTransaction)
             {
-                TransactionInfo info = new TransactionInfo();
-                info.ConnectionId = session.Connection.ConnectionId;
-                info.TransactionId = transactionId;
-                info.Type = (int) TransactionType.Rollback;
-                info.ResponseRequired = false;
-                transactionId = null;
-                session.Connection.SyncRequest(info);
+                throw new NMSException("Invliad State: Not Currently in a Transaction");
             }
+
+            this.BeforeEnd();
+
+            TransactionInfo info = new TransactionInfo();
+            info.ConnectionId = this.session.Connection.ConnectionId;
+            info.TransactionId = transactionId;
+            info.Type = (int) TransactionType.Rollback;
             
-            foreach (ISynchronization synchronization in synchronizations)
-			{
-                synchronization.AfterRollback();
-            }
-            synchronizations.Clear();
+            this.transactionId = null;
+            this.session.Connection.SyncRequest(info);
+
+            this.AfterRollback();
+            this.synchronizations.Clear();
         }
         
         public void Commit()
         {
-            foreach (ISynchronization synchronization in synchronizations)
-			{
-                synchronization.BeforeCommit();
+            if(!InTransaction)
+            {
+                throw new NMSException("Invliad State: Not Currently in a Transaction");
             }
+
+            this.BeforeEnd();
+            
+            TransactionInfo info = new TransactionInfo();
+            info.ConnectionId = this.session.Connection.ConnectionId;
+            info.TransactionId = transactionId;
+            info.Type = (int) TransactionType.CommitOnePhase;
             
-            if (transactionId != null)
+            this.transactionId = null;
+            this.session.Connection.SyncRequest(info);
+            
+            this.AfterCommit();
+            this.synchronizations.Clear();
+        }
+
+        internal void BeforeEnd()
+        {
+            lock(this.synchronizations.SyncRoot)
             {
-                TransactionInfo info = new TransactionInfo();
-                info.ConnectionId = session.Connection.ConnectionId;
-                info.TransactionId = transactionId;
-                info.Type = (int) TransactionType.CommitOnePhase;
-                info.ResponseRequired = false;
-                transactionId = null;
-                session.Connection.SyncRequest(info);
+                foreach(ISynchronization synchronization in this.synchronizations)
+                {
+                    synchronization.BeforeEnd();
+                }
             }
-            
-            foreach (ISynchronization synchronization in synchronizations)
-			{
-                synchronization.AfterCommit();
+        }
+
+        internal void AfterCommit()
+        {
+            lock(this.synchronizations.SyncRoot)
+            {
+                foreach(ISynchronization synchronization in this.synchronizations)
+                {
+                    synchronization.AfterCommit();
+                }
             }
-            synchronizations.Clear();
         }
+
+        internal void AfterRollback()
+        {
+            lock(this.synchronizations.SyncRoot)
+            {
+                foreach(ISynchronization synchronization in this.synchronizations)
+                {
+                    synchronization.AfterRollback();
+                }
+            }
+        }        
     }
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs?rev=829386&r1=829385&r2=829386&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
Sat Oct 24 15:23:52 2009
@@ -100,7 +100,7 @@
                 parent = p;
             }
 
-            public bool iterate()
+            public bool Iterate()
             {
                 bool result = false;
                 bool buildBackup = true;
@@ -129,7 +129,7 @@
                     result = true;
                     try
                     {
-                        parent.reconnectTask.wakeup();
+                        parent.reconnectTask.Wakeup();
                     }
                     catch(ThreadInterruptedException)
                     {
@@ -365,7 +365,7 @@
                     connected = false;
                     if(reconnectOk)
                     {
-                        reconnectTask.wakeup();
+                        reconnectTask.Wakeup();
                     }
                 }
 
@@ -440,7 +440,7 @@
 
             if(reconnectTask != null)
             {
-                reconnectTask.shutdown();
+                reconnectTask.Shutdown();
             }
 
             if(transportToStop != null)
@@ -636,6 +636,7 @@
                     catch(Exception e)
                     {
                         Tracer.DebugFormat("Send Oneway attempt: {0} failed: Message = {1}",
i, e.Message);
+                        Tracer.DebugFormat("Failed Message Was: {0}", command);
                         HandleTransportFailure(e);
                     }
                 }
@@ -721,7 +722,7 @@
                     Tracer.Debug("Waking up reconnect task");
                     try
                     {
-                        reconnectTask.wakeup();
+                        reconnectTask.Wakeup();
                     }
                     catch(ThreadInterruptedException)
                     {

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs?rev=829386&r1=829385&r2=829386&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs
Sat Oct 24 15:23:52 2009
@@ -70,7 +70,7 @@
                 this.parent = parent;
             }
             
-            public bool iterate()
+            public bool Iterate()
             {   
                 Command command = null;
                 
@@ -173,7 +173,7 @@
                 }
             }
             
-            this.asyncResponseTask.wakeup();
+            this.asyncResponseTask.Wakeup();
             
             // Send the Command to the Outgoing Command Snoop Hook.
             if( this.OutgoingCommand != null ) {
@@ -240,7 +240,7 @@
                 this.receiveQueue.Enqueue(command);
             }
             
-            this.asyncResponseTask.wakeup();
+            this.asyncResponseTask.Wakeup();
         }
 
         public Object Narrow(Type type)

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/MessageDispatchChannel.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/MessageDispatchChannel.cs?rev=829386&r1=829385&r2=829386&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/MessageDispatchChannel.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/MessageDispatchChannel.cs
Sat Oct 24 15:23:52 2009
@@ -37,6 +37,11 @@
         }
 
         #region Properties
+
+        public object SyncRoot
+        {
+            get{ return this.mutex; }
+        }
         
         public bool Closed
         {
@@ -129,13 +134,13 @@
 
         public MessageDispatch Dequeue(TimeSpan timeout)
         {
+            Tracer.Debug("Dequeuing message or return null after timeout: " + timeout.ToString());
             lock(this.mutex)
             {
                 // Wait until the channel is ready to deliver messages.
-                while( timeout != TimeSpan.Zero && !Closed && ( Empty ||
!Running ) )
+                if( timeout != TimeSpan.Zero && !Closed && ( Empty || !Running
) )
                 {
                     Monitor.Wait(this.mutex, timeout);
-                    break;
                 }
         
                 if( Closed || !Running || Empty ) 

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Threads/DedicatedTaskRunnerTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Threads/DedicatedTaskRunnerTest.cs?rev=829386&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Threads/DedicatedTaskRunnerTest.cs
(added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Threads/DedicatedTaskRunnerTest.cs
Sat Oct 24 15:23:52 2009
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using NUnit.Framework;
+using Apache.NMS.ActiveMQ.Threads;
+
+namespace Apache.NMS.ActiveMQ.Test.Threads
+{
+    [TestFixture]
+    public class DedicatedTaskRunnerTest
+    {
+
+        class SimpleCountingTask : Task 
+        {        
+            private uint count;
+                
+            public SimpleCountingTask() 
+            {
+                this.count = 0;
+            }
+        
+            public bool Iterate() 
+            {        
+                count++;
+                return false;
+            }
+        
+            public uint Count
+            { 
+                get{ return count; }
+            }
+        }
+        
+        class InfiniteCountingTask : Task 
+        {
+            private uint count;
+
+            public InfiniteCountingTask()
+            {
+                this.count = 0;
+            }
+
+            public bool Iterate() 
+            {        
+                count++;
+                return true;
+            }
+
+            public uint Count
+            { 
+                get{ return count; }
+            }
+        }
+
+        [Test]
+        public void TestSimple() 
+        {
+            try
+            {
+                new DedicatedTaskRunner(null);
+                Assert.Fail("Should throw a NullReferenceException");
+            }
+            catch
+            {
+            }
+        
+            SimpleCountingTask simpleTask = new SimpleCountingTask();            
+            Assert.IsTrue( simpleTask.Count == 0 );
+            DedicatedTaskRunner simpleTaskRunner = new DedicatedTaskRunner(simpleTask);
+
+            simpleTaskRunner.Wakeup();
+            Thread.Sleep( 250 );
+            Assert.IsTrue( simpleTask.Count >= 1 );
+            simpleTaskRunner.Wakeup();
+            Thread.Sleep( 250 );
+            Assert.IsTrue( simpleTask.Count >= 2 );
+        
+            InfiniteCountingTask infiniteTask = new InfiniteCountingTask();
+            Assert.IsTrue( infiniteTask.Count == 0 );
+            DedicatedTaskRunner infiniteTaskRunner = new DedicatedTaskRunner(infiniteTask);
+            Thread.Sleep( 250 );
+            Assert.IsTrue( infiniteTask.Count != 0 );
+            infiniteTaskRunner.Shutdown();
+            uint count = infiniteTask.Count;
+            Thread.Sleep( 250 );
+            Assert.IsTrue( infiniteTask.Count == count );
+        
+        }
+
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Threads/DedicatedTaskRunnerTest.cs
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message