activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1475989 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src: main/csharp/Connection.cs main/csharp/MessageConsumer.cs main/csharp/Session.cs test/csharp/Threads/SchedulerTest.cs
Date Thu, 25 Apr 2013 22:23:17 GMT
Author: tabish
Date: Thu Apr 25 22:23:16 2013
New Revision: 1475989

URL: http://svn.apache.org/r1475989
Log:
Wire in the Scheduler, connection tasks now pinned to a single thread in the Scheduler.

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Threads/SchedulerTest.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=1475989&r1=1475988&r2=1475989&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs Thu Apr
25 22:23:16 2013
@@ -93,6 +93,7 @@ namespace Apache.NMS.ActiveMQ
 		private readonly MessageTransformation messageTransformation;
 		private readonly ThreadPoolExecutor executor = new ThreadPoolExecutor();
 		private AdvisoryConsumer advisoryConsumer = null;
+		private Scheduler scheduler = null;
 		private readonly ConnectionAudit connectionAudit = new ConnectionAudit();
 
 		public Connection(Uri connectionUri, ITransport transport, IdGenerator clientIdGenerator)
@@ -483,6 +484,36 @@ namespace Apache.NMS.ActiveMQ
 			get { return this.messageTransformation; }
 		}
 
+	    internal Scheduler Scheduler
+		{
+			get
+			{
+		        Scheduler result = this.scheduler;
+		        if (result == null) 
+				{
+		            lock (this) 
+					{
+		                result = scheduler;
+		                if (result == null) 
+						{
+		                    CheckClosed();
+		                    try 
+							{
+		                        result = scheduler = new Scheduler(
+									"ActiveMQConnection["+this.info.ConnectionId.Value+"] Scheduler");
+		                        scheduler.Start();
+		                    }
+							catch(Exception e)
+							{
+		                        throw NMSExceptionSupport.Create(e);
+		                    }
+		                }
+		            }
+		        }
+		        return result;
+			}
+	    }
+
 		#endregion
 
 		private void SetTransport(ITransport newTransport)
@@ -651,6 +682,19 @@ namespace Apache.NMS.ActiveMQ
 						this.advisoryConsumer = null;
 					}
 
+                    Scheduler scheduler = this.scheduler;
+                    if (scheduler != null) 
+					{
+                        try 
+						{
+                            scheduler.Stop();
+                        } 
+						catch (Exception e) 
+						{
+                            throw NMSExceptionSupport.Create(e);
+                        }
+                    }
+
 					lock(sessions.SyncRoot)
 					{
 						foreach(Session session in sessions)

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=1475989&r1=1475988&r2=1475989&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
Thu Apr 25 22:23:16 2013
@@ -65,7 +65,7 @@ namespace Apache.NMS.ActiveMQ
 		private DateTime optimizeAckTimestamp = DateTime.Now;
 	    private long optimizeAcknowledgeTimeOut = 0;
 	    private long optimizedAckScheduledAckInterval = 0;
-	    private Timer optimizedAckTimer;
+	    private WaitCallback optimizedAckTask = null;
 	    private long failoverRedeliveryWaitPeriod = 0;
 	    private bool transactedIndividualAck = false;
 	    private bool nonBlockingRedelivery = false;
@@ -251,26 +251,18 @@ namespace Apache.NMS.ActiveMQ
 			{ 
 				this.optimizedAckScheduledAckInterval = value; 
 
-		        if (this.optimizedAckTimer != null) 
+		        if (this.optimizedAckTask != null) 
 				{
-					AutoResetEvent shutdownEvent = new AutoResetEvent(false);
-					this.optimizedAckTimer.Dispose(shutdownEvent);
-					if(!shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(5000), false))
-					{
-						Tracer.WarnFormat("Consumer[{0}]: Optimized Ack Timer Task didn't shutdown properly.",
this.info.ConsumerId);
-					}
-
-					this.optimizedAckTimer = null;
+					this.session.Scheduler.Cancel(this.optimizedAckTask);
+					this.optimizedAckTask = null;
 		        }
 
 		        // Should we periodically send out all outstanding acks.
 		        if (this.optimizeAcknowledge && this.optimizedAckScheduledAckInterval >
0)
 				{
-					this.optimizedAckTimer = new Timer(
-						new TimerCallback(DoOptimizedAck),
-						null,
-						optimizedAckScheduledAckInterval,
-						optimizedAckScheduledAckInterval);
+					this.optimizedAckTask = new WaitCallback(DoOptimizedAck);
+					this.session.Scheduler.ExecutePeriodically(
+						optimizedAckTask, null, TimeSpan.FromMilliseconds(optimizedAckScheduledAckInterval));
 				}
 			}
 		}
@@ -507,9 +499,9 @@ namespace Apache.NMS.ActiveMQ
 					this.executor.AwaitTermination(TimeSpan.FromMinutes(1));
 					this.executor = null;
 	            }
-				if (this.optimizedAckTimer != null)
+				if (this.optimizedAckTask != null)
 				{
-					this.OptimizedAckScheduledAckInterval = 0;
+					this.session.Scheduler.Cancel(this.optimizedAckTask);
 				}
 
 	            if (this.session.IsClientAcknowledge)
@@ -1579,7 +1571,10 @@ namespace Apache.NMS.ActiveMQ
 
 	    private void DoOptimizedAck(object state)
 		{
-			DeliverAcks();
+			if (this.optimizeAcknowledge && !this.unconsumedMessages.Closed)
+			{
+				DeliverAcks();
+			}
 		}
 	    
 	    private void WaitForRedeliveries() 

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs?rev=1475989&r1=1475988&r2=1475989&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs Thu Apr
25 22:23:16 2013
@@ -22,6 +22,7 @@ using System.Threading;
 using Apache.NMS.Util;
 using Apache.NMS.ActiveMQ.Commands;
 using Apache.NMS.ActiveMQ.Util;
+using Apache.NMS.ActiveMQ.Threads;
 
 namespace Apache.NMS.ActiveMQ
 {
@@ -281,6 +282,11 @@ namespace Apache.NMS.ActiveMQ
             set { this.producerTransformer = value; }
         }
 
+		internal Scheduler Scheduler
+		{
+			get { return this.connection.Scheduler; }
+		}
+
         #endregion
 
         #region ISession Members
@@ -912,13 +918,15 @@ namespace Apache.NMS.ActiveMQ
             // Because we are called from inside the Transport Reconnection logic
             // we spawn the Consumer clear to another Thread so that we can avoid
             // any lock contention that might exist between the consumer and the
-            // connection that is reconnecting.
+            // connection that is reconnecting.  Use the Connection Scheduler so 
+			// that the clear calls are done one at a time to avoid further 
+			// contention on the Connection and Session resources.
             lock(this.consumers.SyncRoot)
             {
                 foreach(MessageConsumer consumer in this.consumers.Values)
                 {
                     consumer.InProgressClearRequired();
-                    ThreadPool.QueueUserWorkItem(ClearMessages, consumer);
+					Scheduler.ExecuteAfterDelay(ClearMessages, consumer, 0);
                 }
             }
         }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Threads/SchedulerTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Threads/SchedulerTest.cs?rev=1475989&r1=1475988&r2=1475989&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Threads/SchedulerTest.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Threads/SchedulerTest.cs
Thu Apr 25 22:23:16 2013
@@ -141,6 +141,19 @@ namespace Apache.NMS.ActiveMQ.Test
 		}
 
 		[Test]
+		public void TestExecuteAfterDelayNoDelay()
+		{
+	        Scheduler scheduler = new Scheduler("TestExecuteAfterDelay");
+	        scheduler.Start();
+	        scheduler.ExecuteAfterDelay(CounterCallback, null, 0);
+	        scheduler.ExecuteAfterDelay(CounterCallback, null, 0);
+	        scheduler.ExecuteAfterDelay(CounterCallback, null, 0);
+	        Thread.Sleep(500);
+	        Assert.IsTrue(counter == 3, "Should have executed Three tasks.");
+	        scheduler.Stop();
+		}
+
+		[Test]
 		public void TestCancel()
 		{
 	        Scheduler scheduler = new Scheduler("TestCancel");



Mime
View raw message