activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jgo...@apache.org
Subject svn commit: r882297 - /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
Date Thu, 19 Nov 2009 21:11:57 GMT
Author: jgomes
Date: Thu Nov 19 21:11:57 2009
New Revision: 882297

URL: http://svn.apache.org/viewvc?rev=882297&view=rev
Log:
Fixed timeout calculations in Dequeue().

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=882297&r1=882296&r2=882297&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs Thu Nov 19 21:11:57 2009
@@ -29,8 +29,8 @@
 		DeliveredAck = 0, // Message delivered but not consumed
 		PoisonAck = 1, // Message could not be processed due to poison pill but discard anyway
 		ConsumedAck = 2, // Message consumed, discard
-        RedeliveredAck = 3, // Message has been Redelivered and is not yet poisoned.
-        IndividualAck = 4 // Only the given message is to be treated as consumed.
+		RedeliveredAck = 3, // Message has been Redelivered and is not yet poisoned.
+		IndividualAck = 4 // Only the given message is to be treated as consumed.
 	}
 
 	/// <summary>
@@ -38,40 +38,40 @@
 	/// </summary>
 	public class MessageConsumer : IMessageConsumer, IDispatcher
 	{
-        private readonly MessageDispatchChannel unconsumedMessages = new MessageDispatchChannel();
-        private readonly LinkedList<MessageDispatch> dispatchedMessages = new LinkedList<MessageDispatch>();
-        private readonly ConsumerInfo info;
-        private Session session;
+		private readonly MessageDispatchChannel unconsumedMessages = new MessageDispatchChannel();
+		private readonly LinkedList<MessageDispatch> dispatchedMessages = new LinkedList<MessageDispatch>();
+		private readonly ConsumerInfo info;
+		private Session session;
 
-        private MessageAck pendingAck = null;
+		private MessageAck pendingAck = null;
 
-        private Atomic<bool> started = new Atomic<bool>();
-        private Atomic<bool> deliveringAcks = new Atomic<bool>();
+		private Atomic<bool> started = new Atomic<bool>();
+		private Atomic<bool> deliveringAcks = new Atomic<bool>();
 
 		private int maximumRedeliveryCount = 10;
 		private int redeliveryTimeout = 500;
 		protected bool disposed = false;
-        private long lastDeliveredSequenceId = 0;
-        private int deliveredCounter = 0;
-        private int additionalWindowSize = 0;
-        private long redeliveryDelay = 0;
-        private int dispatchedCount = 0;
-        private volatile bool synchronizationRegistered = false;
-        private bool clearDispatchList = false;
+		private long lastDeliveredSequenceId = 0;
+		private int deliveredCounter = 0;
+		private int additionalWindowSize = 0;
+		private long redeliveryDelay = 0;
+		private int dispatchedCount = 0;
+		private volatile bool synchronizationRegistered = false;
+		private bool clearDispatchList = false;
 
-        private const int DEFAULT_REDELIVERY_DELAY = 0;
-        private const int DEFAULT_MAX_REDELIVERIES = 5;
+		private const int DEFAULT_REDELIVERY_DELAY = 0;
+		private const int DEFAULT_MAX_REDELIVERIES = 5;
 
-        private event MessageListener listener;
+		private event MessageListener listener;
+
+		private IRedeliveryPolicy redeliveryPolicy;
 
-        private IRedeliveryPolicy redeliveryPolicy;
-        
 		// Constructor internal to prevent clients from creating an instance.
 		internal MessageConsumer(Session session, ConsumerInfo info)
 		{
 			this.session = session;
 			this.info = info;
-            this.redeliveryPolicy = this.session.Connection.RedeliveryPolicy;
+			this.redeliveryPolicy = this.session.Connection.RedeliveryPolicy;
 		}
 
 		~MessageConsumer()
@@ -79,13 +79,13 @@
 			Dispose(false);
 		}
 
-        #region Property Accessors
+		#region Property Accessors
+
+		public long LastDeliveredSequenceId
+		{
+			get { return this.lastDeliveredSequenceId; }
+		}
 
-        public long LastDeliveredSequenceId
-        {
-            get{ return this.lastDeliveredSequenceId; }
-        }
-        
 		public ConsumerId ConsumerId
 		{
 			get { return info.ConsumerId; }
@@ -103,18 +103,18 @@
 			set { redeliveryTimeout = value; }
 		}
 
-        public int PrefetchSize
-        {
-            get { return this.info.PrefetchSize; }
-        }
-
-        public IRedeliveryPolicy RedeliveryPolicy
-        {
-            get { return this.redeliveryPolicy; }
-            set { this.redeliveryPolicy = value; }
-        }
+		public int PrefetchSize
+		{
+			get { return this.info.PrefetchSize; }
+		}
+
+		public IRedeliveryPolicy RedeliveryPolicy
+		{
+			get { return this.redeliveryPolicy; }
+			set { this.redeliveryPolicy = value; }
+		}
 
-        #endregion
+		#endregion
 
 		#region IMessageConsumer Members
 
@@ -122,104 +122,104 @@
 		{
 			add
 			{
-                CheckClosed();
+				CheckClosed();
+
+				if(this.PrefetchSize == 0)
+				{
+					throw new NMSException("Cannot set Asynchronous Listener on a Consumer with a zero Prefetch size");
+				}
+
+				bool wasStarted = this.session.Started;
+
+				if(wasStarted == true)
+				{
+					this.session.Stop();
+				}
 
-                if(this.PrefetchSize == 0)
-                {
-                    throw new NMSException("Cannot set Asynchronous Listener on a Consumer with a zero Prefetch size");
-                }
-
-                bool wasStarted = this.session.Started;
-
-                if(wasStarted == true)
-                {
-                    this.session.Stop();
-                }
-                
 				listener += value;
-                this.session.Redispatch(this.unconsumedMessages);
+				this.session.Redispatch(this.unconsumedMessages);
 
-                if(wasStarted == true)
-                {
-                    this.session.Start();
-                }
+				if(wasStarted == true)
+				{
+					this.session.Start();
+				}
 			}
 			remove { listener -= value; }
 		}
 
 		public IMessage Receive()
 		{
-            CheckClosed();
-            CheckMessageListener();
-    
-            SendPullRequest(0);
-            MessageDispatch dispatch = this.Dequeue(TimeSpan.FromMilliseconds(-1));
-            
-            if(dispatch == null)
-            {
-                return null;
-            }
-    
-            BeforeMessageIsConsumed(dispatch);
-            AfterMessageIsConsumed(dispatch, false);
-    
-            return CreateActiveMQMessage(dispatch);
+			CheckClosed();
+			CheckMessageListener();
+
+			SendPullRequest(0);
+			MessageDispatch dispatch = this.Dequeue(TimeSpan.FromMilliseconds(-1));
+
+			if(dispatch == null)
+			{
+				return null;
+			}
+
+			BeforeMessageIsConsumed(dispatch);
+			AfterMessageIsConsumed(dispatch, false);
+
+			return CreateActiveMQMessage(dispatch);
 		}
 
 		public IMessage Receive(TimeSpan timeout)
 		{
-            CheckClosed();
-            CheckMessageListener();
-    
-            MessageDispatch dispatch = null;
-            SendPullRequest((long)timeout.TotalMilliseconds);
-            
-            if(this.PrefetchSize == 0)
-            {
-                dispatch = this.Dequeue(TimeSpan.FromMilliseconds(-1));
-            }
-            else
-            {
-                dispatch = this.Dequeue(timeout);
-            }
-            
-            if(dispatch == null)
-            {
-                return null;
-            }
-    
-            BeforeMessageIsConsumed(dispatch);
-            AfterMessageIsConsumed(dispatch, false);
-            
-            return CreateActiveMQMessage(dispatch);
+			CheckClosed();
+			CheckMessageListener();
+
+			MessageDispatch dispatch = null;
+			SendPullRequest((long) timeout.TotalMilliseconds);
+
+			if(this.PrefetchSize == 0)
+			{
+				dispatch = this.Dequeue(TimeSpan.FromMilliseconds(-1));
+			}
+			else
+			{
+				dispatch = this.Dequeue(timeout);
+			}
+
+			if(dispatch == null)
+			{
+				return null;
+			}
+
+			BeforeMessageIsConsumed(dispatch);
+			AfterMessageIsConsumed(dispatch, false);
+
+			return CreateActiveMQMessage(dispatch);
 		}
 
 		public IMessage ReceiveNoWait()
 		{
-            CheckClosed();
-            CheckMessageListener();
+			CheckClosed();
+			CheckMessageListener();
 
-            MessageDispatch dispatch = null;
-            SendPullRequest(-1);
-            
-            if(this.PrefetchSize == 0)
-            {
-                dispatch = this.Dequeue(TimeSpan.FromMilliseconds(-1));
-            }
-            else
-            {
-                dispatch = this.Dequeue(TimeSpan.Zero);
-            }
-            
-            if(dispatch == null)
-            {
-                return null;
-            }
-    
-            BeforeMessageIsConsumed(dispatch);
-            AfterMessageIsConsumed(dispatch, false);
-    
-            return CreateActiveMQMessage(dispatch);
+			MessageDispatch dispatch = null;
+			SendPullRequest(-1);
+
+			if(this.PrefetchSize == 0)
+			{
+				dispatch = this.Dequeue(TimeSpan.FromMilliseconds(-1));
+			}
+			else
+			{
+				dispatch = this.Dequeue(TimeSpan.Zero);
+			}
+
+			if(dispatch == null)
+			{
+				return null;
+			}
+
+			BeforeMessageIsConsumed(dispatch);
+			AfterMessageIsConsumed(dispatch, false);
+
+			return CreateActiveMQMessage(dispatch);
 		}
 
 		public void Dispose()
@@ -242,8 +242,8 @@
 
 			try
 			{
-                Close();
-            }
+				Close();
+			}
 			catch
 			{
 				// Ignore network errors.
@@ -254,53 +254,53 @@
 
 		public void Close()
 		{
-            if(!this.unconsumedMessages.Closed)
-            {
-                if(this.session.IsTransacted && this.session.TransactionContext.InTransaction) 
-                {
-                    this.session.TransactionContext.AddSynchronization(new ConsumerCloseSynchronization(this));
-                } 
-                else
-                {
-                    this.DoClose();
-                } 
-            }
-		}
-
-        internal void DoClose()
-        {
-            if(!this.unconsumedMessages.Closed)
-            {                    
-                // Do we have any acks we need to send out before closing?
-                // Ack any delivered messages now.
-                if(!this.session.IsTransacted) 
-                {
-                    DeliverAcks();
-                    if(this.IsAutoAcknowledgeBatch)
-                    {
-                        Acknowledge();
-                    }
-                }
-                
-                if(!this.session.IsTransacted)
-                {
-                    lock(this.dispatchedMessages)
-                    {
-                        dispatchedMessages.Clear();
-                    }
-                }
-                
-                this.unconsumedMessages.Close();
-                this.session.DisposeOf(this.info.ConsumerId, this.lastDeliveredSequenceId);
-
-                RemoveInfo removeCommand = new RemoveInfo();
-                removeCommand.ObjectId = this.info.ConsumerId;
-                removeCommand.LastDeliveredSequenceId = this.lastDeliveredSequenceId;
-                
-                this.session.Connection.Oneway(removeCommand);
-                this.session = null;
-            }
-        }
+			if(!this.unconsumedMessages.Closed)
+			{
+				if(this.session.IsTransacted && this.session.TransactionContext.InTransaction)
+				{
+					this.session.TransactionContext.AddSynchronization(new ConsumerCloseSynchronization(this));
+				}
+				else
+				{
+					this.DoClose();
+				}
+			}
+		}
+
+		internal void DoClose()
+		{
+			if(!this.unconsumedMessages.Closed)
+			{
+				// Do we have any acks we need to send out before closing?
+				// Ack any delivered messages now.
+				if(!this.session.IsTransacted)
+				{
+					DeliverAcks();
+					if(this.IsAutoAcknowledgeBatch)
+					{
+						Acknowledge();
+					}
+				}
+
+				if(!this.session.IsTransacted)
+				{
+					lock(this.dispatchedMessages)
+					{
+						dispatchedMessages.Clear();
+					}
+				}
+
+				this.unconsumedMessages.Close();
+				this.session.DisposeOf(this.info.ConsumerId, this.lastDeliveredSequenceId);
+
+				RemoveInfo removeCommand = new RemoveInfo();
+				removeCommand.ObjectId = this.info.ConsumerId;
+				removeCommand.LastDeliveredSequenceId = this.lastDeliveredSequenceId;
+
+				this.session.Connection.Oneway(removeCommand);
+				this.session = null;
+			}
+		}
 
 		#endregion
 
@@ -315,716 +315,739 @@
 				messagePull.ResponseRequired = false;
 
 				Tracer.Debug("Sending MessagePull: " + messagePull);
-			    session.Connection.Oneway(messagePull);
+				session.Connection.Oneway(messagePull);
+			}
+		}
+
+		protected void DoIndividualAcknowledge(ActiveMQMessage message)
+		{
+			MessageDispatch dispatch = null;
+
+			lock(this.dispatchedMessages)
+			{
+				foreach(MessageDispatch originalDispatch in this.dispatchedMessages)
+				{
+					if(originalDispatch.Message.MessageId.Equals(message.MessageId))
+					{
+						dispatch = originalDispatch;
+						this.dispatchedMessages.Remove(originalDispatch);
+						break;
+					}
+
+					return;
+				}
 			}
+
+			MessageAck ack = new MessageAck();
+
+			ack.AckType = (byte) AckType.IndividualAck;
+			ack.ConsumerId = this.info.ConsumerId;
+			ack.Destination = dispatch.Destination;
+			ack.LastMessageId = dispatch.Message.MessageId;
+			ack.MessageCount = 1;
+
+			this.session.Connection.Oneway(ack);
 		}
 
-        protected void DoIndividualAcknowledge(ActiveMQMessage message)
-        {
-            MessageDispatch dispatch = null;
-            
-            lock(this.dispatchedMessages)
-            {
-                foreach(MessageDispatch originalDispatch in this.dispatchedMessages)
-                {
-                    if(originalDispatch.Message.MessageId.Equals(message.MessageId))
-                    {
-                        dispatch = originalDispatch;
-                        this.dispatchedMessages.Remove(originalDispatch);
-                        break;
-                    }
-
-                    return;
-                }
-            }
-
-            MessageAck ack = new MessageAck();
-
-            ack.AckType = (byte)AckType.IndividualAck;
-            ack.ConsumerId = this.info.ConsumerId;
-            ack.Destination = dispatch.Destination;
-            ack.LastMessageId = dispatch.Message.MessageId;
-            ack.MessageCount = 1;            
-
-            this.session.Connection.Oneway(ack);            
-        }
-                
 		protected void DoNothingAcknowledge(ActiveMQMessage message)
 		{
 		}
 
 		protected void DoClientAcknowledge(ActiveMQMessage message)
 		{
-            this.CheckClosed();
+			this.CheckClosed();
 			Tracer.Debug("Sending Client Ack:");
-            this.session.Acknowledge();
+			this.session.Acknowledge();
+		}
+
+		public void Start()
+		{
+			if(this.unconsumedMessages.Closed)
+			{
+				return;
+			}
+
+			this.started.Value = true;
+			this.unconsumedMessages.Start();
+			this.session.Executor.Wakeup();
 		}
 
-        public void Start()
-        {
-            if(this.unconsumedMessages.Closed)
-            {
-                return;
-            }
-
-            this.started.Value = true;
-            this.unconsumedMessages.Start();
-            this.session.Executor.Wakeup();
-        }
-    
-        public void Stop()
-        {
-            this.started.Value = false;
-            this.unconsumedMessages.Stop();
-        }
-
-        public void ClearMessagesInProgress()
-        {
-            // we are called from inside the transport reconnection logic
-            // which involves us clearing all the connections' consumers
-            // dispatch lists and clearing them
-            // so rather than trying to grab a mutex (which could be already
-            // owned by the message listener calling the send) we will just set
-            // a flag so that the list can be cleared as soon as the
-            // dispatch thread is ready to flush the dispatch list
-            this.clearDispatchList = true;
-        }
-
-        public void DeliverAcks()
-        {
-            MessageAck ack = null;
-            
-            if(this.deliveringAcks.CompareAndSet(false, true))
-            {
-                if(this.IsAutoAcknowledgeEach)
-                {
-                    lock(this.dispatchedMessages)
-                    {
-                        ack = MakeAckForAllDeliveredMessages(AckType.DeliveredAck);
-                        if(ack != null) 
-                        {
-                            this.dispatchedMessages.Clear();
-                        } 
-                        else
-                        {
-                            ack = this.pendingAck;
-                            this.pendingAck = null;
-                        }
-                    }
-                } 
-                else if(pendingAck != null && pendingAck.AckType == (byte)AckType.ConsumedAck) 
-                {
-                    ack = pendingAck;
-                    pendingAck = null;
-                }
-                
-                if(ack != null)
-                {
-                    MessageAck ackToSend = ack;
-                    
-                    try
-                    {
-                        this.session.Connection.Oneway(ackToSend);
-                    }
-                    catch(Exception e)
-                    {
-                        Tracer.DebugFormat("{0} : Failed to send ack, {1}", this.info.ConsumerId, e);
-                    }
-                } 
-                else
-                {
-                    this.deliveringAcks.Value = false;
-                }
-            }
-        }
-
-        public void Dispatch(MessageDispatch dispatch)
-        {
-            MessageListener listener = this.listener;
-            
-            try 
-            {
-                lock(this.unconsumedMessages.SyncRoot) 
-                {
-                    if(this.clearDispatchList)
-                    {
-                        // we are reconnecting so lets flush the in progress messages
-                        this.clearDispatchList = false;
-                        this.unconsumedMessages.Clear();
-                        
-                        if(this.pendingAck != null && this.pendingAck.AckType == (byte)AckType.DeliveredAck) 
-                        {
-                            // on resumption a pending delivered ack will be out of sync with
-                            // re-deliveries.
-                            Tracer.Debug("removing pending delivered ack on transport interupt: " + pendingAck);
-                            this.pendingAck = null;
-                        }
-                    }
-                    
-                    if(!this.unconsumedMessages.Closed) 
-                    {
-                        if(listener != null && this.unconsumedMessages.Running)
-                        {
-                            ActiveMQMessage message = CreateActiveMQMessage(dispatch);
-                            
-                            this.BeforeMessageIsConsumed(dispatch);
-                            
-                            try
-                            {
-                                bool expired = message.IsExpired();
-                                
-                                if(!expired)
-                                {
-                                    listener(message);
-                                }
-                                
-                                this.AfterMessageIsConsumed(dispatch, expired);
-                            } 
-                            catch(Exception e) 
-                            {
-                                if(IsAutoAcknowledgeBatch || IsAutoAcknowledgeEach || this.session.IsIndividualAcknowledge)
-                                {
-                                    // Redeliver the message
-                                } 
-                                else
-                                {
-                                    // Transacted or Client ack: Deliver the next message.
-                                    this.AfterMessageIsConsumed(dispatch, false);
-                                }
-
-                                Tracer.Error(this.info.ConsumerId + " Exception while processing message: " + e);
-                            }
-                        } 
-                        else 
-                        {
-                            this.unconsumedMessages.Enqueue(dispatch);
-                        }
-                    }
-                }
-                
-                if(++dispatchedCount % 1000 == 0) 
-                {
-                    dispatchedCount = 0;
-                    Thread.Sleep(1);
-                }
-            } 
-            catch(Exception e) 
-            {
-                this.session.Connection.OnSessionException(this.session, e);
-            }
-        }
-
-        public bool Iterate()
-        {
-            if(this.listener != null) 
-            {
-                MessageDispatch dispatch = this.unconsumedMessages.DequeueNoWait();
-                if(dispatch != null) 
-                {
-                    try 
-                    {
-                        ActiveMQMessage message = CreateActiveMQMessage(dispatch);
-                        BeforeMessageIsConsumed(dispatch);
-                        listener(message);
-                        AfterMessageIsConsumed(dispatch, false);
-                    } 
-                    catch(NMSException e) 
-                    {
-                        this.session.Connection.OnSessionException(this.session, e);
-                    }
-                    
-                    return true;
-                }
-            }
-            
-            return false;
-        }
-
-        /// <summary>
-        /// Used to get an enqueued message from the unconsumedMessages list. The
-        /// amount of time this method blocks is based on the timeout value.  if
-        /// timeout == Timeout.Infinite then it blocks until a message is received. 
-        /// if timeout == 0 then it it tries to not block at all, it returns a 
-        /// message if it is available if timeout > 0 then it blocks up to timeout 
-        /// amount of time.  Expired messages will consumed by this method.
-        /// </summary>
-        /// <param name="timeout">
-        /// A <see cref="System.Int64"/>
-        /// </param>
-        /// <returns>
-        /// A <see cref="MessageDispatch"/>
-        /// </returns>
-        private MessageDispatch Dequeue(TimeSpan timeout) 
-        {
-            DateTime deadline = DateTime.Now;
-            
-            if(timeout > TimeSpan.Zero)
-            {
-                deadline = DateTime.Now + timeout;
-            }
-            
-            while(true) 
-            {
-                MessageDispatch dispatch = this.unconsumedMessages.Dequeue(timeout);
-                
-                if(dispatch == null)
-                {
-                    if(timeout > TimeSpan.Zero && !this.unconsumedMessages.Closed) 
-                    {
-                        timeout = deadline < DateTime.Now ? TimeSpan.Zero : deadline - DateTime.Now;
-                    }
-                    else
-                    {
-                        return null;
-                    }
-                } 
-                else if(dispatch.Message == null) 
-                {
-                    return null;
-                } 
-                else if(dispatch.Message.IsExpired())
-                {
-                    Tracer.DebugFormat("{0} received expired message: {1}", info.ConsumerId, dispatch.Message.MessageId);
-                    
-                    BeforeMessageIsConsumed(dispatch);
-                    AfterMessageIsConsumed(dispatch, true);
-                    
-                    if(timeout > TimeSpan.Zero && !this.unconsumedMessages.Closed) 
-                    {
-                        timeout = deadline < DateTime.Now ? TimeSpan.Zero : deadline - DateTime.Now;
-                    }
-                } 
-                else 
-                {
-                    return dispatch;
-                }
-            }
-        }
-
-        public void BeforeMessageIsConsumed(MessageDispatch dispatch)
-        {
-            this.lastDeliveredSequenceId = dispatch.Message.MessageId.BrokerSequenceId;
-            
-            if(!IsAutoAcknowledgeBatch)
-            {
-                lock(this.dispatchedMessages) 
-                {
-                    this.dispatchedMessages.AddFirst(dispatch);
-                }
-                
-                if(this.session.IsTransacted) 
-                {
-                    this.AckLater(dispatch, AckType.DeliveredAck);
-                }
-            }            
-        }
-        
-        public void AfterMessageIsConsumed(MessageDispatch dispatch, bool expired)
-        {
-            if(this.unconsumedMessages.Closed) 
-            {
-                return;
-            }
-            
-            if(expired == true) 
-            {
-                lock(this.dispatchedMessages)
-                {
-                    this.dispatchedMessages.Remove(dispatch);
-                }
-                
-                AckLater(dispatch, AckType.DeliveredAck);
-            }
-            else
-            {
-                if(this.session.IsTransacted)
-                {
-                    // Do nothing.
-                }
-                else if(this.IsAutoAcknowledgeEach)
-                {
-                    if(this.deliveringAcks.CompareAndSet(false, true)) 
-                    {
-                        lock(this.dispatchedMessages) 
-                        {
-                            if(this.dispatchedMessages.Count != 0)
-                            {
-                                MessageAck ack = MakeAckForAllDeliveredMessages(AckType.ConsumedAck);
-                                if(ack !=null) 
-                                {
-                                    this.dispatchedMessages.Clear();
-                                    this.session.Connection.Oneway(ack);
-                                }
-                            }
-                        }
-                        this.deliveringAcks.Value = false;
-                    }
-                } 
-                else if(this.IsAutoAcknowledgeBatch)
-                {
-                    AckLater(dispatch, AckType.ConsumedAck);
-                }
-                else if(this.session.IsClientAcknowledge || this.session.IsIndividualAcknowledge) 
-                {
-                    AckLater(dispatch, AckType.DeliveredAck);
-                } 
-                else 
-                {
-                    throw new NMSException("Invalid session state.");
-                }
-            }            
-        }
-
-        private MessageAck MakeAckForAllDeliveredMessages(AckType type)
-        {
-            lock(this.dispatchedMessages)
-            {
-                if(this.dispatchedMessages.Count == 0)
-                {
-                    return null;
-                }
-                    
-                MessageDispatch dispatch = this.dispatchedMessages.First.Value;
-                MessageAck ack = new MessageAck();
-                
-                ack.AckType = (byte)type;
-                ack.ConsumerId = this.info.ConsumerId;
-                ack.Destination = dispatch.Destination;
-                ack.LastMessageId = dispatch.Message.MessageId;
-                ack.MessageCount = this.dispatchedMessages.Count;
-                ack.FirstMessageId = this.dispatchedMessages.Last.Value.Message.MessageId;
-                
-                return ack;
-            }
-        }
-
-        private void AckLater(MessageDispatch dispatch, AckType type)
-        {
-            // Don't acknowledge now, but we may need to let the broker know the
-            // consumer got the message to expand the pre-fetch window
-            if(this.session.IsTransacted)
-            {
-                this.session.DoStartTransaction();
-                
-                if(!synchronizationRegistered) 
-                {
-                    this.synchronizationRegistered = true;
-                    this.session.TransactionContext.AddSynchronization(new MessageConsumerSynchronization(this));
-                }
-            }
-
-            this.deliveredCounter++;
-            
-            MessageAck oldPendingAck = pendingAck;
-            
-            pendingAck = new MessageAck();
-            pendingAck.AckType = (byte)type;
-            pendingAck.ConsumerId = this.info.ConsumerId;
-            pendingAck.Destination = dispatch.Destination;
-            pendingAck.LastMessageId = dispatch.Message.MessageId;
-            pendingAck.MessageCount = deliveredCounter;
-
-            if(this.session.IsTransacted && this.session.TransactionContext.InTransaction)
-            {
-                pendingAck.TransactionId = this.session.TransactionContext.TransactionId;
-            }
-            
-            if(oldPendingAck == null) 
-            {
-                pendingAck.FirstMessageId = pendingAck.LastMessageId;
-            } 
-            else if(oldPendingAck.AckType == pendingAck.AckType)
-            {
-                pendingAck.FirstMessageId = oldPendingAck.FirstMessageId;
-            } 
-            else 
-            {
-                // old pending ack being superseded by ack of another type, if is is not a delivered
-                // ack and hence important, send it now so it is not lost.
-                if(oldPendingAck.AckType != (byte)AckType.DeliveredAck) 
-                {
-                    Tracer.Debug("Sending old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
-                    this.session.Connection.Oneway(oldPendingAck);
-                } 
-                else
-                {
-                    Tracer.Debug("dropping old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
-                }
-            }
-            
-            if((0.5 * this.info.PrefetchSize) <= (this.deliveredCounter - this.additionalWindowSize)) 
-            {
-                this.session.Connection.Oneway(pendingAck);
-                this.pendingAck = null;
-                this.deliveredCounter = 0;
-                this.additionalWindowSize = 0;
-            }
-        }
-
-        internal void Acknowledge()
-        {
-            lock(this.dispatchedMessages)
-            {
-                // Acknowledge all messages so far.
-                MessageAck ack = MakeAckForAllDeliveredMessages(AckType.ConsumedAck);
-                
-                if(ack == null)
-                {
-                    return; // no msgs
-                }
-                
-                if(this.session.IsTransacted)
-                {
-                    this.session.DoStartTransaction();
-                    ack.TransactionId = this.session.TransactionContext.TransactionId;
-                }
-                
-                this.session.Connection.Oneway(ack);
-                this.pendingAck = null;
-                
-                // Adjust the counters
-                this.deliveredCounter = Math.Max(0, this.deliveredCounter - this.dispatchedMessages.Count);
-                this.additionalWindowSize = Math.Max(0, this.additionalWindowSize - this.dispatchedMessages.Count);
-                
-                if(!this.session.IsTransacted) 
-                {
-                    this.dispatchedMessages.Clear();
-                } 
-            }            
-        }        
-
-        private void Commit()
-        {
-            lock(this.dispatchedMessages)
-            {
-                this.dispatchedMessages.Clear();
-            }
-            
-            this.redeliveryDelay = 0;
-        }
-
-        private void Rollback()
-        {
-            lock(this.unconsumedMessages.SyncRoot)
-            {
-                lock(this.dispatchedMessages)
-                {
-                    if(this.dispatchedMessages.Count == 0)
-                    {
-                        return;
-                    }
-        
-                    // Only increase the redelivery delay after the first redelivery..
-                    MessageDispatch lastMd = this.dispatchedMessages.First.Value;
-                    int currentRedeliveryCount = lastMd.Message.RedeliveryCounter;
-                    
-                    redeliveryDelay = this.redeliveryPolicy.RedeliveryDelay(currentRedeliveryCount);
-                    
-                    MessageId firstMsgId = this.dispatchedMessages.Last.Value.Message.MessageId;
-
-                    foreach(MessageDispatch dispatch in this.dispatchedMessages)
-                    {
-                        // Allow the message to update its internal to reflect a Rollback.
-                        dispatch.Message.OnMessageRollback();
-                    }
-        
-                    if(this.redeliveryPolicy.MaximumRedeliveries >= 0 &&
-                       lastMd.Message.RedeliveryCounter > this.redeliveryPolicy.MaximumRedeliveries)
-                    {
-                        // We need to NACK the messages so that they get sent to the DLQ.
-                        MessageAck ack = new MessageAck();
-
-                        ack.AckType = (byte)AckType.PoisonAck;
-                        ack.ConsumerId = this.info.ConsumerId;
-                        ack.Destination = lastMd.Destination;
-                        ack.LastMessageId = lastMd.Message.MessageId;
-                        ack.MessageCount = this.dispatchedMessages.Count;                                    
-                        ack.FirstMessageId = firstMsgId;
-
-                        this.session.Connection.Oneway(ack);
-                        
-                        // Adjust the window size.
-                        additionalWindowSize = Math.Max(0, this.additionalWindowSize - this.dispatchedMessages.Count);
-                        
-                        this.redeliveryDelay = 0;
-                    } 
-                    else
-                    {                        
-                        // We only send a RedeliveryAck after the first redelivery
-                        if(currentRedeliveryCount > 0)
-                        {
-                            MessageAck ack = new MessageAck();
-                            
-                            ack.AckType = (byte)AckType.RedeliveredAck;
-                            ack.ConsumerId = this.info.ConsumerId;
-                            ack.Destination = lastMd.Destination;
-                            ack.LastMessageId = lastMd.Message.MessageId;
-                            ack.MessageCount = this.dispatchedMessages.Count;                                    
-                            ack.FirstMessageId = firstMsgId;
-                            
-                            this.session.Connection.Oneway(ack);
-                        }
-        
-                        // stop the delivery of messages.
-                        this.unconsumedMessages.Stop();
-
-                        foreach(MessageDispatch dispatch in this.dispatchedMessages)
-                        {
-                            this.unconsumedMessages.EnqueueFirst(dispatch);
-                        }
-        
-                        if(redeliveryDelay > 0 && !this.unconsumedMessages.Closed)
-                        {
-                            DateTime deadline = DateTime.Now.AddMilliseconds(redeliveryDelay);
-                            ThreadPool.QueueUserWorkItem(this.RollbackHelper, deadline);
-                        } 
-                        else 
-                        {
-                            Start();
-                        }
-                    }
-                    
-                    this.deliveredCounter -= this.dispatchedMessages.Count;
-                    this.dispatchedMessages.Clear();
-                }
-            }
-
-            // Only redispatch if there's an async listener otherwise a synchronous
-            // consumer will pull them from the local queue.
-            if(this.listener != null) 
-            {
-                this.session.Redispatch(this.unconsumedMessages);
-            }
-        }
-        
-        private void RollbackHelper(Object arg)
-        {
-            try
-            {
-                TimeSpan waitTime = (DateTime) arg - DateTime.Now;
-
-                if(waitTime.CompareTo(TimeSpan.Zero) > 0)
-                {
-                    Thread.Sleep(waitTime);
-                }
-                
-                this.Start();
-            }            
-            catch(Exception e)
-            {
+		public void Stop()
+		{
+			this.started.Value = false;
+			this.unconsumedMessages.Stop();
+		}
+
+		public void ClearMessagesInProgress()
+		{
+			// we are called from inside the transport reconnection logic
+			// which involves us clearing all the connections' consumers
+			// dispatch lists and clearing them
+			// so rather than trying to grab a mutex (which could be already
+			// owned by the message listener calling the send) we will just set
+			// a flag so that the list can be cleared as soon as the
+			// dispatch thread is ready to flush the dispatch list
+			this.clearDispatchList = true;
+		}
+
+		public void DeliverAcks()
+		{
+			MessageAck ack = null;
+
+			if(this.deliveringAcks.CompareAndSet(false, true))
+			{
+				if(this.IsAutoAcknowledgeEach)
+				{
+					lock(this.dispatchedMessages)
+					{
+						ack = MakeAckForAllDeliveredMessages(AckType.DeliveredAck);
+						if(ack != null)
+						{
+							this.dispatchedMessages.Clear();
+						}
+						else
+						{
+							ack = this.pendingAck;
+							this.pendingAck = null;
+						}
+					}
+				}
+				else if(pendingAck != null && pendingAck.AckType == (byte) AckType.ConsumedAck)
+				{
+					ack = pendingAck;
+					pendingAck = null;
+				}
+
+				if(ack != null)
+				{
+					MessageAck ackToSend = ack;
+
+					try
+					{
+						this.session.Connection.Oneway(ackToSend);
+					}
+					catch(Exception e)
+					{
+						Tracer.DebugFormat("{0} : Failed to send ack, {1}", this.info.ConsumerId, e);
+					}
+				}
+				else
+				{
+					this.deliveringAcks.Value = false;
+				}
+			}
+		}
+
+		public void Dispatch(MessageDispatch dispatch)
+		{
+			MessageListener listener = this.listener;
+
+			try
+			{
+				lock(this.unconsumedMessages.SyncRoot)
+				{
+					if(this.clearDispatchList)
+					{
+						// we are reconnecting so lets flush the in progress messages
+						this.clearDispatchList = false;
+						this.unconsumedMessages.Clear();
+
+						if(this.pendingAck != null && this.pendingAck.AckType == (byte) AckType.DeliveredAck)
+						{
+							// on resumption a pending delivered ack will be out of sync with
+							// re-deliveries.
+							Tracer.Debug("removing pending delivered ack on transport interupt: " + pendingAck);
+							this.pendingAck = null;
+						}
+					}
+
+					if(!this.unconsumedMessages.Closed)
+					{
+						if(listener != null && this.unconsumedMessages.Running)
+						{
+							ActiveMQMessage message = CreateActiveMQMessage(dispatch);
+
+							this.BeforeMessageIsConsumed(dispatch);
+
+							try
+							{
+								bool expired = message.IsExpired();
+
+								if(!expired)
+								{
+									listener(message);
+								}
+
+								this.AfterMessageIsConsumed(dispatch, expired);
+							}
+							catch(Exception e)
+							{
+								if(IsAutoAcknowledgeBatch || IsAutoAcknowledgeEach || this.session.IsIndividualAcknowledge)
+								{
+									// Redeliver the message
+								}
+								else
+								{
+									// Transacted or Client ack: Deliver the next message.
+									this.AfterMessageIsConsumed(dispatch, false);
+								}
+
+								Tracer.Error(this.info.ConsumerId + " Exception while processing message: " + e);
+							}
+						}
+						else
+						{
+							this.unconsumedMessages.Enqueue(dispatch);
+						}
+					}
+				}
+
+				if(++dispatchedCount % 1000 == 0)
+				{
+					dispatchedCount = 0;
+					Thread.Sleep(1);
+				}
+			}
+			catch(Exception e)
+			{
+				this.session.Connection.OnSessionException(this.session, e);
+			}
+		}
+
+		public bool Iterate()
+		{
+			if(this.listener != null)
+			{
+				MessageDispatch dispatch = this.unconsumedMessages.DequeueNoWait();
+				if(dispatch != null)
+				{
+					try
+					{
+						ActiveMQMessage message = CreateActiveMQMessage(dispatch);
+						BeforeMessageIsConsumed(dispatch);
+						listener(message);
+						AfterMessageIsConsumed(dispatch, false);
+					}
+					catch(NMSException e)
+					{
+						this.session.Connection.OnSessionException(this.session, e);
+					}
+
+					return true;
+				}
+			}
+
+			return false;
+		}
+
+		/// <summary>
+		/// Used to get an enqueued message from the unconsumedMessages list. The
+		/// amount of time this method blocks is based on the timeout value.  if
+		/// timeout == Timeout.Infinite then it blocks until a message is received. 
+		/// if timeout == 0 then it it tries to not block at all, it returns a 
+		/// message if it is available if timeout > 0 then it blocks up to timeout 
+		/// amount of time.  Expired messages will consumed by this method.
+		/// </summary>
+		/// <param name="timeout">
+		/// A <see cref="System.TimeSpan"/>
+		/// </param>
+		/// <returns>
+		/// A <see cref="MessageDispatch"/>
+		/// </returns>
+		private MessageDispatch Dequeue(TimeSpan timeout)
+		{
+			DateTime deadline = DateTime.Now;
+
+			if(timeout > TimeSpan.Zero)
+			{
+				deadline += timeout;
+			}
+
+			while(true)
+			{
+				MessageDispatch dispatch = this.unconsumedMessages.Dequeue(timeout);
+
+				// Grab a single date/time for calculations to avoid timing errors.
+				DateTime dispatchTime = DateTime.Now;
+
+				if(dispatch == null)
+				{
+					if(timeout > TimeSpan.Zero && !this.unconsumedMessages.Closed)
+					{
+						if(dispatchTime > deadline)
+						{
+							// Out of time.
+							timeout = TimeSpan.Zero;
+						}
+						else
+						{
+							// Adjust the timeout to the remaining time.
+							timeout = deadline - dispatchTime;
+						}
+					}
+					else
+					{
+						return null;
+					}
+				}
+				else if(dispatch.Message == null)
+				{
+					return null;
+				}
+				else if(dispatch.Message.IsExpired())
+				{
+					Tracer.DebugFormat("{0} received expired message: {1}", info.ConsumerId, dispatch.Message.MessageId);
+
+					BeforeMessageIsConsumed(dispatch);
+					AfterMessageIsConsumed(dispatch, true);
+					// Refresh the dispatch time
+					dispatchTime = DateTime.Now;
+
+					if(timeout > TimeSpan.Zero && !this.unconsumedMessages.Closed)
+					{
+						if(dispatchTime > deadline)
+						{
+							// Out of time.
+							timeout = TimeSpan.Zero;
+						}
+						else
+						{
+							// Adjust the timeout to the remaining time.
+							timeout = deadline - dispatchTime;
+						}
+					}
+				}
+				else
+				{
+					return dispatch;
+				}
+			}
+		}
+
+		public void BeforeMessageIsConsumed(MessageDispatch dispatch)
+		{
+			this.lastDeliveredSequenceId = dispatch.Message.MessageId.BrokerSequenceId;
+
+			if(!IsAutoAcknowledgeBatch)
+			{
+				lock(this.dispatchedMessages)
+				{
+					this.dispatchedMessages.AddFirst(dispatch);
+				}
+
+				if(this.session.IsTransacted)
+				{
+					this.AckLater(dispatch, AckType.DeliveredAck);
+				}
+			}
+		}
+
+		public void AfterMessageIsConsumed(MessageDispatch dispatch, bool expired)
+		{
+			if(this.unconsumedMessages.Closed)
+			{
+				return;
+			}
+
+			if(expired == true)
+			{
+				lock(this.dispatchedMessages)
+				{
+					this.dispatchedMessages.Remove(dispatch);
+				}
+
+				AckLater(dispatch, AckType.DeliveredAck);
+			}
+			else
+			{
+				if(this.session.IsTransacted)
+				{
+					// Do nothing.
+				}
+				else if(this.IsAutoAcknowledgeEach)
+				{
+					if(this.deliveringAcks.CompareAndSet(false, true))
+					{
+						lock(this.dispatchedMessages)
+						{
+							if(this.dispatchedMessages.Count != 0)
+							{
+								MessageAck ack = MakeAckForAllDeliveredMessages(AckType.ConsumedAck);
+								if(ack != null)
+								{
+									this.dispatchedMessages.Clear();
+									this.session.Connection.Oneway(ack);
+								}
+							}
+						}
+						this.deliveringAcks.Value = false;
+					}
+				}
+				else if(this.IsAutoAcknowledgeBatch)
+				{
+					AckLater(dispatch, AckType.ConsumedAck);
+				}
+				else if(this.session.IsClientAcknowledge || this.session.IsIndividualAcknowledge)
+				{
+					AckLater(dispatch, AckType.DeliveredAck);
+				}
+				else
+				{
+					throw new NMSException("Invalid session state.");
+				}
+			}
+		}
+
+		private MessageAck MakeAckForAllDeliveredMessages(AckType type)
+		{
+			lock(this.dispatchedMessages)
+			{
+				if(this.dispatchedMessages.Count == 0)
+				{
+					return null;
+				}
+
+				MessageDispatch dispatch = this.dispatchedMessages.First.Value;
+				MessageAck ack = new MessageAck();
+
+				ack.AckType = (byte) type;
+				ack.ConsumerId = this.info.ConsumerId;
+				ack.Destination = dispatch.Destination;
+				ack.LastMessageId = dispatch.Message.MessageId;
+				ack.MessageCount = this.dispatchedMessages.Count;
+				ack.FirstMessageId = this.dispatchedMessages.Last.Value.Message.MessageId;
+
+				return ack;
+			}
+		}
+
+		private void AckLater(MessageDispatch dispatch, AckType type)
+		{
+			// Don't acknowledge now, but we may need to let the broker know the
+			// consumer got the message to expand the pre-fetch window
+			if(this.session.IsTransacted)
+			{
+				this.session.DoStartTransaction();
+
+				if(!synchronizationRegistered)
+				{
+					this.synchronizationRegistered = true;
+					this.session.TransactionContext.AddSynchronization(new MessageConsumerSynchronization(this));
+				}
+			}
+
+			this.deliveredCounter++;
+
+			MessageAck oldPendingAck = pendingAck;
+
+			pendingAck = new MessageAck();
+			pendingAck.AckType = (byte) type;
+			pendingAck.ConsumerId = this.info.ConsumerId;
+			pendingAck.Destination = dispatch.Destination;
+			pendingAck.LastMessageId = dispatch.Message.MessageId;
+			pendingAck.MessageCount = deliveredCounter;
+
+			if(this.session.IsTransacted && this.session.TransactionContext.InTransaction)
+			{
+				pendingAck.TransactionId = this.session.TransactionContext.TransactionId;
+			}
+
+			if(oldPendingAck == null)
+			{
+				pendingAck.FirstMessageId = pendingAck.LastMessageId;
+			}
+			else if(oldPendingAck.AckType == pendingAck.AckType)
+			{
+				pendingAck.FirstMessageId = oldPendingAck.FirstMessageId;
+			}
+			else
+			{
+				// old pending ack being superseded by ack of another type, if is is not a delivered
+				// ack and hence important, send it now so it is not lost.
+				if(oldPendingAck.AckType != (byte) AckType.DeliveredAck)
+				{
+					Tracer.Debug("Sending old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
+					this.session.Connection.Oneway(oldPendingAck);
+				}
+				else
+				{
+					Tracer.Debug("dropping old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
+				}
+			}
+
+			if((0.5 * this.info.PrefetchSize) <= (this.deliveredCounter - this.additionalWindowSize))
+			{
+				this.session.Connection.Oneway(pendingAck);
+				this.pendingAck = null;
+				this.deliveredCounter = 0;
+				this.additionalWindowSize = 0;
+			}
+		}
+
+		internal void Acknowledge()
+		{
+			lock(this.dispatchedMessages)
+			{
+				// Acknowledge all messages so far.
+				MessageAck ack = MakeAckForAllDeliveredMessages(AckType.ConsumedAck);
+
+				if(ack == null)
+				{
+					return; // no msgs
+				}
+
+				if(this.session.IsTransacted)
+				{
+					this.session.DoStartTransaction();
+					ack.TransactionId = this.session.TransactionContext.TransactionId;
+				}
+
+				this.session.Connection.Oneway(ack);
+				this.pendingAck = null;
+
+				// Adjust the counters
+				this.deliveredCounter = Math.Max(0, this.deliveredCounter - this.dispatchedMessages.Count);
+				this.additionalWindowSize = Math.Max(0, this.additionalWindowSize - this.dispatchedMessages.Count);
+
+				if(!this.session.IsTransacted)
+				{
+					this.dispatchedMessages.Clear();
+				}
+			}
+		}
+
+		private void Commit()
+		{
+			lock(this.dispatchedMessages)
+			{
+				this.dispatchedMessages.Clear();
+			}
+
+			this.redeliveryDelay = 0;
+		}
+
+		private void Rollback()
+		{
+			lock(this.unconsumedMessages.SyncRoot)
+			{
+				lock(this.dispatchedMessages)
+				{
+					if(this.dispatchedMessages.Count == 0)
+					{
+						return;
+					}
+
+					// Only increase the redelivery delay after the first redelivery..
+					MessageDispatch lastMd = this.dispatchedMessages.First.Value;
+					int currentRedeliveryCount = lastMd.Message.RedeliveryCounter;
+
+					redeliveryDelay = this.redeliveryPolicy.RedeliveryDelay(currentRedeliveryCount);
+
+					MessageId firstMsgId = this.dispatchedMessages.Last.Value.Message.MessageId;
+
+					foreach(MessageDispatch dispatch in this.dispatchedMessages)
+					{
+						// Allow the message to update its internal to reflect a Rollback.
+						dispatch.Message.OnMessageRollback();
+					}
+
+					if(this.redeliveryPolicy.MaximumRedeliveries >= 0 &&
+					   lastMd.Message.RedeliveryCounter > this.redeliveryPolicy.MaximumRedeliveries)
+					{
+						// We need to NACK the messages so that they get sent to the DLQ.
+						MessageAck ack = new MessageAck();
+
+						ack.AckType = (byte) AckType.PoisonAck;
+						ack.ConsumerId = this.info.ConsumerId;
+						ack.Destination = lastMd.Destination;
+						ack.LastMessageId = lastMd.Message.MessageId;
+						ack.MessageCount = this.dispatchedMessages.Count;
+						ack.FirstMessageId = firstMsgId;
+
+						this.session.Connection.Oneway(ack);
+
+						// Adjust the window size.
+						additionalWindowSize = Math.Max(0, this.additionalWindowSize - this.dispatchedMessages.Count);
+
+						this.redeliveryDelay = 0;
+					}
+					else
+					{
+						// We only send a RedeliveryAck after the first redelivery
+						if(currentRedeliveryCount > 0)
+						{
+							MessageAck ack = new MessageAck();
+
+							ack.AckType = (byte) AckType.RedeliveredAck;
+							ack.ConsumerId = this.info.ConsumerId;
+							ack.Destination = lastMd.Destination;
+							ack.LastMessageId = lastMd.Message.MessageId;
+							ack.MessageCount = this.dispatchedMessages.Count;
+							ack.FirstMessageId = firstMsgId;
+
+							this.session.Connection.Oneway(ack);
+						}
+
+						// stop the delivery of messages.
+						this.unconsumedMessages.Stop();
+
+						foreach(MessageDispatch dispatch in this.dispatchedMessages)
+						{
+							this.unconsumedMessages.EnqueueFirst(dispatch);
+						}
+
+						if(redeliveryDelay > 0 && !this.unconsumedMessages.Closed)
+						{
+							DateTime deadline = DateTime.Now.AddMilliseconds(redeliveryDelay);
+							ThreadPool.QueueUserWorkItem(this.RollbackHelper, deadline);
+						}
+						else
+						{
+							Start();
+						}
+					}
+
+					this.deliveredCounter -= this.dispatchedMessages.Count;
+					this.dispatchedMessages.Clear();
+				}
+			}
+
+			// Only redispatch if there's an async listener otherwise a synchronous
+			// consumer will pull them from the local queue.
+			if(this.listener != null)
+			{
+				this.session.Redispatch(this.unconsumedMessages);
+			}
+		}
+
+		private void RollbackHelper(Object arg)
+		{
+			try
+			{
+				TimeSpan waitTime = (DateTime) arg - DateTime.Now;
+
+				if(waitTime.CompareTo(TimeSpan.Zero) > 0)
+				{
+					Thread.Sleep(waitTime);
+				}
+
+				this.Start();
+			}
+			catch(Exception e)
+			{
 				if(!this.unconsumedMessages.Closed)
-				{	
-                	this.session.Connection.OnSessionException(this.session, e);
+				{
+					this.session.Connection.OnSessionException(this.session, e);
 				}
-            }
-        }
+			}
+		}
+
+		private ActiveMQMessage CreateActiveMQMessage(MessageDispatch dispatch)
+		{
+			ActiveMQMessage message = dispatch.Message.Clone() as ActiveMQMessage;
 
-        private ActiveMQMessage CreateActiveMQMessage(MessageDispatch dispatch) 
-        {
-            ActiveMQMessage message = dispatch.Message.Clone() as ActiveMQMessage;
-			
 			message.Connection = this.session.Connection;
-            
-            if(this.session.IsClientAcknowledge)
-            {
-                message.Acknowledger += new AcknowledgeHandler(DoClientAcknowledge);
-            }
-            else if(this.session.IsIndividualAcknowledge)
-            {
-                message.Acknowledger += new AcknowledgeHandler(DoIndividualAcknowledge);
-            }
-            else
-            {
-                message.Acknowledger += new AcknowledgeHandler(DoNothingAcknowledge);                
-            }
-            
-            return message;
-        }
-        
-        private void CheckClosed()
-        {
-            if(this.unconsumedMessages.Closed)
-            {
-                throw new NMSException("The Consumer has been Closed");
-            }
-        }
-
-        private void CheckMessageListener()
-        {
-            if(this.listener != null)
-            {
-                throw new NMSException("Cannot set Async listeners on Consumers with a prefetch limit of zero");
-            }
-        }
-
-        private bool IsAutoAcknowledgeEach
-        {
-            get 
-            { 
-                return this.session.IsAutoAcknowledge ||
-                       (this.session.IsDupsOkAcknowledge && this.info.Destination.IsQueue); 
-            }
-        }
-    
-        private bool IsAutoAcknowledgeBatch 
-        {
-            get { return this.session.IsDupsOkAcknowledge && !this.info.Destination.IsQueue; }
-        }
-
-        #region Nested ISyncronization Types
-        
-        class MessageConsumerSynchronization : ISynchronization
-        {
-            private readonly MessageConsumer consumer;
-    
-            public MessageConsumerSynchronization(MessageConsumer consumer)
-            {
-                this.consumer = consumer;
-            }
-    
-            public void BeforeEnd()
-            {
-                this.consumer.Acknowledge();
-                this.consumer.synchronizationRegistered = false;
-            }
-    
-            public void AfterCommit()
-            {
-                this.consumer.Commit();
-                this.consumer.synchronizationRegistered = false;
-            }
-    
-            public void AfterRollback()
-            {
-                this.consumer.Rollback();
-                this.consumer.synchronizationRegistered = false;
-            }
-        }
-    
-        class ConsumerCloseSynchronization : ISynchronization
-        {
-            private readonly MessageConsumer consumer;
-    
-            public ConsumerCloseSynchronization(MessageConsumer consumer)
-            {
-                this.consumer = consumer;
-            }
-    
-            public void BeforeEnd()
-            {
-            }
-    
-            public void AfterCommit()
-            {
-                this.consumer.DoClose();
-            }
-    
-            public void AfterRollback()
-            {
-                this.consumer.DoClose();
-            }
-        }
 
-        #endregion
-    }
+			if(this.session.IsClientAcknowledge)
+			{
+				message.Acknowledger += new AcknowledgeHandler(DoClientAcknowledge);
+			}
+			else if(this.session.IsIndividualAcknowledge)
+			{
+				message.Acknowledger += new AcknowledgeHandler(DoIndividualAcknowledge);
+			}
+			else
+			{
+				message.Acknowledger += new AcknowledgeHandler(DoNothingAcknowledge);
+			}
+
+			return message;
+		}
+
+		private void CheckClosed()
+		{
+			if(this.unconsumedMessages.Closed)
+			{
+				throw new NMSException("The Consumer has been Closed");
+			}
+		}
+
+		private void CheckMessageListener()
+		{
+			if(this.listener != null)
+			{
+				throw new NMSException("Cannot set Async listeners on Consumers with a prefetch limit of zero");
+			}
+		}
+
+		private bool IsAutoAcknowledgeEach
+		{
+			get
+			{
+				return this.session.IsAutoAcknowledge ||
+					   (this.session.IsDupsOkAcknowledge && this.info.Destination.IsQueue);
+			}
+		}
+
+		private bool IsAutoAcknowledgeBatch
+		{
+			get { return this.session.IsDupsOkAcknowledge && !this.info.Destination.IsQueue; }
+		}
+
+		#region Nested ISyncronization Types
+
+		class MessageConsumerSynchronization : ISynchronization
+		{
+			private readonly MessageConsumer consumer;
+
+			public MessageConsumerSynchronization(MessageConsumer consumer)
+			{
+				this.consumer = consumer;
+			}
+
+			public void BeforeEnd()
+			{
+				this.consumer.Acknowledge();
+				this.consumer.synchronizationRegistered = false;
+			}
+
+			public void AfterCommit()
+			{
+				this.consumer.Commit();
+				this.consumer.synchronizationRegistered = false;
+			}
+
+			public void AfterRollback()
+			{
+				this.consumer.Rollback();
+				this.consumer.synchronizationRegistered = false;
+			}
+		}
+
+		class ConsumerCloseSynchronization : ISynchronization
+		{
+			private readonly MessageConsumer consumer;
+
+			public ConsumerCloseSynchronization(MessageConsumer consumer)
+			{
+				this.consumer = consumer;
+			}
+
+			public void BeforeEnd()
+			{
+			}
+
+			public void AfterCommit()
+			{
+				this.consumer.DoClose();
+			}
+
+			public void AfterRollback()
+			{
+				this.consumer.DoClose();
+			}
+		}
+
+		#endregion
+	}
 }



Mime
View raw message