activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jgo...@apache.org
Subject svn commit: r808244 - /activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/MessageConsumer.cs
Date Thu, 27 Aug 2009 01:29:34 GMT
Author: jgomes
Date: Thu Aug 27 01:29:34 2009
New Revision: 808244

URL: http://svn.apache.org/viewvc?rev=808244&view=rev
Log:
Refactored async delivery dispatching thread to be interruptible so that it can be cleanly
stopped and started.

Modified:
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/MessageConsumer.cs

Modified: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=808244&r1=808243&r2=808244&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/MessageConsumer.cs Thu Aug 27 01:29:34 2009
@@ -32,6 +32,9 @@
 		private readonly AcknowledgementMode acknowledgementMode;
 		private MessageQueue messageQueue;
 		private event MessageListener listener;
+		private int listenerCount = 0;
+		private Thread asyncDeliveryThread = null;
+		private AutoResetEvent pause = new AutoResetEvent(false);
 		private AtomicBoolean asyncDelivery = new AtomicBoolean(false);
 
 		public MessageConsumer(Session session, AcknowledgementMode acknowledgementMode, MessageQueue messageQueue)
@@ -50,30 +53,86 @@
 			add
 			{
 				listener += value;
+				listenerCount++;
 				StartAsyncDelivery();
 			}
+
 			remove
 			{
-				listener -= value;
+				if(listenerCount > 0)
+				{
+					listener -= value;
+					listenerCount--;
+				}
+
+				if(0 == listenerCount)
+				{
+					StopAsyncDelivery();
+				}
 			}
 		}
 
 		public IMessage Receive()
 		{
-			Message message = messageQueue.Receive();
-			return ToNmsMessage(message);
+			IMessage nmsMessage = null;
+
+			if(messageQueue != null)
+			{
+				Message message;
+
+				try
+				{
+					message = messageQueue.Receive(zeroTimeout);
+				}
+				catch
+				{
+					message = null;
+				}
+
+				if(null == message)
+				{
+					ReceiveCompletedEventHandler receiveMsg =
+							delegate(Object source, ReceiveCompletedEventArgs asyncResult) {
+								message = messageQueue.EndReceive(asyncResult.AsyncResult);
+								pause.Set();
+							};
+
+					messageQueue.ReceiveCompleted += receiveMsg;
+					messageQueue.BeginReceive();
+					pause.WaitOne();
+					messageQueue.ReceiveCompleted -= receiveMsg;
+				}
+
+				nmsMessage = ToNmsMessage(message);
+			}
+
+			return nmsMessage;
 		}
 
 		public IMessage Receive(TimeSpan timeout)
 		{
-			Message message = messageQueue.Receive(timeout);
-			return ToNmsMessage(message);
+			IMessage nmsMessage = null;
+
+			if(messageQueue != null)
+			{
+				Message message = messageQueue.Receive(timeout);
+				nmsMessage = ToNmsMessage(message);
+			}
+
+			return nmsMessage;
 		}
 
 		public IMessage ReceiveNoWait()
 		{
-			Message message = messageQueue.Receive(zeroTimeout);
-			return ToNmsMessage(message);
+			IMessage nmsMessage = null;
+
+			if(messageQueue != null)
+			{
+				Message message = messageQueue.Receive(zeroTimeout);
+				nmsMessage = ToNmsMessage(message);
+			}
+
+			return nmsMessage;
 		}
 
 		public void Dispose()
@@ -91,18 +150,34 @@
 			}
 		}
 
-		public void StopAsyncDelivery()
+		protected virtual void StopAsyncDelivery()
 		{
-			asyncDelivery.Value = false;
+			if(asyncDelivery.CompareAndSet(true, false))
+			{
+				if(null != asyncDeliveryThread)
+				{
+					Tracer.Info("Stopping async delivery thread.");
+					pause.Set();
+					if(!asyncDeliveryThread.Join(10000))
+					{
+						Tracer.Info("Aborting async delivery thread.");
+						asyncDeliveryThread.Abort();
+					}
+
+					asyncDeliveryThread = null;
+					Tracer.Info("Async delivery thread stopped.");
+				}
+			}
 		}
 
 		protected virtual void StartAsyncDelivery()
 		{
 			if(asyncDelivery.CompareAndSet(false, true))
 			{
-				Thread thread = new Thread(new ThreadStart(DispatchLoop));
-				thread.IsBackground = true;
-				thread.Start();
+				asyncDeliveryThread = new Thread(new ThreadStart(DispatchLoop));
+				asyncDeliveryThread.Name = "Message Consumer Dispatch: " + messageQueue.QueueName;
+				asyncDeliveryThread.IsBackground = true;
+				asyncDeliveryThread.Start();
 			}
 		}
 
@@ -111,18 +186,30 @@
 			Tracer.Info("Starting dispatcher thread consumer: " + this);
 			while(asyncDelivery.Value)
 			{
-				IMessage message = Receive();
-				if(message != null)
+				try
 				{
-					try
-					{
-						listener(message);
-					}
-					catch(Exception e)
+					IMessage message = Receive();
+					if(asyncDelivery.Value && message != null)
 					{
-						HandleAsyncException(e);
+						try
+						{
+							listener(message);
+						}
+						catch(Exception e)
+						{
+							HandleAsyncException(e);
+						}
 					}
 				}
+				catch(ThreadAbortException ex)
+				{
+					Tracer.InfoFormat("Thread abort received in thread: {0} : {1}", this, ex.Message);
+					break;
+				}
+				catch(Exception ex)
+				{
+					Tracer.ErrorFormat("Exception while receiving message in thread: {0} : {1}", this, ex.Message);
+				}
 			}
 			Tracer.Info("Stopping dispatcher thread consumer: " + this);
 		}



Mime
View raw message