activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r829386 [1/2] - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src: main/csharp/ main/csharp/Commands/ main/csharp/Threads/ main/csharp/Transport/Failover/ main/csharp/Transport/Mock/ main/csharp/Util/ test/csharp/Threads/
Date Sat, 24 Oct 2009 15:23:54 GMT
Author: tabish
Date: Sat Oct 24 15:23:52 2009
New Revision: 829386

URL: http://svn.apache.org/viewvc?rev=829386&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQNET-176

Refactored the Session and Consumer dispatching code, adding new message acknowledgment handling that deals with transactions correctly and optimizes the overall dispatch / ack cycle.

Added:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/SessionExecutor.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/DedicatedTaskRunner.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Threads/
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Threads/DedicatedTaskRunnerTest.cs   (with props)
Removed:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Dispatcher.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/DispatchingThread.cs
Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/BaseCommand.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ISynchronization.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/DefaultThreadPools.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/PooledTaskRunner.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/Task.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunner.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunnerFactory.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/MessageDispatchChannel.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/BaseCommand.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/BaseCommand.cs?rev=829386&r1=829385&r2=829386&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/BaseCommand.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/BaseCommand.cs Sat Oct 24 15:23:52 2009
@@ -33,7 +33,7 @@
     public abstract class BaseCommand : BaseDataStructure, Command, ICloneable
     {
         private int commandId;
-        private bool responseRequired;
+        private bool responseRequired = false;
 
         public int CommandId
         {

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=829386&r1=829385&r2=829386&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs Sat Oct 24 15:23:52 2009
@@ -38,6 +38,7 @@
         private WireFormatInfo brokerWireFormatInfo; // from broker
         private readonly IList sessions = ArrayList.Synchronized(new ArrayList());
         private readonly IDictionary producers = Hashtable.Synchronized(new Hashtable());
+        private readonly IDictionary dispatchers = Hashtable.Synchronized(new Hashtable());
         private readonly object myLock = new object();
         private bool asyncSend = false;
         private bool alwaysSyncSend = false;
@@ -183,6 +184,57 @@
             get { return this.metaData ?? (this.metaData = new ConnectionMetaData()); }
         }
 
+        public Uri BrokerUri
+        {
+            get { return brokerUri; }
+        }
+
+        public ITransport ITransport
+        {
+            get { return transport; }
+            set { this.transport = value; }
+        }
+
+        public TimeSpan RequestTimeout
+        {
+            get { return this.requestTimeout; }
+            set { this.requestTimeout = value; }
+        }
+
+        public AcknowledgementMode AcknowledgementMode
+        {
+            get { return acknowledgementMode; }
+            set { this.acknowledgementMode = value; }
+        }
+
+        public string ClientId
+        {
+            get { return info.ClientId; }
+            set
+            {
+                if(connected)
+                {
+                    throw new NMSException("You cannot change the ClientId once the Connection is connected");
+                }
+                info.ClientId = value;
+            }
+        }
+
+        public ConnectionId ConnectionId
+        {
+            get { return info.ConnectionId; }
+        }
+
+        public BrokerInfo BrokerInfo
+        {
+            get { return brokerInfo; }
+        }
+
+        public WireFormatInfo BrokerWireFormat
+        {
+            get { return brokerWireFormatInfo; }
+        }
+        
         #endregion
 
         /// <summary>
@@ -198,7 +250,7 @@
                 {
                     foreach(Session session in sessions)
                     {
-                        session.StartAsyncDelivery();
+                        session.Start();
                     }
                 }
             }
@@ -226,7 +278,7 @@
                 {
                     foreach(Session session in sessions)
                     {
-                        session.StopAsyncDelivery();
+                        session.Stop();
                     }
                 }
             }
@@ -255,7 +307,7 @@
 
             if(IsStarted)
             {
-                session.StartAsyncDelivery();
+                session.Start();
             }
 
             sessions.Add(session);
@@ -264,14 +316,22 @@
 
         public void RemoveSession(Session session)
         {
-            DisposeOf(session.SessionId);
-
             if(!this.closing)
             {
                 sessions.Remove(session);
             }
         }
 
+        public void addDispatcher( ConsumerId id, IDispatcher dispatcher )
+        {
+            this.dispatchers.Add( id, dispatcher );
+        }
+
+        public void removeDispatcher( ConsumerId id )
+        {
+            this.dispatchers.Remove( id );
+        }
+        
         public void addProducer( ProducerId id, MessageProducer producer )
         {
             this.producers.Add( id, producer );
@@ -299,7 +359,7 @@
                     {
                         foreach(Session session in sessions)
                         {
-                            session.Close();
+                            session.DoClose();
                         }
                     }
                     sessions.Clear();
@@ -308,7 +368,6 @@
                     {
                         DisposeOf(ConnectionId);
                         ShutdownInfo shutdowninfo = new ShutdownInfo();
-                        shutdowninfo.ResponseRequired = false;
                         transport.Oneway(shutdowninfo);
                     }
 
@@ -362,59 +421,6 @@
             disposed = true;
         }
 
-        // Properties
-
-        public Uri BrokerUri
-        {
-            get { return brokerUri; }
-        }
-
-        public ITransport ITransport
-        {
-            get { return transport; }
-            set { this.transport = value; }
-        }
-
-        public TimeSpan RequestTimeout
-        {
-            get { return this.requestTimeout; }
-            set { this.requestTimeout = value; }
-        }
-
-        public AcknowledgementMode AcknowledgementMode
-        {
-            get { return acknowledgementMode; }
-            set { this.acknowledgementMode = value; }
-        }
-
-        public string ClientId
-        {
-            get { return info.ClientId; }
-            set
-            {
-                if(connected)
-                {
-                    throw new NMSException("You cannot change the ClientId once the Connection is connected");
-                }
-                info.ClientId = value;
-            }
-        }
-
-        public ConnectionId ConnectionId
-        {
-            get { return info.ConnectionId; }
-        }
-
-        public BrokerInfo BrokerInfo
-        {
-            get { return brokerInfo; }
-        }
-
-        public WireFormatInfo BrokerWireFormat
-        {
-            get { return brokerWireFormatInfo; }
-        }
-
         // Implementation methods
 
         /// <summary>
@@ -469,7 +475,7 @@
             }
         }
 
-        public void DisposeOf(DataStructure objectId)
+        private void DisposeOf(DataStructure objectId)
         {
             try
             {
@@ -480,7 +486,6 @@
                     Tracer.Info("Asynchronously disposing of Connection.");
                     if(connected)
                     {
-                        command.ResponseRequired = false;
                         transport.Oneway(command);
                     }
                 }
@@ -499,25 +504,6 @@
             }
         }
 
-        /// <summary>
-        /// Creates a new temporary destination name
-        /// </summary>
-        public String CreateTemporaryDestinationName()
-        {
-            return info.ConnectionId.Value + ":" + Interlocked.Increment(ref temporaryDestinationCounter);
-        }
-
-        /// <summary>
-        /// Creates a new local transaction ID
-        /// </summary>
-        public LocalTransactionId CreateLocalTransactionId()
-        {
-            LocalTransactionId id = new LocalTransactionId();
-            id.ConnectionId = ConnectionId;
-            id.Value = Interlocked.Increment(ref localTransactionCounter);
-            return id;
-        }
-
         protected void CheckConnected()
         {
             if(closed)
@@ -607,33 +593,23 @@
 
         protected void DispatchMessage(MessageDispatch dispatch)
         {
-            bool dispatched = false;
-
-            // Override the Message's Destination with the one from the Dispatch since in the
-            // case of a virtual Topic the correct destination ack is the one from the Dispatch.
-            // This is a bit of a hack since we should really be sending the entire dispatch to
-            // the Consumer.
-            dispatch.Message.Destination = dispatch.Destination;
-            dispatch.Message.ReadOnlyBody = true;
-            dispatch.Message.ReadOnlyProperties = true;
-            dispatch.Message.RedeliveryCounter = dispatch.RedeliveryCounter;
-
-            lock(sessions.SyncRoot)
+            lock(dispatchers.SyncRoot)
             {
-                foreach(Session session in sessions)
+                if(dispatchers.Contains(dispatch.ConsumerId))
                 {
-                    if(session.DispatchMessage(dispatch.ConsumerId, dispatch.Message))
-                    {
-                        dispatched = true;
-                        break;
-                    }
+                    IDispatcher dispatcher = (IDispatcher) dispatchers[dispatch.ConsumerId];
+
+                    dispatch.Message.ReadOnlyBody = true;
+                    dispatch.Message.ReadOnlyProperties = true;
+                    dispatch.Message.RedeliveryCounter = dispatch.RedeliveryCounter;
+
+                    dispatcher.Dispatch(dispatch);
+
+                    return;
                 }
             }
 
-            if(!dispatched)
-            {
-                Tracer.Error("No such consumer active: " + dispatch.ConsumerId);
-            }
+            Tracer.Error("No such consumer active: " + dispatch.ConsumerId);
         }
 
         protected void OnKeepAliveCommand(ITransport commandTransport, KeepAliveInfo info)
@@ -677,6 +653,11 @@
         {
             Tracer.Debug("Transport has been Interrupted.");
 
+            foreach(Session session in this.sessions)
+            {
+                session.ClearMessagesInProgress();
+            }
+
             if(this.ConnectionInterruptedListener != null && !this.closing )
             {
                 try
@@ -720,6 +701,25 @@
             }
         }
 
+        /// <summary>
+        /// Creates a new temporary destination name
+        /// </summary>
+        public String CreateTemporaryDestinationName()
+        {
+            return info.ConnectionId.Value + ":" + Interlocked.Increment(ref temporaryDestinationCounter);
+        }
+
+        /// <summary>
+        /// Creates a new local transaction ID
+        /// </summary>
+        public LocalTransactionId CreateLocalTransactionId()
+        {
+            LocalTransactionId id = new LocalTransactionId();
+            id.ConnectionId = ConnectionId;
+            id.Value = Interlocked.Increment(ref localTransactionCounter);
+            return id;
+        }
+        
         protected SessionInfo CreateSessionInfo(AcknowledgementMode sessionAcknowledgementMode)
         {
             SessionInfo answer = new SessionInfo();

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ISynchronization.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ISynchronization.cs?rev=829386&r1=829385&r2=829386&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ISynchronization.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ISynchronization.cs Sat Oct 24 15:23:52 2009
@@ -20,9 +20,9 @@
 	public interface ISynchronization
     {
         /// <summary>
-        /// Called before a commit
+        /// Called before a commit or rollback is applied.
         /// </summary>
-        void BeforeCommit();
+        void BeforeEnd();
         
         /// <summary>
         /// Called after a commit

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=829386&r1=829385&r2=829386&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs Sat Oct 24 15:23:52 2009
@@ -15,6 +15,9 @@
  * limitations under the License.
  */
 using System;
+using System.Threading;
+using System.Collections;
+using System.Collections.Generic;
 using Apache.NMS.ActiveMQ.Commands;
 using Apache.NMS;
 using Apache.NMS.Util;
@@ -25,37 +28,50 @@
 	{
 		DeliveredAck = 0, // Message delivered but not consumed
 		PoisonAck = 1, // Message could not be processed due to poison pill but discard anyway
-		ConsumedAck = 2 // Message consumed, discard
+		ConsumedAck = 2, // Message consumed, discard
+        RedeliveredAck = 3, // Message has been Redelivered and is not yet poisoned.
+        IndividualAck = 4 // Only the given message is to be treated as consumed.
 	}
 
-
 	/// <summary>
 	/// An object capable of receiving messages from some destination
 	/// </summary>
-	public class MessageConsumer : IMessageConsumer
+	public class MessageConsumer : IMessageConsumer, IDispatcher
 	{
-		private readonly AcknowledgementMode acknowledgementMode;
-		private bool closed = false;
-		private object closedLock = new object();
-		private readonly Dispatcher dispatcher = new Dispatcher();
-		private readonly ConsumerInfo info;
+        private object closedLock = new object();
+
+        private readonly MessageDispatchChannel unconsumedMessages = new MessageDispatchChannel();
+        private readonly LinkedList<MessageDispatch> dispatchedMessages = new LinkedList<MessageDispatch>();
+        private readonly ConsumerInfo info;
+        private Session session;
+
+        private MessageAck pendingAck = null;
+
+        private Atomic<bool> started = new Atomic<bool>();
+        private Atomic<bool> deliveringAcks = new Atomic<bool>();
+
+        private bool closed = false;
 		private int maximumRedeliveryCount = 10;
 		private int redeliveryTimeout = 500;
-		private Session session;
-		private Session ackSession;
 		protected bool disposed = false;
+        private long lastDeliveredSequenceId = 0;
+        private int deliveredCounter = 0;
+        private int additionalWindowSize = 0;
+        private long redeliveryDelay = 0;
+        private int dispatchedCount = 0;
+        private volatile bool synchronizationRegistered = false;
+        private bool clearDispatchList = false;
+
+        private const int DEFAULT_REDELIVERY_DELAY = 0;
+        private const int DEFAULT_MAX_REDELIVERIES = 5;
 
+        private event MessageListener listener;
+        
 		// Constructor internal to prevent clients from creating an instance.
-		internal MessageConsumer(Session session, ConsumerInfo info,
-								 AcknowledgementMode acknowledgementMode)
+		internal MessageConsumer(Session session, ConsumerInfo info)
 		{
 			this.session = session;
 			this.info = info;
-			this.acknowledgementMode = acknowledgementMode;
-			if(AcknowledgementMode.AutoAcknowledge == acknowledgementMode)
-			{
-				this.ackSession = (Session) session.Connection.CreateSession(acknowledgementMode);
-			}
 		}
 
 		~MessageConsumer()
@@ -63,11 +79,13 @@
 			Dispose(false);
 		}
 
-		internal Dispatcher Dispatcher
-		{
-			get { return this.dispatcher; }
-		}
+        #region Property Accessors
 
+        public long LastDeliveredSequenceId
+        {
+            get{ return this.lastDeliveredSequenceId; }
+        }
+        
 		public ConsumerId ConsumerId
 		{
 			get { return info.ConsumerId; }
@@ -85,35 +103,103 @@
 			set { redeliveryTimeout = value; }
 		}
 
+        public int PrefetchSize
+        {
+            get { return this.info.PrefetchSize; }
+        }
+
+        #endregion
+
 		#region IMessageConsumer Members
 
 		public event MessageListener Listener
 		{
 			add
 			{
+                CheckClosed();
+
+                if(this.PrefetchSize == 0)
+                {
+                    throw new NMSException("Cannot set Asynchronous Listener on a Consumer with a zero Prefetch size");
+                }
+
+                bool wasStarted = this.session.Started;
+
+                if(wasStarted == true)
+                {
+                    this.session.Stop();
+                }
+                
 				listener += value;
-				session.RegisterConsumerDispatcher(dispatcher);
+                this.session.Redispatch(this.unconsumedMessages);
+
+                if(wasStarted == true)
+                {
+                    this.session.Start();
+                }
 			}
 			remove { listener -= value; }
 		}
 
-
 		public IMessage Receive()
 		{
-			SendPullRequest(0);
-			return SetupAcknowledge(dispatcher.Dequeue());
-		}
-
-		public IMessage Receive(System.TimeSpan timeout)
-		{
-			SendPullRequest((long) timeout.TotalMilliseconds);
-			return SetupAcknowledge(dispatcher.Dequeue(timeout));
+            CheckClosed();
+            CheckMessageListener();
+    
+            SendPullRequest(0);
+            MessageDispatch dispatch = this.Dequeue(TimeSpan.FromMilliseconds(-1));
+            
+            if(dispatch == null)
+            {
+                return null;
+            }
+    
+            BeforeMessageIsConsumed(dispatch);
+            AfterMessageIsConsumed(dispatch, false);
+    
+            return CreateActiveMQMessage(dispatch);
+		}
+
+		public IMessage Receive(TimeSpan timeout)
+		{
+            CheckClosed();
+            CheckMessageListener();
+    
+            SendPullRequest((long)timeout.TotalMilliseconds);
+            MessageDispatch dispatch = this.Dequeue(timeout);
+            
+            if(dispatch == null)
+            {
+                return null;
+            }
+
+            Tracer.Debug("Receive got new MessageDispatch: " + dispatch);
+    
+            BeforeMessageIsConsumed(dispatch);
+            AfterMessageIsConsumed(dispatch, false);
+
+            Tracer.Debug("Creating new message and returning.");
+            
+            return CreateActiveMQMessage(dispatch);
 		}
 
 		public IMessage ReceiveNoWait()
 		{
-			SendPullRequest(-1);
-			return SetupAcknowledge(dispatcher.DequeueNoWait());
+            CheckClosed();
+            CheckMessageListener();
+    
+            SendPullRequest(-1);
+            MessageDispatch dispatch = this.Dequeue(TimeSpan.Zero);
+            
+            if(dispatch == null)
+            {
+                return null;
+            }
+    
+            BeforeMessageIsConsumed(dispatch);
+            AfterMessageIsConsumed(dispatch, false);
+    
+            return CreateActiveMQMessage(dispatch);
 		}
 
 		public void Dispose()
@@ -136,8 +222,8 @@
 
 			try
 			{
-				Close();
-			}
+                Close();
+            }
 			catch
 			{
 				// Ignore network errors.
@@ -148,117 +234,62 @@
 
 		public void Close()
 		{
-			lock(closedLock)
-			{
-				if(closed)
-				{
-					return;
-				}
-
-				try
-				{
-					// wake up any pending dequeue() call on the dispatcher
-					dispatcher.Close();
-					session.DisposeOf(info.ConsumerId);
-
-					if(ackSession != null)
-					{
-						ackSession.Close();
-					}
-				}
-				catch(Exception ex)
-				{
-					Tracer.ErrorFormat("Error during consumer close: {0}", ex);
-				}
-
-				session = null;
-				ackSession = null;
-				closed = true;
-			}
-		}
+            if(!this.unconsumedMessages.Closed)
+            {
+                if(this.session.IsTransacted && this.session.TransactionContext.InTransaction) 
+                {
+                    this.session.TransactionContext.AddSynchronization(new ConsumerCloseSynchronization(this));
+                } 
+                else
+                {
+                    this.DoClose();
+                } 
+            }
+		}
+
+        internal void DoClose()
+        {
+            lock(this.closedLock)
+            {
+                if(!this.unconsumedMessages.Closed)
+                {                    
+                    // Do we have any acks we need to send out before closing?
+                    // Ack any delivered messages now.
+                    if(!this.session.IsTransacted) 
+                    {
+                        DeliverAcks();
+                        if(this.IsAutoAcknowledgeBatch)
+                        {
+                            Acknowledge();
+                        }
+                    }
+                    
+                    if(!this.session.IsTransacted)
+                    {
+                        lock(this.dispatchedMessages)
+                        {
+                            dispatchedMessages.Clear();
+                        }
+                    }
+                    
+                    this.unconsumedMessages.Close();
+                    this.session.DisposeOf(this.info.ConsumerId, this.lastDeliveredSequenceId);
+
+                    RemoveInfo removeCommand = new RemoveInfo();
+                    removeCommand.ObjectId = this.info.ConsumerId;
+                    removeCommand.LastDeliveredSequenceId = this.lastDeliveredSequenceId;
+                    
+                    this.session.Connection.Oneway(removeCommand);
+                    this.session = null;
+                }
+            }            
+        }
 
 		#endregion
 
-		private event MessageListener listener;
-
-		public void RedeliverRolledBackMessages()
-		{
-			dispatcher.RedeliverRolledBackMessages();
-		}
-
-		/// <summary>
-		/// Method Dispatch
-		/// </summary>
-		/// <param name="message">An ActiveMQMessage</param>
-		public void Dispatch(ActiveMQMessage message)
-		{
-			if(AcknowledgementMode.AutoAcknowledge == this.acknowledgementMode)
-			{
-				MessageAck ack = CreateMessageAck(message);
-				Tracer.Debug("Sending AutoAck: " + ack);
-				message.Acknowledger += new AcknowledgeHandler(DoNothingAcknowledge);
-
-				lock(closedLock)
-				{
-					if(closed)
-					{
-						throw new ConnectionClosedException();
-					}
-
-					ackSession.Connection.Oneway(ack);
-				}
-			}
-
-			dispatcher.Enqueue(message);
-		}
-
-		/// <summary>
-		/// Dispatch any pending messages to the asynchronous listener
-		/// </summary>
-		internal void DispatchAsyncMessages()
-		{
-			while(listener != null)
-			{
-				IMessage message = dispatcher.DequeueNoWait();
-				if(message == null)
-				{
-					break;
-				}
-
-				message = SetupAcknowledge(message);
-				// invoke listener. Exceptions caught by the dispatcher thread
-				listener(message);
-			}
-		}
-
-		protected IMessage SetupAcknowledge(IMessage message)
-		{
-			if(null == message)
-			{
-				return null;
-			}
-
-			if(message is ActiveMQMessage)
-			{
-				ActiveMQMessage activeMessage = (ActiveMQMessage) message;
-
-				if(AcknowledgementMode.ClientAcknowledge == acknowledgementMode)
-				{
-					activeMessage.Acknowledger += new AcknowledgeHandler(DoClientAcknowledge);
-				}
-				else if(AcknowledgementMode.AutoAcknowledge != acknowledgementMode)
-				{
-					activeMessage.Acknowledger += new AcknowledgeHandler(DoNothingAcknowledge);
-					DoClientAcknowledge(activeMessage);
-				}
-			}
-
-			return message;
-		}
-
 		protected void SendPullRequest(long timeout)
 		{
-			if(this.info.PrefetchSize == 0 && this.dispatcher.isEmpty())
+			if(this.info.PrefetchSize == 0 && this.unconsumedMessages.Empty)
 			{
 				MessagePull messagePull = new MessagePull();
 				messagePull.ConsumerId = this.info.ConsumerId;
@@ -279,97 +310,702 @@
 			}
 		}
 
+        protected void DoIndividualAcknowledge(ActiveMQMessage message)
+        {
+            // TODO
+        }
+
 		protected void DoNothingAcknowledge(ActiveMQMessage message)
 		{
 		}
 
 		protected void DoClientAcknowledge(ActiveMQMessage message)
 		{
-			MessageAck ack = CreateMessageAck(message);
-			Tracer.Debug("Sending Ack: " + ack);
-			lock(closedLock)
-			{
-				if(closed)
-				{
-					throw new ConnectionClosedException();
-				}
-
-				session.Connection.Oneway(ack);
-			}
-		}
-
-		protected virtual MessageAck CreateMessageAck(Message message)
-		{
-			MessageAck ack = new MessageAck();
-			ack.AckType = (int) AckType.ConsumedAck;
-			ack.ConsumerId = info.ConsumerId;
-			ack.Destination = message.Destination;
-			ack.FirstMessageId = message.MessageId;
-			ack.LastMessageId = message.MessageId;
-			ack.MessageCount = 1;
-			ack.ResponseRequired = false;
-			
-			if(session.Transacted)
-			{
-				session.DoStartTransaction();
-				ack.TransactionId = session.TransactionContext.TransactionId;
-				session.TransactionContext.AddSynchronization(
-						new MessageConsumerSynchronization(this, message));
-			}
-			return ack;
-		}
-
-		public void AfterRollback(ActiveMQMessage message)
-		{
-			// lets redeliver the message again
-			message.RedeliveryCounter += 1;
-			if(message.RedeliveryCounter > MaximumRedeliveryCount)
-			{
-				// lets send back a poisoned pill
-				MessageAck ack = new MessageAck();
-				ack.AckType = (int) AckType.PoisonAck;
-				ack.ConsumerId = info.ConsumerId;
-				ack.Destination = message.Destination;
-				ack.FirstMessageId = message.MessageId;
-				ack.LastMessageId = message.MessageId;
-				ack.MessageCount = 1;
-				session.Connection.Oneway(ack);
-			}
-			else
-			{
-				dispatcher.Redeliver(message);
-			}
-		}
-	}
-
-
-	// TODO maybe there's a cleaner way of creating stateful delegates to make this code neater
-	internal class MessageConsumerSynchronization : ISynchronization
-	{
-		private readonly MessageConsumer consumer;
-		private readonly Message message;
-
-		public MessageConsumerSynchronization(MessageConsumer consumer, Message message)
-		{
-			this.message = message;
-			this.consumer = consumer;
-		}
-
-		#region ISynchronization Members
-
-		public void BeforeCommit()
-		{
-		}
-
-		public void AfterCommit()
-		{
-		}
+            this.CheckClosed();
+			Tracer.Debug("Sending Client Ack:");
+            this.Acknowledge();
+		}
+
+        public void Start()
+        {
+            if(this.unconsumedMessages.Closed)
+            {
+                return;
+            }
+
+            this.started.Value = true;
+            this.unconsumedMessages.Start();
+            this.session.Executor.Wakeup();
+        }
+    
+        public void Stop()
+        {
+            this.started.Value = false;
+            this.unconsumedMessages.Stop();
+        }
+
+        public void ClearMessagesInProgress()
+        {
+            // We are called from inside the transport reconnection logic,
+            // which clears the dispatch lists of all of the connection's
+            // consumers.
+            // Rather than trying to grab a mutex (which could already be
+            // owned by the message listener calling the send), we just set
+            // a flag so that the list can be cleared as soon as the
+            // dispatch thread is ready to flush the dispatch list.
+            this.clearDispatchList = true;
+        }
+
+        public void DeliverAcks()
+        {
+            MessageAck ack = null;
+            
+            if(this.deliveringAcks.CompareAndSet(false, true))
+            {
+                if(this.IsAutoAcknowledgeEach)
+                {
+                    lock(this.dispatchedMessages)
+                    {
+                        ack = MakeAckForAllDeliveredMessages(AckType.DeliveredAck);
+                        if(ack != null) 
+                        {
+                            this.dispatchedMessages.Clear();
+                        } 
+                        else
+                        {
+                            ack = this.pendingAck;
+                            this.pendingAck = null;
+                        }
+                    }
+                } 
+                else if(pendingAck != null && pendingAck.AckType == (byte)AckType.ConsumedAck) 
+                {
+                    ack = pendingAck;
+                    pendingAck = null;
+                }
+                
+                if(ack != null)
+                {
+                    MessageAck ackToSend = ack;
+                    
+                    try
+                    {
+                        this.session.Connection.Oneway(ackToSend);
+                    }
+                    catch(Exception e)
+                    {
+                        Tracer.DebugFormat("{0} : Failed to send ack, {1}", this.info.ConsumerId, e);
+                    }
+                } 
+                else
+                {
+                    this.deliveringAcks.Value = false;
+                }
+            }
+        }
+
+        public void Dispatch(MessageDispatch dispatch)
+        {
+            MessageListener listener = this.listener;
+            
+            try 
+            {
+                lock(this.unconsumedMessages.SyncRoot) 
+                {
+                    if(this.clearDispatchList)
+                    {
+                        // we are reconnecting so let's flush the in-progress messages
+                        this.clearDispatchList = false;
+                        this.unconsumedMessages.Clear();
+                        
+                        if(this.pendingAck != null && this.pendingAck.AckType == (byte)AckType.DeliveredAck) 
+                        {
+                            // on resumption a pending delivered ack will be out of sync with
+                            // re-deliveries.
+                            Tracer.Debug("removing pending delivered ack on transport interupt: " + pendingAck);
+                            this.pendingAck = null;
+                        }
+                    }
+                    
+                    if(!this.unconsumedMessages.Closed) 
+                    {
+                        if(listener != null && this.unconsumedMessages.Running)
+                        {
+                            ActiveMQMessage message = CreateActiveMQMessage(dispatch);
+                            
+                            this.BeforeMessageIsConsumed(dispatch);
+                            
+                            try
+                            {
+                                bool expired = message.IsExpired();
+                                
+                                if(!expired)
+                                {
+                                    listener(message);
+                                }
+                                
+                                this.AfterMessageIsConsumed(dispatch, expired);
+                            } 
+                            catch(Exception e) 
+                            {
+                                if(IsAutoAcknowledgeBatch || IsAutoAcknowledgeEach || this.session.IsIndividualAcknowledge)
+                                {
+                                    // Redeliver the message
+                                } 
+                                else
+                                {
+                                    // Transacted or Client ack: Deliver the next message.
+                                    this.AfterMessageIsConsumed(dispatch, false);
+                                }
+
+                                Tracer.Error(this.info.ConsumerId + " Exception while processing message: " + e);
+                            }
+                        } 
+                        else 
+                        {
+                            this.unconsumedMessages.Enqueue(dispatch);
+                        }
+                    }
+                }
+                
+                if(++dispatchedCount % 1000 == 0) 
+                {
+                    dispatchedCount = 0;
+                    Thread.Sleep(1);
+                }
+            } 
+            catch(Exception e) 
+            {
+                this.session.Connection.OnSessionException(this.session, e);
+            }
+        }
+
+        public bool Iterate()
+        {
+            if(this.listener != null) 
+            {
+                MessageDispatch dispatch = this.unconsumedMessages.DequeueNoWait();
+                if(dispatch != null) 
+                {
+                    try 
+                    {
+                        ActiveMQMessage message = CreateActiveMQMessage(dispatch);
+                        BeforeMessageIsConsumed(dispatch);
+                        listener(message);
+                        AfterMessageIsConsumed(dispatch, false);
+                    } 
+                    catch(NMSException e) 
+                    {
+                        this.session.Connection.OnSessionException(this.session, e);
+                    }
+                    
+                    return true;
+                }
+            }
+            
+            return false;
+        }
+
+        /// <summary>
+        /// Used to get an enqueued message from the unconsumedMessages list. The
+        /// amount of time this method blocks is based on the timeout value.  If
+        /// timeout == Timeout.Infinite then it blocks until a message is received.
+        /// If timeout == 0 then it tries to not block at all and returns a
+        /// message if one is available.  If timeout > 0 then it blocks up to timeout
+        /// amount of time.  Expired messages will be consumed by this method.
+        /// </summary>
+        /// <param name="timeout">
+        /// A <see cref="System.TimeSpan"/>
+        /// </param>
+        /// <returns>
+        /// A <see cref="MessageDispatch"/>
+        /// </returns>
+        private MessageDispatch Dequeue(TimeSpan timeout) 
+        {
+            DateTime deadline = DateTime.Now;
+            
+            if(timeout > TimeSpan.Zero)
+            {
+                deadline = DateTime.Now + timeout;
+            }
+            
+            while(true) 
+            {
+                MessageDispatch dispatch = this.unconsumedMessages.Dequeue(timeout);
+                
+                if(dispatch == null)
+                {
+                    if(timeout > TimeSpan.Zero && !this.unconsumedMessages.Closed) 
+                    {
+                        timeout = deadline < DateTime.Now ? TimeSpan.Zero : deadline - DateTime.Now;
+                    }
+                    else
+                    {
+                        return null;
+                    }
+                } 
+                else if(dispatch.Message == null) 
+                {
+                    return null;
+                } 
+                else if(dispatch.Message.IsExpired())
+                {
+                    Tracer.DebugFormat("{0} received expired message: {1}", info.ConsumerId, dispatch);
+                    
+                    BeforeMessageIsConsumed(dispatch);
+                    AfterMessageIsConsumed(dispatch, true);
+                    
+                    if(timeout > TimeSpan.Zero && !this.unconsumedMessages.Closed) 
+                    {
+                        timeout = deadline < DateTime.Now ? TimeSpan.Zero : deadline - DateTime.Now;
+                    }
+                } 
+                else 
+                {
+                    Tracer.DebugFormat("{0} received message: {1}", info.ConsumerId, dispatch);
+                    return dispatch;
+                }
+            }
+        }
+
+        public void BeforeMessageIsConsumed(MessageDispatch dispatch)
+        {
+            this.lastDeliveredSequenceId = dispatch.Message.MessageId.BrokerSequenceId;
+            
+            if(!IsAutoAcknowledgeBatch)
+            {
+                lock(this.dispatchedMessages) 
+                {
+                    this.dispatchedMessages.AddFirst(dispatch);
+                }
+                
+                if(this.session.IsTransacted) 
+                {
+                    this.AckLater(dispatch, AckType.DeliveredAck);
+                }
+            }            
+        }
+        
+        public void AfterMessageIsConsumed(MessageDispatch dispatch, bool expired)
+        {
+            if(this.unconsumedMessages.Closed) 
+            {
+                return;
+            }
+            
+            if(expired == true) 
+            {
+                lock(this.dispatchedMessages)
+                {
+                    this.dispatchedMessages.Remove(dispatch);
+                }
+                
+                AckLater(dispatch, AckType.DeliveredAck);
+            }
+            else
+            {
+                if(this.session.IsTransacted)
+                {
+                    // Do nothing.
+                }
+                else if(this.IsAutoAcknowledgeEach)
+                {
+                    if(this.deliveringAcks.CompareAndSet(false, true)) 
+                    {
+                        lock(this.dispatchedMessages) 
+                        {
+                            if(this.dispatchedMessages.Count != 0)
+                            {
+                                MessageAck ack = MakeAckForAllDeliveredMessages(AckType.ConsumedAck);
+                                if(ack !=null) 
+                                {
+                                    this.dispatchedMessages.Clear();
+                                    this.session.Connection.Oneway(ack);
+                                }
+                            }
+                        }
+                        this.deliveringAcks.Value = false;
+                    }
+                } 
+                else if(this.IsAutoAcknowledgeBatch)
+                {
+                    AckLater(dispatch, AckType.ConsumedAck);
+                }
+                else if(this.session.IsClientAcknowledge || this.session.IsIndividualAcknowledge) 
+                {
+                    AckLater(dispatch, AckType.DeliveredAck);
+                } 
+                else 
+                {
+                    throw new NMSException("Invalid session state.");
+                }
+            }            
+        }
+
+        private MessageAck MakeAckForAllDeliveredMessages(AckType type)
+        {
+            lock(this.dispatchedMessages)
+            {
+                if(this.dispatchedMessages.Count == 0)
+                {
+                    return null;
+                }
+                    
+                MessageDispatch dispatch = this.dispatchedMessages.First.Value;
+                MessageAck ack = new MessageAck();
+                
+                ack.AckType = (byte)type;
+                ack.ConsumerId = this.info.ConsumerId;
+                ack.Destination = dispatch.Destination;
+                ack.LastMessageId = dispatch.Message.MessageId;
+                ack.MessageCount = this.dispatchedMessages.Count;
+                ack.FirstMessageId = this.dispatchedMessages.Last.Value.Message.MessageId;
+                
+                return ack;
+            }
+        }
+
+        private void AckLater(MessageDispatch dispatch, AckType type)
+        {
+            // Don't acknowledge now, but we may need to let the broker know the
+            // consumer got the message to expand the pre-fetch window
+            if(this.session.IsTransacted)
+            {
+                this.session.DoStartTransaction();
+                
+                if(!synchronizationRegistered) 
+                {
+                    this.synchronizationRegistered = true;
+                    this.session.TransactionContext.AddSynchronization(new MessageConsumerSynchronization(this));
+                }
+            }
+
+            this.deliveredCounter++;
+            
+            MessageAck oldPendingAck = pendingAck;
+            
+            pendingAck = new MessageAck();
+            pendingAck.AckType = (byte)type;
+            pendingAck.ConsumerId = this.info.ConsumerId;
+            pendingAck.Destination = dispatch.Destination;
+            pendingAck.LastMessageId = dispatch.Message.MessageId;
+            pendingAck.MessageCount = deliveredCounter;
+
+            if(this.session.IsTransacted && this.session.TransactionContext.InTransaction)
+            {
+                pendingAck.TransactionId = this.session.TransactionContext.TransactionId;
+            }
+            
+            if(oldPendingAck == null) 
+            {
+                pendingAck.FirstMessageId = pendingAck.LastMessageId;
+            } 
+            else if(oldPendingAck.AckType == pendingAck.AckType)
+            {
+                pendingAck.FirstMessageId = oldPendingAck.FirstMessageId;
+            } 
+            else 
+            {
+                Tracer.Debug("AckLater: Old Ack was not the same Ack type.");
+
+                // old pending ack being superseded by ack of another type; if it is not a delivered
+                // ack, and hence important, send it now so it is not lost.
+                if(oldPendingAck.AckType != (byte)AckType.DeliveredAck) 
+                {
+                    Tracer.Debug("Sending old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
+                    this.session.Connection.Oneway(oldPendingAck);
+                } 
+                else
+                {
+                    Tracer.Debug("dropping old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
+                }
+            }
+            
+            if((0.5 * this.info.PrefetchSize) <= (this.deliveredCounter - this.additionalWindowSize)) 
+            {
+                this.session.Connection.Oneway(pendingAck);
+                this.pendingAck = null;
+                this.deliveredCounter = 0;
+                this.additionalWindowSize = 0;
+            }
+        }
+
+        private void Acknowledge()
+        {
+            lock(this.dispatchedMessages)
+            {
+                // Acknowledge all messages so far.
+                MessageAck ack = MakeAckForAllDeliveredMessages(AckType.ConsumedAck);
+                
+                if(ack == null)
+                {
+                    return; // no msgs
+                }
+                
+                if(this.session.IsTransacted)
+                {
+                    this.session.DoStartTransaction();
+                    ack.TransactionId = this.session.TransactionContext.TransactionId;
+                }
+                
+                this.session.Connection.Oneway(ack);
+                this.pendingAck = null;
+                
+                // Adjust the counters
+                this.deliveredCounter = Math.Max(0, this.deliveredCounter - this.dispatchedMessages.Count);
+                this.additionalWindowSize = Math.Max(0, this.additionalWindowSize - this.dispatchedMessages.Count);
+                
+                if(!this.session.IsTransacted) 
+                {
+                    this.dispatchedMessages.Clear();
+                } 
+            }            
+        }        
+
+        private void Acknowledge(MessageDispatch dispatch)
+        {
+            MessageAck ack = new MessageAck();
+
+            ack.AckType = (byte)AckType.IndividualAck;
+            ack.ConsumerId = this.info.ConsumerId;
+            ack.Destination = dispatch.Destination;
+            ack.LastMessageId = dispatch.Message.MessageId;
+            ack.MessageCount = 1;            
+
+            this.session.Connection.Oneway(ack);
+            lock(this.dispatchedMessages)
+            {
+                this.dispatchedMessages.Remove(dispatch);
+            }
+        }
+        
+        private void Commit()
+        {
+            lock(this.dispatchedMessages)
+            {
+                this.dispatchedMessages.Clear();
+            }
+            
+            this.redeliveryDelay = 0;
+        }
+
+        private void Rollback()
+        {
+            lock(this.unconsumedMessages.SyncRoot)
+            {
+                lock(this.dispatchedMessages)
+                {
+                    if(this.dispatchedMessages.Count == 0)
+                    {
+                        return;
+                    }
+        
+                    // Only increase the redelivery delay after the first redelivery.
+                    MessageDispatch lastMd = this.dispatchedMessages.First.Value;
+                    int currentRedeliveryCount = lastMd.Message.RedeliveryCounter;
+                    
+                    if(currentRedeliveryCount > 0) 
+                    {
+                        redeliveryDelay = 1000;
+                        //redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
+                    }
+                    
+                    MessageId firstMsgId = this.dispatchedMessages.Last.Value.Message.MessageId;
+        
+                    //if(redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
+                    //    && lastMd.getMessage().getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()) {
+                    if(lastMd.Message.RedeliveryCounter > MessageConsumer.DEFAULT_MAX_REDELIVERIES)
+                    {
+                        // We need to NACK the messages so that they get sent to the
+                        // DLQ.
+                        // Acknowledge the last message.
+                        
+                        MessageAck ack = new MessageAck();
+
+                        ack.AckType = (byte)AckType.PoisonAck;
+                        ack.ConsumerId = this.info.ConsumerId;
+                        ack.Destination = lastMd.Destination;
+                        ack.LastMessageId = lastMd.Message.MessageId;
+                        ack.MessageCount = this.dispatchedMessages.Count;                                    
+                        ack.FirstMessageId = firstMsgId;
+
+                        this.session.Connection.Oneway(ack);
+                        
+                        // Adjust the window size.
+                        additionalWindowSize = Math.Max(0, this.additionalWindowSize - this.dispatchedMessages.Count);
+                        
+                        //redeliveryDelay = 0;                        
+                    } 
+                    else
+                    {                        
+                        // only redelivery_ack after first delivery
+                        if(currentRedeliveryCount > 0)
+                        {
+                            MessageAck ack = new MessageAck();
+                            
+                            ack.AckType = (byte)AckType.RedeliveredAck;
+                            ack.ConsumerId = this.info.ConsumerId;
+                            ack.Destination = lastMd.Destination;
+                            ack.LastMessageId = lastMd.Message.MessageId;
+                            ack.MessageCount = this.dispatchedMessages.Count;                                    
+                            ack.FirstMessageId = firstMsgId;
+                            
+                            this.session.Connection.Oneway(ack);
+                        }
+        
+                        // stop the delivery of messages.
+                        this.unconsumedMessages.Stop();
+
+                        foreach(MessageDispatch dispatch in this.dispatchedMessages)
+                        {
+                            this.unconsumedMessages.EnqueueFirst(dispatch);
+                        }
+        
+                        if(redeliveryDelay > 0 && !this.unconsumedMessages.Closed)
+                        {
+                            DateTime deadline = DateTime.Now.AddMilliseconds(redeliveryDelay);
+                            ThreadPool.QueueUserWorkItem(this.RollbackHelper, deadline);
+                        } else {
+                            Start();
+                        }
+                    }
+                    
+                    this.deliveredCounter -= this.dispatchedMessages.Count;
+                    this.dispatchedMessages.Clear();
+                }
+            }
+
+            // Only redispatch if there's an async listener otherwise a synchronous
+            // consumer will pull them from the local queue.
+            if(this.listener != null) 
+            {
+                this.session.Redispatch(this.unconsumedMessages);
+            }
+        }
+        
+        private void RollbackHelper(Object arg)
+        {
+            try
+            {
+                TimeSpan waitTime = (DateTime) arg - DateTime.Now;
+
+                if(waitTime.CompareTo(TimeSpan.Zero) > 0)
+                {
+                    Thread.Sleep(waitTime);
+                }
+                
+                this.Start();
+            }            
+            catch(Exception e)
+            {
+                this.session.Connection.OnSessionException(this.session, e);
+            }
+        }
+
+        private ActiveMQMessage CreateActiveMQMessage(MessageDispatch dispatch) 
+        {
+            ActiveMQMessage message = dispatch.Message.Clone() as ActiveMQMessage;
+            
+            if(this.session.IsClientAcknowledge)
+            {
+                message.Acknowledger += new AcknowledgeHandler(DoClientAcknowledge);
+            }
+            else if(this.session.IsIndividualAcknowledge)
+            {
+                message.Acknowledger += new AcknowledgeHandler(DoIndividualAcknowledge);
+            }
+            else
+            {
+                message.Acknowledger += new AcknowledgeHandler(DoNothingAcknowledge);                
+            }
+            
+            return message;
+        }
+        
+        private void CheckClosed()
+        {
+            if(this.unconsumedMessages.Closed)
+            {
+                throw new NMSException("The Consumer has been Closed");
+            }
+        }
+
+        private void CheckMessageListener()
+        {
+            if(this.listener != null)
+            {
+                throw new NMSException("Cannot set Async listeners on Consumers with a prefetch limit of zero");
+            }
+        }
+
+        private bool IsAutoAcknowledgeEach
+        {
+            get 
+            { 
+                return this.session.IsAutoAcknowledge ||
+                       (this.session.IsDupsOkAcknowledge && this.info.Destination.IsQueue); 
+            }
+        }
+    
+        private bool IsAutoAcknowledgeBatch 
+        {
+            get { return this.session.IsDupsOkAcknowledge && !this.info.Destination.IsQueue; }
+        }
+
+        #region Nested ISynchronization Types
+        
+        class MessageConsumerSynchronization : ISynchronization
+        {
+            private readonly MessageConsumer consumer;
+    
+            public MessageConsumerSynchronization(MessageConsumer consumer)
+            {
+                this.consumer = consumer;
+            }
+    
+            public void BeforeEnd()
+            {
+                this.consumer.Acknowledge();
+                this.consumer.synchronizationRegistered = false;
+            }
+    
+            public void AfterCommit()
+            {
+                this.consumer.Commit();
+                this.consumer.synchronizationRegistered = false;
+            }
+    
+            public void AfterRollback()
+            {
+                this.consumer.Rollback();
+                this.consumer.synchronizationRegistered = false;
+            }
+        }
+    
+        class ConsumerCloseSynchronization : ISynchronization
+        {
+            private readonly MessageConsumer consumer;
+    
+            public ConsumerCloseSynchronization(MessageConsumer consumer)
+            {
+                this.consumer = consumer;
+            }
+    
+            public void BeforeEnd()
+            {
+            }
+    
+            public void AfterCommit()
+            {
+                this.consumer.DoClose();
+            }
+    
+            public void AfterRollback()
+            {
+                this.consumer.DoClose();
+            }
+        }
 
-		public void AfterRollback()
-		{
-			consumer.AfterRollback((ActiveMQMessage) message);
-		}
-
-		#endregion
-	}
+        #endregion
+    }
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs?rev=829386&r1=829385&r2=829386&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs Sat Oct 24 15:23:52 2009
@@ -101,6 +101,23 @@
                     return;
                 }
 
+                DoClose();
+                RemoveInfo removeInfo = new RemoveInfo();
+                removeInfo.ObjectId = this.info.ProducerId;
+                this.session.Connection.Oneway(removeInfo);
+                this.session = null;
+            }
+        }
+
+        internal void DoClose()
+        {
+            lock(closedLock)
+            {
+                if(closed)
+                {
+                    return;
+                }
+
                 try
                 {
                     session.DisposeOf(info.ProducerId);
@@ -115,9 +132,8 @@
                     this.usage.Stop();
                 }
 
-                session = null;
                 closed = true;
-            }
+            }            
         }
 
         public void Send(IMessage message)

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs?rev=829386&r1=829385&r2=829386&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs Sat Oct 24 15:23:52 2009
@@ -27,36 +27,52 @@
     /// <summary>
     /// Default provider of ISession
     /// </summary>
-    public class Session : ISession
+    public class Session : ISession, IDispatcher
     {
         /// <summary>
         /// Private object used for synchronization, instead of public "this"
         /// </summary>
         private readonly object myLock = new object();
-        private int consumerCounter;
+        
         private readonly IDictionary consumers = Hashtable.Synchronized(new Hashtable());
         private readonly IDictionary producers = Hashtable.Synchronized(new Hashtable());
-        private readonly DispatchingThread dispatchingThread;
-        private DispatchingThread.ExceptionHandler dispatchingThread_ExceptionHandler;
+        
+        private SessionExecutor executor;
+        private TransactionContext transactionContext;
+        private Connection connection;
+
+        private int prefetchSize;
+        private int maximumPendingMessageLimit;
+        private bool dispatchAsync;
+        private bool exclusive;
+        private bool retroactive;
+        private byte priority;
+        
         private readonly SessionInfo info;
+        private int consumerCounter;
         private int producerCounter;
-        internal bool startedAsyncDelivery = false;
+        private long nextDeliveryId;
+        private long lastDeliveredSequenceId;
         private bool disposed = false;
         private bool closed = false;
         private bool closing = false;
-        private TimeSpan MAX_THREAD_WAIT = TimeSpan.FromMilliseconds(30000);
+        private TimeSpan requestTimeout = Apache.NMS.NMSConstants.defaultRequestTimeout;
+        private AcknowledgementMode acknowledgementMode;
 
         public Session(Connection connection, SessionInfo info, AcknowledgementMode acknowledgementMode)
         {
             this.connection = connection;
             this.info = info;
             this.acknowledgementMode = acknowledgementMode;
-            this.AsyncSend = connection.AsyncSend;
             this.requestTimeout = connection.RequestTimeout;
             this.PrefetchSize = 1000;
-            this.transactionContext = new TransactionContext(this);
-            this.dispatchingThread = new DispatchingThread(new DispatchingThread.DispatchFunction(DispatchAsyncMessages));
-            this.dispatchingThread_ExceptionHandler = new DispatchingThread.ExceptionHandler(dispatchingThread_ExceptionListener);
+
+            if(acknowledgementMode == AcknowledgementMode.Transactional)
+            {
+                this.transactionContext = new TransactionContext(this);
+            }
+
+            this.executor = new SessionExecutor(this, this.consumers);
         }
 
         ~Session()
@@ -64,11 +80,17 @@
             Dispose(false);
         }
 
+        #region Property Accessors
+
         /// <summary>
         /// Sets the prefetch size, the maximum number of messages a broker will dispatch to consumers
         /// until acknowledgements are received.
         /// </summary>
-        public int PrefetchSize;
+        public int PrefetchSize
+        {
+            get{ return this.prefetchSize; }
+            set{ this.prefetchSize = value; }
+        }
 
         /// <summary>
         /// Sets the maximum number of messages to keep around per consumer
@@ -76,35 +98,49 @@
         /// will start to be evicted for slow consumers.
         /// Must be > 0 to enable this feature
         /// </summary>
-        public int MaximumPendingMessageLimit;
+        public int MaximumPendingMessageLimit
+        {
+            get{ return this.maximumPendingMessageLimit; }
+            set{ this.maximumPendingMessageLimit = value; }
+        }
 
         /// <summary>
         /// Enables or disables whether asynchronous dispatch should be used by the broker
         /// </summary>
-        public bool DispatchAsync;
+        public bool DispatchAsync
+        {
+            get{ return this.dispatchAsync; }
+            set{ this.dispatchAsync = value; }
+        }
 
         /// <summary>
         /// Enables or disables exclusive consumers when using queues. An exclusive consumer means
         /// only one instance of a consumer is allowed to process messages on a queue to preserve order
         /// </summary>
-        public bool Exclusive;
+        public bool Exclusive
+        {
+            get{ return this.exclusive; }
+            set{ this.exclusive = value; }
+        }
 
         /// <summary>
         /// Enables or disables retroactive mode for consumers; i.e. do they go back in time or not?
         /// </summary>
-        public bool Retroactive;
+        public bool Retroactive
+        {
+            get{ return this.retroactive; }
+            set{ this.retroactive = value; }
+        }
 
         /// <summary>
         /// Sets the default consumer priority for consumers
         /// </summary>
-        public byte Priority;
-
-        /// <summary>
-        /// This property indicates whether or not async send is enabled.
-        /// </summary>
-        public bool AsyncSend;
+        public byte Priority
+        {
+            get{ return this.priority; }
+            set{ this.priority = value; }
+        }
 
-        private Connection connection;
         public Connection Connection
         {
             get { return this.connection; }
@@ -115,12 +151,64 @@
             get { return info.SessionId; }
         }
 
-        private TransactionContext transactionContext;
         public TransactionContext TransactionContext
         {
             get { return this.transactionContext; }
         }
 
+        /// <summary>
+        /// Gets or sets the request timeout stored on this session. The
+        /// constructor initializes it from the owning connection's RequestTimeout.
+        /// </summary>
+        public TimeSpan RequestTimeout
+        {
+            get { return this.requestTimeout; }
+            set { this.requestTimeout = value; }
+        }
+
+        /// <summary>
+        /// True when this session's acknowledgement mode is Transactional.
+        /// </summary>
+        public bool Transacted
+        {
+            get { return this.AcknowledgementMode == AcknowledgementMode.Transactional; }
+        }
+
+        /// <summary>
+        /// The acknowledgement mode this session was constructed with (read-only).
+        /// </summary>
+        public AcknowledgementMode AcknowledgementMode
+        {
+            get { return this.acknowledgementMode; }
+        }
+
+        /// <summary>
+        /// True when the acknowledgement mode is ClientAcknowledge.
+        /// </summary>
+        public bool IsClientAcknowledge
+        {
+            get { return this.acknowledgementMode == AcknowledgementMode.ClientAcknowledge; }
+        }
+
+        /// <summary>
+        /// True when the acknowledgement mode is AutoAcknowledge.
+        /// </summary>
+        public bool IsAutoAcknowledge
+        {
+            get { return this.acknowledgementMode == AcknowledgementMode.AutoAcknowledge; }
+        }
+
+        /// <summary>
+        /// True when the acknowledgement mode is DupsOkAcknowledge.
+        /// </summary>
+        public bool IsDupsOkAcknowledge
+        {
+            get { return this.acknowledgementMode == AcknowledgementMode.DupsOkAcknowledge; }
+        }
+
+        /// <summary>
+        /// Always false: this property never inspects the acknowledgement mode.
+        /// </summary>
+        public bool IsIndividualAcknowledge
+        {
+            get { return false; }
+        }
+
+        /// <summary>
+        /// True when the acknowledgement mode is Transactional; performs the same
+        /// comparison as <see cref="Transacted"/>.
+        /// </summary>
+        public bool IsTransacted
+        {
+            get { return this.acknowledgementMode == AcknowledgementMode.Transactional; }
+        }
+
+        /// <summary>
+        /// The SessionExecutor created for this session in the constructor.
+        /// </summary>
+        public SessionExecutor Executor
+        {
+            get { return this.executor; }
+        }
+
+        /// <summary>
+        /// Atomically increments the session's delivery id counter and returns the
+        /// new value. Note that every read of this property advances the counter,
+        /// so two consecutive reads return different values.
+        /// </summary>
+        public long NextDeliveryId
+        {
+            get { return Interlocked.Increment(ref this.nextDeliveryId); }
+        }
+        
+        #endregion
+        
         #region ISession Members
 
         public void Dispose()
@@ -164,13 +252,49 @@
 
                 try
                 {
+                    DoClose();
+                }
+                catch(Exception ex)
+                {
+                    Tracer.ErrorFormat("Error during session close: {0}", ex);
+                }
+                finally
+                {
+                    // Make sure we attempt to inform the broker this Session is done.
+                    RemoveInfo info = new RemoveInfo();
+                    info.ObjectId = this.info.SessionId;
+                    info.LastDeliveredSequenceId = this.lastDeliveredSequenceId;
+                    this.connection.Oneway(info);
+                    this.connection = null;
+                    this.closed = true;
+                    this.closing = false;
+                }
+            }
+        }
+
+        internal void DoClose()
+        {
+            lock(myLock)
+            {
+                if(this.closed)
+                {
+                    return;
+                }
+
+                try
+                {
                     this.closing = true;
-                    StopAsyncDelivery();
+
+                    // Stop all message deliveries from this Session
+                    Stop();
+                    
                     lock(consumers.SyncRoot)
                     {
                         foreach(MessageConsumer consumer in consumers.Values)
                         {
-                            consumer.Close();
+                            consumer.DoClose();
+                            this.lastDeliveredSequenceId =
+                                Math.Min(this.lastDeliveredSequenceId, consumer.LastDeliveredSequenceId);
                         }
                     }
                     consumers.Clear();
@@ -179,10 +303,23 @@
                     {
                         foreach(MessageProducer producer in producers.Values)
                         {
-                            producer.Close();
+                            producer.DoClose();
                         }
                     }
                     producers.Clear();
+
+                    // If in a transaction roll it back
+                    if(this.IsTransacted && this.transactionContext.InTransaction)
+                    {
+                        try
+                        {
+                            this.transactionContext.Rollback();
+                        }
+                        catch
+                        {
+                        }
+                    }                    
+                    
                     Connection.RemoveSession(this);
                 }
                 catch(Exception ex)
@@ -191,13 +328,12 @@
                 }
                 finally
                 {
-                    this.connection = null;
                     this.closed = true;
                     this.closing = false;
                 }
-            }
+            }            
         }
-
+        
         public IMessageProducer CreateProducer()
         {
             return CreateProducer(null);
@@ -213,7 +349,7 @@
             {
                 producer = new MessageProducer(this, command);
                 producers[producerId] = producer;
-                this.DoSend(command);
+                this.connection.Oneway(command);
             }
             catch(Exception)
             {
@@ -253,12 +389,21 @@
             ConsumerId consumerId = command.ConsumerId;
             MessageConsumer consumer = null;
 
+            // Registered with Connection before we register at the broker.
+            connection.addDispatcher(consumerId, this);
+
             try
             {
-                consumer = new MessageConsumer(this, command, this.AcknowledgementMode);
+                consumer = new MessageConsumer(this, command);
                 // lets register the consumer first in case we start dispatching messages immediately
                 consumers[consumerId] = consumer;
-                this.DoSend(command);
+                this.Connection.SyncRequest(command);
+
+                if(this.Started)
+                {
+                    consumer.Start();
+                }
+                
                 return consumer;
             }
             catch(Exception)
@@ -285,12 +430,21 @@
             command.NoLocal = noLocal;
             MessageConsumer consumer = null;
 
+            // Registered with Connection before we register at the broker.
+            connection.addDispatcher(consumerId, this);
+            
             try
             {
-                consumer = new MessageConsumer(this, command, this.AcknowledgementMode);
+                consumer = new MessageConsumer(this, command);
                 // lets register the consumer first in case we start dispatching messages immediately
                 consumers[consumerId] = consumer;
-                this.DoSend(command);
+
+                if(this.Started)
+                {
+                    consumer.Start();
+                }
+                
+                this.connection.SyncRequest(command);
             }
             catch(Exception)
             {
@@ -311,7 +465,7 @@
             command.ConnectionId = Connection.ConnectionId;
             command.ClientId = Connection.ClientId;
             command.SubcriptionName = name;
-            this.DoSend(command);
+            this.connection.SyncRequest(command);
         }
 
         public IQueueBrowser CreateBrowser(IQueue queue)
@@ -358,27 +512,24 @@
             command.OperationType = DestinationInfo.REMOVE_OPERATION_TYPE; // 1 is remove
             command.Destination = (ActiveMQDestination) destination;
 
-            this.DoSend(command);
+            this.connection.Oneway(command);
         }
 
         public IMessage CreateMessage()
         {
             ActiveMQMessage answer = new ActiveMQMessage();
-            Configure(answer);
             return answer;
         }
 
         public ITextMessage CreateTextMessage()
         {
             ActiveMQTextMessage answer = new ActiveMQTextMessage();
-            Configure(answer);
             return answer;
         }
 
         public ITextMessage CreateTextMessage(string text)
         {
             ActiveMQTextMessage answer = new ActiveMQTextMessage(text);
-            Configure(answer);
             return answer;
         }
 
@@ -419,6 +570,7 @@
                         "You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: "
                         + this.AcknowledgementMode);
             }
+            
             this.TransactionContext.Commit();
         }
 
@@ -430,55 +582,12 @@
                         "You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: "
                         + this.AcknowledgementMode);
             }
+            
             this.TransactionContext.Rollback();
-
-            // lets ensure all the consumers redeliver any rolled back messages
-            lock(consumers.SyncRoot)
-            {
-                foreach(MessageConsumer consumer in consumers.Values)
-                {
-                    consumer.RedeliverRolledBackMessages();
-                }
-            }
-        }
-
-
-        // Properties
-
-        private TimeSpan requestTimeout = Apache.NMS.NMSConstants.defaultRequestTimeout;
-        public TimeSpan RequestTimeout
-        {
-            get { return this.requestTimeout; }
-            set { this.requestTimeout = value; }
-        }
-
-        public bool Transacted
-        {
-            get { return this.AcknowledgementMode == AcknowledgementMode.Transactional; }
-        }
-
-        private AcknowledgementMode acknowledgementMode;
-        public AcknowledgementMode AcknowledgementMode
-        {
-            get { return this.acknowledgementMode; }
         }
 
         #endregion
 
-        private void dispatchingThread_ExceptionListener(Exception exception)
-        {
-            if(null != Connection)
-            {
-                try
-                {
-                    Connection.OnSessionException(this, exception);
-                }
-                catch
-                {
-                }
-            }
-        }
-
         protected void CreateTemporaryDestination(ActiveMQDestination tempDestination)
         {
             DestinationInfo command = new DestinationInfo();
@@ -486,25 +595,7 @@
             command.OperationType = DestinationInfo.ADD_OPERATION_TYPE; // 0 is add
             command.Destination = tempDestination;
 
-            this.DoSend(command);
-        }
-
-        private void DoSend(Command message)
-        {
-            this.DoSend(message, this.RequestTimeout);
-        }
-
-        private void DoSend(Command message, TimeSpan requestTimeout)
-        {
-            if(AsyncSend)
-            {
-                message.ResponseRequired = false;
-                Connection.Oneway(message);
-            }
-            else
-            {
-                Connection.SyncRequest(message, requestTimeout);
-            }
+            this.connection.SyncRequest(command);
         }
 
         public void DoSend( ActiveMQMessage message, MessageProducer producer, MemoryUsage producerWindow, TimeSpan sendTimeout )
@@ -566,9 +657,11 @@
             }
         }
 
-        public void DisposeOf(ConsumerId objectId)
+        public void DisposeOf(ConsumerId objectId, long lastDeliveredSequenceId)
         {
-            Connection.DisposeOf(objectId);
+            connection.removeDispatcher(objectId);
+            this.lastDeliveredSequenceId = Math.Min(this.lastDeliveredSequenceId, lastDeliveredSequenceId);
+            
             if(!this.closing)
             {
                 consumers.Remove(objectId);
@@ -577,7 +670,6 @@
 
         public void DisposeOf(ProducerId objectId)
         {
-            Connection.DisposeOf(objectId);
             connection.removeProducer(objectId);
             if(!this.closing)
             {
@@ -585,37 +677,6 @@
             }
         }
 
-        public bool DispatchMessage(ConsumerId consumerId, Message message)
-        {
-            bool dispatched = false;
-            MessageConsumer consumer = (MessageConsumer) consumers[consumerId];
-
-            if(consumer != null)
-            {
-                consumer.Dispatch((ActiveMQMessage) message);
-                dispatched = true;
-            }
-
-            return dispatched;
-        }
-
-        /// <summary>
-        /// Private method called by the dispatcher thread in order to perform
-        /// asynchronous delivery of queued (inbound) messages.
-        /// </summary>
-        private void DispatchAsyncMessages()
-        {
-            // lets iterate through each consumer created by this session
-            // ensuring that they have all pending messages dispatched
-            lock(consumers.SyncRoot)
-            {
-                foreach(MessageConsumer consumer in consumers.Values)
-                {
-                    consumer.DispatchAsyncMessages();
-                }
-            }
-        }
-
         protected virtual ConsumerInfo CreateConsumerInfo(IDestination destination, string selector)
         {
             ConsumerInfo answer = new ConsumerInfo();
@@ -666,36 +727,79 @@
             return answer;
         }
 
-        /// <summary>
-        /// Configures the message command
-        /// </summary>
-        protected void Configure(ActiveMQMessage message)
+        public void Stop()
         {
+            if(this.executor != null)
+            {
+                this.executor.Stop();
+            }
         }
 
-        internal void StopAsyncDelivery()
+        public void Start()
         {
-            if(startedAsyncDelivery)
+            foreach(MessageConsumer consumer in this.consumers.Values)
             {
-                this.dispatchingThread.ExceptionListener -= this.dispatchingThread_ExceptionHandler;
-                dispatchingThread.Stop((int) MAX_THREAD_WAIT.TotalMilliseconds);
-                startedAsyncDelivery = false;
+                consumer.Start();
             }
+            
+            if(this.executor != null)
+            {
+                this.executor.Start();
+            }            
         }
 
-        internal void StartAsyncDelivery()
+        public bool Started
         {
-            if(!startedAsyncDelivery)
+            get
             {
-                this.dispatchingThread.ExceptionListener += this.dispatchingThread_ExceptionHandler;
-                dispatchingThread.Start();
-                startedAsyncDelivery = true;
+                return this.executor != null ? this.executor.Running : false;
             }
         }
 
-        internal void RegisterConsumerDispatcher(Dispatcher dispatcher)
+        public void Redispatch(MessageDispatchChannel channel)
         {
-            dispatcher.SetAsyncDelivery(this.dispatchingThread.EventHandle);
+            MessageDispatch[] messages = channel.RemoveAll();
+            System.Array.Reverse(messages);
+
+            foreach(MessageDispatch message in messages)
+            {
+                this.executor.ExecuteFirst(message);
+            }
         }
+        
+        /// <summary>
+        /// Hands an inbound MessageDispatch to this session's executor for
+        /// queued delivery. The dispatch is dropped when no executor exists.
+        /// </summary>
+        /// <param name="dispatch">The dispatch command to queue.</param>
+        public void Dispatch(MessageDispatch dispatch)
+        {
+            if(this.executor == null)
+            {
+                return;
+            }
+
+            this.executor.Execute(dispatch);
+        }
+
+        /// <summary>
+        /// Clears the messages queued on the session executor and then asks
+        /// every consumer of this session to clear its in-progress messages.
+        /// </summary>
+        public void ClearMessagesInProgress()
+        {
+            if(this.executor != null)
+            {
+                this.executor.ClearMessagesInProgress();
+            }
+
+            lock(this.consumers.SyncRoot)
+            {
+                // Enumerate the Values collection: enumerating the dictionary
+                // itself yields DictionaryEntry items, and casting those to
+                // MessageConsumer throws InvalidCastException.
+                foreach(MessageConsumer consumer in this.consumers.Values)
+                {
+                    consumer.ClearMessagesInProgress();
+                }
+            }
+        }
+        
+        /// <summary>
+        /// Asks every consumer of this session to deliver its acknowledgements.
+        /// </summary>
+        public void DeliverAcks()
+        {
+            lock(this.consumers.SyncRoot)
+            {
+                // Enumerate the Values collection: enumerating the dictionary
+                // itself yields DictionaryEntry items, and casting those to
+                // MessageConsumer throws InvalidCastException.
+                foreach(MessageConsumer consumer in this.consumers.Values)
+                {
+                    consumer.DeliverAcks();
+                }
+            }
+        }
+        
     }
 }

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/SessionExecutor.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/SessionExecutor.cs?rev=829386&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/SessionExecutor.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/SessionExecutor.cs Sat Oct 24 15:23:52 2009
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections;
+using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.ActiveMQ.Util;
+using Apache.NMS.ActiveMQ.Threads;
+
+namespace Apache.NMS.ActiveMQ
+{
+    /// <summary>
+    /// Buffers MessageDispatch commands for a Session and hands them to the
+    /// Session's consumers from a DedicatedTaskRunner that is created lazily
+    /// on the first Wakeup.
+    /// </summary>
+    public class SessionExecutor : Threads.Task
+    {
+        private MessageDispatchChannel messageQueue = new MessageDispatchChannel();
+        private TaskRunner taskRunner = null;
+
+        private Session session = null;
+        private IDictionary consumers = null;
+
+        /// <summary>
+        /// Creates an executor for the given session.
+        /// </summary>
+        /// <param name="session">Session whose Connection is told about errors raised while iterating.</param>
+        /// <param name="consumers">Consumer table keyed by consumer id; it is
+        /// locked through its SyncRoot whenever it is read by this class.</param>
+        public SessionExecutor(Session session, IDictionary consumers)
+        {
+            this.session = session;
+            this.consumers = consumers;
+        }
+
+        ~SessionExecutor()
+        {
+            // Best-effort cleanup; errors are ignored because a finalizer
+            // must never throw.
+            try
+            {
+                Stop();
+                Close();
+                Clear();
+            }
+            catch
+            {
+            }
+        }
+
+        /// <summary>
+        /// Appends the dispatch to the session queue and wakes the task runner.
+        /// </summary>
+        public void Execute(MessageDispatch dispatch)
+        {
+            // Add the data to the queue.
+            this.messageQueue.Enqueue(dispatch);
+            this.Wakeup();
+        }
+
+        /// <summary>
+        /// Places the dispatch at the head of the session queue and wakes the
+        /// task runner.
+        /// </summary>
+        public void ExecuteFirst(MessageDispatch dispatch)
+        {
+            // Add the data to the queue.
+            this.messageQueue.EnqueueFirst(dispatch);
+            this.Wakeup();
+        }
+
+        /// <summary>
+        /// Wakes the task runner, creating it first if none exists yet.
+        /// </summary>
+        public void Wakeup()
+        {
+            TaskRunner runner;
+
+            // Creation is serialized on the queue's SyncRoot so that only one
+            // runner (and therefore one thread) is ever created at a time.
+            lock(messageQueue.SyncRoot)
+            {
+                if(this.taskRunner == null)
+                {
+                    this.taskRunner = new DedicatedTaskRunner(this);
+                }
+
+                runner = this.taskRunner;
+            }
+
+            runner.Wakeup();
+        }
+
+        /// <summary>
+        /// Starts the session queue if it is not running and wakes the task
+        /// runner when messages are already waiting.
+        /// </summary>
+        public void Start()
+        {
+            if(!messageQueue.Running)
+            {
+                messageQueue.Start();
+
+                if(HasUncomsumedMessages)
+                {
+                    this.Wakeup();
+                }
+            }
+        }
+
+        /// <summary>
+        /// Stops the session queue and shuts down the current task runner, if any.
+        /// </summary>
+        public void Stop()
+        {
+            if(messageQueue.Running)
+            {
+                messageQueue.Stop();
+
+                TaskRunner runner;
+
+                // Detach the runner under the same lock Wakeup uses to create
+                // it, so a concurrent Wakeup cannot observe a half-cleared field.
+                lock(messageQueue.SyncRoot)
+                {
+                    runner = this.taskRunner;
+                    this.taskRunner = null;
+                }
+
+                // Shut down outside the lock: the runner thread may need the
+                // queue's SyncRoot to finish its current iteration.
+                if(runner != null)
+                {
+                    runner.Shutdown();
+                }
+            }
+        }
+
+        /// <summary>
+        /// Closes the session queue.
+        /// </summary>
+        public void Close()
+        {
+            this.messageQueue.Close();
+        }
+
+        /// <summary>
+        /// Delivers the dispatch to the consumer registered under its ConsumerId.
+        /// The dispatch is ignored when no such consumer exists; any exception is
+        /// logged at debug level and not rethrown.
+        /// </summary>
+        public void Dispatch(MessageDispatch dispatch)
+        {
+            try
+            {
+                lock(this.consumers.SyncRoot)
+                {
+                    // The IDictionary indexer yields null for an unknown key, so
+                    // a separate Contains() lookup is not needed.
+                    MessageConsumer consumer = this.consumers[dispatch.ConsumerId] as MessageConsumer;
+
+                    // If the consumer is not available, just ignore the message.
+                    // Otherwise, dispatch the message to the consumer.
+                    if(consumer != null)
+                    {
+                        consumer.Dispatch(dispatch);
+                    }
+                }
+            }
+            catch(Exception ex)
+            {
+                Tracer.DebugFormat("Caught Exception While Dispatching: {0}", ex.Message );
+            }
+        }
+
+        /// <summary>
+        /// Performs one unit of work for the task runner: first lets a consumer
+        /// deliver one of its own queued messages, otherwise moves one message
+        /// from the session queue to its consumer.
+        /// </summary>
+        /// <returns>
+        /// true when Iterate should be called again right away (a consumer did
+        /// work, the session queue still holds messages, or an exception was
+        /// reported); false when there is nothing left to do.
+        /// </returns>
+        public bool Iterate()
+        {
+            try
+            {
+                lock(this.consumers.SyncRoot)
+                {
+                    // Deliver any messages queued on the consumer to their listeners.
+                    foreach( MessageConsumer consumer in this.consumers.Values )
+                    {
+                        if(consumer.Iterate())
+                        {
+                            return true;
+                        }
+                    }
+                }
+
+                // No messages left queued on the listeners.. so now dispatch messages
+                // queued on the session
+                MessageDispatch message = messageQueue.DequeueNoWait();
+
+                if(message != null)
+                {
+                    this.Dispatch(message);
+                    return !messageQueue.Empty;
+                }
+
+                return false;
+            }
+            catch(Exception ex)
+            {
+                Tracer.DebugFormat("Caught Exception While Dispatching: {0}", ex.Message );
+
+                // The session clears its Connection when it is closed, so the
+                // error report must not be allowed to throw out of this handler
+                // and silently end the dispatch thread.
+                try
+                {
+                    if(this.session.Connection != null)
+                    {
+                        this.session.Connection.OnSessionException(this.session, ex);
+                    }
+                }
+                catch(Exception reportError)
+                {
+                    Tracer.DebugFormat("Caught Exception While Reporting Dispatch Error: {0}", reportError.Message );
+                }
+            }
+
+            return true;
+        }
+
+        /// <summary>
+        /// Discards every message currently held in the session queue.
+        /// </summary>
+        public void ClearMessagesInProgress()
+        {
+            this.messageQueue.Clear();
+        }
+
+        /// <summary>
+        /// Discards every message currently held in the session queue.
+        /// </summary>
+        public void Clear()
+        {
+            this.messageQueue.Clear();
+        }
+
+        /// <summary>
+        /// Removes all messages from the session queue and returns them. Reading
+        /// this property therefore empties the queue.
+        /// </summary>
+        public MessageDispatch[] UnconsumedMessages
+        {
+            get{ return messageQueue.RemoveAll(); }
+        }
+
+        /// <summary>
+        /// True when the session queue is open, running and not empty.
+        /// (The misspelled name is kept so existing callers keep compiling.)
+        /// </summary>
+        public bool HasUncomsumedMessages
+        {
+            get{ return !messageQueue.Closed && messageQueue.Running && !messageQueue.Empty; }
+        }
+
+        /// <summary>
+        /// True while the session queue is running.
+        /// </summary>
+        public bool Running
+        {
+            get{ return this.messageQueue.Running; }
+        }
+
+        /// <summary>
+        /// True when the session queue holds no messages.
+        /// </summary>
+        public bool Empty
+        {
+            get{ return this.messageQueue.Empty; }
+        }
+
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/SessionExecutor.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/DedicatedTaskRunner.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/DedicatedTaskRunner.cs?rev=829386&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/DedicatedTaskRunner.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/DedicatedTaskRunner.cs Sat Oct 24 15:23:52 2009
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+
+namespace Apache.NMS.ActiveMQ.Threads
+{
+    /// <summary>
+    /// A TaskRunner that dedicates a single thread to running a single Task.
+    /// </summary>
+    /// <remarks>
+    /// The thread is started by the constructor and repeatedly calls
+    /// Task.Iterate(). When Iterate() returns false the thread sleeps until
+    /// Wakeup() or Shutdown() is called.
+    /// </remarks>
+    public class DedicatedTaskRunner : TaskRunner
+    {
+        // Monitor object guarding the flags below. A plain object is enough:
+        // it is only ever used with lock/Monitor, so no kernel mutex handle
+        // needs to be allocated.
+        private readonly object mutex = new object();
+        private Thread theThread = null;
+        private Task task = null;
+
+        // Set by the worker thread when Run() is about to exit.
+        private bool terminated = false;
+        // Set when work (or a shutdown request) has arrived since the last pass.
+        private bool pending = false;
+        // Set once Shutdown has been requested; never cleared.
+        private bool shutdown = false;
+
+        /// <summary>
+        /// Creates the runner and immediately starts its thread.
+        /// </summary>
+        /// <param name="task">The task to iterate; must not be null.</param>
+        /// <exception cref="NullReferenceException">task is null. (The exception
+        /// type is kept as-is so callers that catch it are not broken.)</exception>
+        public DedicatedTaskRunner(Task task)
+        {
+            if(task == null)
+            {
+                throw new NullReferenceException("Task was null");
+            }
+
+            this.task = task;
+
+            this.theThread = new Thread(Run);
+            this.theThread.Start();
+        }
+
+        ~DedicatedTaskRunner()
+        {
+            // Signal the thread to stop but never block the finalizer thread
+            // waiting for it.
+            this.Shutdown(TimeSpan.Zero);
+        }
+
+        /// <summary>
+        /// Requests shutdown and waits for the worker thread to finish.
+        /// </summary>
+        /// <param name="timeout">Maximum time to wait for the thread to finish;
+        /// -1 millisecond waits indefinitely.</param>
+        public void Shutdown(TimeSpan timeout)
+        {
+            lock(mutex)
+            {
+                this.shutdown = true;
+                this.pending = true;
+
+                Monitor.PulseAll(this.mutex);
+
+                // Wait till the thread stops. There is no need to wait if it has
+                // already terminated, and waiting must be skipped when Shutdown
+                // is called from the worker thread itself, which would otherwise
+                // wait for its own exit.
+                if(!this.terminated && Thread.CurrentThread != this.theThread)
+                {
+                    Monitor.Wait(this.mutex, timeout);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Requests shutdown and waits, without a time limit, for the worker
+        /// thread to finish.
+        /// </summary>
+        public void Shutdown()
+        {
+            // new TimeSpan(Timeout.Infinite) would be -1 *tick*, which Monitor.Wait
+            // truncates to a 0 ms wait; -1 *millisecond* is the infinite timeout.
+            this.Shutdown(TimeSpan.FromMilliseconds(Timeout.Infinite));
+        }
+
+        /// <summary>
+        /// Marks work as pending and wakes the worker thread. Does nothing once
+        /// shutdown has been requested.
+        /// </summary>
+        public void Wakeup()
+        {
+            lock(mutex)
+            {
+                if(this.shutdown)
+                {
+                    return;
+                }
+
+                this.pending = true;
+
+                Monitor.PulseAll(this.mutex);
+            }
+        }
+
+        /// <summary>
+        /// Worker thread body: iterates the task until shutdown is requested,
+        /// sleeping whenever the task reports that it has no more work.
+        /// </summary>
+        internal void Run()
+        {
+            try
+            {
+                while(true)
+                {
+                    lock(this.mutex)
+                    {
+                        // Clear the flag before iterating so a Wakeup that
+                        // arrives during Iterate() is not lost.
+                        pending = false;
+
+                        if(this.shutdown)
+                        {
+                            return;
+                        }
+                    }
+
+                    if(!this.task.Iterate())
+                    {
+                        // wait to be notified.
+                        lock(this.mutex)
+                        {
+                            if(this.shutdown)
+                            {
+                                return;
+                            }
+
+                            while(!this.pending)
+                            {
+                                Monitor.Wait(this.mutex);
+                            }
+                        }
+                    }
+                }
+            }
+            catch
+            {
+                // An exception from the task ends the thread; it is deliberately
+                // not propagated out of the thread entry point.
+            }
+            finally
+            {
+                // Make sure we notify any waiting threads that thread
+                // has terminated. This must live in finally: the normal
+                // shutdown path leaves the loop with a return statement, which
+                // would skip any code placed after the try block.
+                lock(this.mutex)
+                {
+                    this.terminated = true;
+                    Monitor.PulseAll(this.mutex);
+                }
+            }
+        }
+    }
+}



Mime
View raw message